Dubbo Generic Invocation Example

Updated: 2022-03-18 12:04:45 Source: 动力节点

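Introduction

Generic invocation is mainly intended for clients that have neither the service's API interface nor its model classes. Every POJO in the parameters and return values is represented as a Map. The service provider needs no special handling, and the service consumer no longer has to depend on the provider's SDK (second-party) jar.

It suits API gateways, framework integration, and similar scenarios. For example, a unified Dubbo service management platform can let every consumer call the services registered on the platform without importing each provider's SDK jar.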
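To make "POJOs are represented as a Map" concrete, here is a minimal sketch of how a consumer might assemble the argument for a generic call. The class name com.xxx.Person and its name/age fields are hypothetical and are not part of the original example. The "class" entry follows the convention Dubbo's default generic mode uses to carry the concrete type, and the sketch itself needs only the JDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PojoAsMapSketch {

    // Builds the Map form of a hypothetical com.xxx.Person POJO with "name" and "age" properties.
    static Map<String, Object> personAsMap(String name, int age) {
        Map<String, Object> person = new LinkedHashMap<>();
        person.put("class", "com.xxx.Person"); // type hint; the class name is hypothetical
        person.put("name", name);
        person.put("age", age);
        return person;
    }

    public static void main(String[] args) {
        // These two arrays are what a consumer would pass as the 2nd and 3rd arguments of $invoke.
        String[] parameterTypes = { "com.xxx.Person" };
        Object[] invokeArgs = { personAsMap("xiaoming", 18) };
        System.out.println(parameterTypes[0] + " -> " + invokeArgs[0]);
    }
}
```

A POJO returned by the provider comes back to the consumer in the same Map form, so the consumer reads properties with `get("name")` rather than through getters.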

Usage example

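// GreetingService.java: the interface published by the provider
public interface GreetingService {
    String sayHello(String name);
}

// Consumer-side test. It does not reference GreetingService at compile time.
// Imports assume Dubbo 2.7.x and JUnit 4; the test class name is illustrative.
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.rpc.service.GenericService;
import org.junit.Test;

public class GenericInvokeTest {
    /**
     * Generic invocation through the API configuration style.
     */
    @Test
    public void test() {
        // Application configuration
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName("dubbo-consumer");
        // Registry configuration
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("zookeeper://127.0.0.1:2181");
        // Reference the remote service by its interface name and enable generic mode
        ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
        reference.setApplication(applicationConfig);
        reference.setRegistry(registryConfig);
        reference.setInterface("com.xxx.GreetingService");
        reference.setGeneric(true);
        // Obtain a GenericService proxy instead of a GreetingService proxy
        ReferenceConfigCache cache = ReferenceConfigCache.getCache();
        GenericService genericService = cache.get(reference);
        // Call the service: method name, parameter type names, argument values
        String[] parameterTypes = new String[] { "java.lang.String" };
        Object[] args = new Object[] { "xiaoming" };
        Object result = genericService.$invoke("sayHello", parameterTypes, args);
        System.out.println("result = " + result);
    }
}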

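Source code analysis

Service consumer

In Dubbo's filter chain, GenericImplFilter intercepts generic invocations, validates the arguments, and then initiates the RPC call.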

org.apache.dubbo.rpc.filter.GenericImplFilter#invoke
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
    // Check whether this is a generic invocation
    if (invocation.getMethodName().equals(Constants.$INVOKE)
            && invocation.getArguments() != null
            && invocation.getArguments().length == 3
            && ProtocolUtils.isGeneric(generic)) {
        // The actual call arguments are the third argument of $invoke
        Object[] args = (Object[]) invocation.getArguments()[2];
        if (ProtocolUtils.isJavaGenericSerialization(generic)) {
            // nativejava mode: every argument must already be a byte[]
            for (Object arg : args) {
                if (!(byte[].class == arg.getClass())) {
                    error(generic, byte[].class.getName(), arg.getClass().getName());
                }
            }
        } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
            // bean mode: every argument must be a JavaBeanDescriptor
            for (Object arg : args) {
                if (!(arg instanceof JavaBeanDescriptor)) {
                    error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
                }
            }
        }
        // Pass the generic mode as an attachment so the provider knows how to parse the request arguments
        ((RpcInvocation) invocation).setAttachment(
                Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
    }
    // Initiate the RPC call
    return invoker.invoke(invocation);
}
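The guard at the top of GenericImplFilter#invoke can be tried out in isolation. The following is a simplified, JDK-only sketch of that check, not Dubbo's own code: the class and method names are hypothetical, and the accepted mode strings ("true", "nativejava", "bean") are an assumption based on the three branches that appear in the listings; other Dubbo versions may accept additional modes.

```java
import java.util.Arrays;
import java.util.List;

public class GenericModeCheckSketch {

    // Assumed values of the "generic" URL parameter: default, nativejava, bean.
    static final List<String> GENERIC_MODES = Arrays.asList("true", "nativejava", "bean");

    // True when the "generic" parameter names one of the assumed modes.
    static boolean isGeneric(String generic) {
        return generic != null && GENERIC_MODES.stream().anyMatch(generic::equalsIgnoreCase);
    }

    // Same shape as the filter's guard: method is $invoke, exactly three arguments, generic mode enabled.
    static boolean isGenericCall(String methodName, Object[] arguments, String generic) {
        return "$invoke".equals(methodName)
                && arguments != null
                && arguments.length == 3
                && isGeneric(generic);
    }

    // nativejava mode expects every argument to be pre-serialized bytes.
    // Unlike the listing, this sketch also rejects null arguments explicitly.
    static void checkNativeJavaArgs(Object[] args) {
        for (Object arg : args) {
            if (arg == null || byte[].class != arg.getClass()) {
                throw new IllegalArgumentException("generic=nativejava expects byte[] but got "
                        + (arg == null ? "null" : arg.getClass().getName()));
            }
        }
    }

    public static void main(String[] args) {
        Object[] invokeArgs = {
            "sayHello", new String[] { "java.lang.String" }, new Object[] { "xiaoming" }
        };
        System.out.println(isGenericCall("$invoke", invokeArgs, "true"));
        System.out.println(isGenericCall("sayHello", new Object[] { "xiaoming" }, "true"));
    }
}
```

An ordinary method call fails the guard because its method name is not $invoke, so the filter forwards it unchanged.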

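Service provider

On the provider side, GenericFilter intercepts the request, deserializes and resolves the generic arguments, and then forwards the call to the concrete service implementation object, which executes the business logic.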

org.apache.dubbo.rpc.filter.GenericFilter#invoke
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // Generic invocation
    if (inv.getMethodName().equals(Constants.$INVOKE)
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
        // Unpack the $invoke arguments: method name, parameter type names, argument values
        String name = ((String) inv.getArguments()[0]).trim();
        String[] types = (String[]) inv.getArguments()[1];
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            // Look up the target method by name and parameter types
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            Class<?>[] params = method.getParameterTypes();
            if (args == null) {
                args = new Object[params.length];
            }
            // Read the generic mode from the invocation, falling back to the RPC context
            String generic = inv.getAttachment(Constants.GENERIC_KEY);

            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
            }
            // No generic mode given, or the default mode: convert Maps back into POJOs
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
            // nativejava mode: deserialize each byte[] argument
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        throw new RpcException(...);
                    }
                }
            // bean mode: deserialize each JavaBeanDescriptor argument
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                        throw new RpcException(...);
                    }
                }
            }
            // Invoke the concrete service with the resolved method and arguments
            Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
            if (result.hasException()
                    && !(result.getException() instanceof GenericException)) {
                return new RpcResult(new GenericException(result.getException()));
            }
            // Serialize the result in the same generic mode and return it
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                            .serialize(null, os).writeObject(result.getValue());
                    return new RpcResult(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException("Serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));
            } else {
                return new RpcResult(PojoUtils.generalize(result.getValue()));
            }
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    // Not a generic invocation: pass the request on to the next filter
    return invoker.invoke(inv);
}
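The core of the provider-side flow (resolve the method from its name and parameter type names, then invoke it on the implementation) can be approximated with plain reflection. The sketch below is a simplified stand-in for illustration only, not Dubbo's implementation: GreetingServiceImpl and the invoke helper are hypothetical, Class.forName replaces ReflectUtils.findMethodByMethodSignature and does not handle primitive type names, and argument conversion such as PojoUtils.realize is omitted.

```java
import java.lang.reflect.Method;

public class GenericDispatchSketch {

    public interface GreetingService {
        String sayHello(String name);
    }

    // Hypothetical provider implementation used only for this sketch.
    public static class GreetingServiceImpl implements GreetingService {
        @Override
        public String sayHello(String name) {
            return "Hello, " + name;
        }
    }

    // Resolves the method on the service interface by name and parameter type names, then invokes it.
    static Object invoke(Object target, Class<?> serviceInterface, String methodName,
                         String[] parameterTypes, Object[] args) throws Exception {
        Class<?>[] types = new Class<?>[parameterTypes.length];
        for (int i = 0; i < parameterTypes.length; i++) {
            types[i] = Class.forName(parameterTypes[i]); // primitives such as "int" are not supported here
        }
        Method method = serviceInterface.getMethod(methodName, types);
        return method.invoke(target, args);
    }

    public static void main(String[] args) throws Exception {
        Object result = invoke(new GreetingServiceImpl(), GreetingService.class,
                "sayHello", new String[] { "java.lang.String" }, new Object[] { "xiaoming" });
        System.out.println("result = " + result); // prints "result = Hello, xiaoming"
    }
}
```

An unknown method name or type name surfaces here as NoSuchMethodException or ClassNotFoundException, the same two exceptions GenericFilter catches and wraps in RpcException.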

 
