从零实现一个轻量级 RPC 框架-系列文章
Github: https://github.com/DongZhouGu/XRpc
前言
SpringBoot 最强大的功能就是把我们常用的场景抽取成了一个个 starter(场景启动器),我们通过引入 springboot 为我提供的这些场景启动器,我们再进行少量的配置就能使用相应的功能。
因此对于实现的 XRPC 同样需要制作 starter,并将相关配置和 bean 加载交由 Spring 来管理,最后通过 Maven 发布
自动配置原理
- 首先,SpringBoot 在启动时会去依赖的 starter 包中寻找 resources/META-INF/spring.factories 文件,然后根据文件中配置的 Jar 包去扫描项目所依赖的 Jar 包,这类似于 Java 的 SPI 机制。
- 第二步,根据 spring.factories 配置加载 AutoConfigure 类。
- 最后,根据 @Conditional 注解的条件,进行自动配置并将 Bean 注入 Spring Context 上下文当中。
POM 依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.4</version> </parent> <dependencies> <!-- SpringBoot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> </dependency> </dependencies>
|
XRPC-Client 集成 SpringBoot
1. 提供 RpcConfig
编写配置类,这样就可以在 SpringBoot 的配置中配置 xrpc 的参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| @Data @ConfigurationProperties(prefix = "xrpc") public class RpcConfig {
private boolean enable = true;
private String registerAddress = "127.0.0.1:2181";
private String register = "zookeeper";
private Integer serverPort = 9999;
private String serializer = "kryo";
private String compress = "gzip";
private String loadBalance = "random";
private String faultTolerant = "retry";
private Integer retryTimes = 3;
private String proxyType = "cglib"; }
|
###
2. 编写 RpcAutoConfiguration
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| @Slf4j @Configuration @EnableConfigurationProperties(RpcConfig.class) @ConditionalOnProperty(prefix = "xrpc", name = "enable", havingValue = "true", matchIfMissing = true) public class RpcAutoConfiguration implements DisposableBean {
private Register register; private NettyClient nettyClient; private ProxyFactory proxyFactory;
@Bean public Register serviceDiscovery(@Autowired RpcConfig rpcConfig) { RegisterFactory registerFactory = ExtensionLoader.getExtensionLoader(RegisterFactory.class).getExtension(rpcConfig.getRegister()); register = registerFactory.getRegister(rpcConfig.getRegisterAddress()); return register; }
@Bean public NettyClient nettyClient() { nettyClient = new NettyClient(); return nettyClient; }
@Bean public ProxyFactory proxyFactory(@Autowired RpcConfig rpcConfig) { LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(rpcConfig.getLoadBalance()); FaultTolerantInvoker tolerantInvoker = ExtensionLoader.getExtensionLoader(FaultTolerantInvoker.class).getExtension(rpcConfig.getFaultTolerant());
proxyFactory = new ProxyFactory(); proxyFactory.setNettyClient(nettyClient) .setLoadBalance(loadBalance) .setRegister(register) .setFaultTolerantInvoker(tolerantInvoker) .setRetryTime(rpcConfig.getRetryTimes()) .setCompress(rpcConfig.getCompress()) .setSerializer(rpcConfig.getSerializer()); return proxyFactory; }
@Bean public ProxyInjectProcessor injectProcessor() { ProxyInjectProcessor proxyInjectProcessor = new ProxyInjectProcessor(); proxyInjectProcessor.setProxyFactory(proxyFactory); return proxyInjectProcessor; }
@Override public void destroy() { register.stop(); nettyClient.stop(); }
}
|
在 SpringBoot 启动进行自动装载时,根据 RpcConfig 中的参数进行 SPI 扩展类的注入,比如负载均衡和容错策略的选择。最后我们需要使用 ProxyFactory 来获得远程服务调用的代理类,但是 ProxyFactory 还依赖了其他类,(可能是更复杂的关联),所以当我们去使用这个类做事情时发现包空指针错误,这是因为我们这个类有可能已经初始化完成,但是引用的其他类不一定初始化完成,所以发生了空指针错误。
为了解决这个问题,我们还注入了 ProxyInjectProcessor,这个类中的主要作用就是等待 Spring 装载完成后,将需要远程调用的方法使用动态代理类替换。具体是继承 spring 的 ApplicationListener 监听,并监控 ContextRefreshedEvent 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Slf4j @Setter public class ProxyInjectProcessor implements ApplicationListener<ContextRefreshedEvent> { private ProxyFactory proxyFactory;
@Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { if (Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent())) { ApplicationContext context = contextRefreshedEvent.getApplicationContext(); String[] names = context.getBeanDefinitionNames(); for (String name : names) { Object bean = context.getBean(name); Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { RpcAutowired rpcAutowired = field.getAnnotation(RpcAutowired.class); if (rpcAutowired != null) { String version = rpcAutowired.version(); boolean isAsync = rpcAutowired.isAsync(); field.setAccessible(true); try { field.set(bean, proxyFactory.getProxy(field.getType(), version,isAsync)); } catch (IllegalAccessException e) { log.error("field.set error. bean={}, field={}", bean.getClass(), field.getName(), e); } } } } } } }
|
3. 编写 spring.factories
最后,编写 spring.factories,让 Springboot 自动装载的时候去加载我们的 AutoConfiuration
1
| org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.dzgu.xrpc.client.config.RpcAutoConfiguration
|
XRPC-Server 集成 SpringBoot
1. 提供 RpcConfig
和上一节一样,略过
2. 编写 RpcAutoConfiguration
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Slf4j @Configuration @EnableConfigurationProperties(RpcConfig.class) @ConditionalOnProperty(prefix = "xrpc", name = "enable", havingValue = "true", matchIfMissing = true) public class RpcAutoConfiguration implements DisposableBean {
private NettyServer nettyServer; private ServiceRegisterCache serviceRegisterCache; private Invoker invoker;
@Bean public ServiceRegisterCache serviceProvider() { serviceRegisterCache = new ServiceRegisterCache(); return serviceRegisterCache; }
@Bean public Invoker invoker(@Autowired RpcConfig rpcConfig) { invoker = ExtensionLoader.getExtensionLoader(Invoker.class).getExtension(rpcConfig.getProxyType()); return invoker; }
@Bean public NettyServer nettyServer(@Autowired RpcConfig rpcConfig) { RegisterFactory registerFactory = ExtensionLoader.getExtensionLoader(RegisterFactory.class).getExtension(rpcConfig.getRegister()); Register register = registerFactory.getRegister(rpcConfig.getRegisterAddress()); String host = null; try { host = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { log.error("occur exception when getHostAddress", e); } InetSocketAddress inetSocketAddress = new InetSocketAddress(host == null ? "127.0.0.1" : host, rpcConfig.getServerPort()); nettyServer = new NettyServer(); nettyServer.setRegister(register); nettyServer.setInvoker(invoker); nettyServer.setServerAddress(inetSocketAddress); return nettyServer; }
@Bean public ServiceInjectProcessor injectProcessor() { ServiceInjectProcessor serviceInjectProcessor = new ServiceInjectProcessor(); serviceInjectProcessor.setNettyServer(nettyServer); serviceInjectProcessor.setServiceRegisterCache(serviceRegisterCache); return serviceInjectProcessor; }
@Override public void destroy() { nettyServer.stop(); } }
|
一样的,我们需要在所有 bean 装载完之后,将服务加载到 ServiceRegisterCache 缓存中,同时将 nettyServer 启动,并根绝缓存一次性注册到注册中心
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Setter public class ServiceInjectProcessor implements ApplicationListener<ContextRefreshedEvent> { private NettyServer nettyServer; private ServiceRegisterCache serviceRegisterCache;
@Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { if (Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent())) { ApplicationContext context = contextRefreshedEvent.getApplicationContext(); Map<String, Object> serviceBeanMap = context.getBeansWithAnnotation(RpcService.class); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class); String interfaceName = rpcService.value().getName(); String version = rpcService.version(); serviceRegisterCache.addService(interfaceName, version, serviceBean);
} } nettyServer.setServiceRegisterCache(serviceRegisterCache); nettyServer.start(); } } }
|
3. 编写 spring.factories
最后,编写 spring.factories,让 Springboot 自动装载的时候去加载我们的 AutoConfiuration
1
| org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.dzgu.xrpc.server.config.RpcAutoConfiguration
|
##