从零实现一个轻量级 RPC 框架-系列文章 Github: https://github.com/DongZhouGu/XRpc
前言 SPI 全称为 Service Provider Interface,对应中文为服务发现机制。SPI 类似一种可插拔机制,首先需要定义一个接口或一个约定,然后不同的场景可以对其进行实现,调用方在使用的时候无需过多关注具体的实现细节。在 Java 中,SPI 体现了面向接口编程的思想,满足开闭设计原则。
JDK 自带 SPI 实现 以序列化为例,如果想要实现可插拔的序列化实现,使用 JDK 原生 SPI 过程如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public interface Serializer { byte [] serialize(Object object); } public class JSONSerializer implements Serializer { @Override public byte [] serialize(Object object) { return JSONUtil.toJsonStr(object).getBytes(); } } public class ProtostuffSerializer implements Serializer { private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); @Override public byte [] serialize(Object object) { Schema schema = RuntimeSchema.getSchema(object.getClass()); return ProtostuffIOUtil.toByteArray(object, schema, BUFFER); } }
在 resources/META-INF/services 目录下添加一个 com.xxx.Serializer 的文件,这是 JDK SPI 的配置文件:
1 2 com.xxx.JSONSerializer com.xxx.ProtostuffSerializer
然后,就可以使用 JDK 提供的 ServiceLoader 来加载扩展类了
1 2 3 4 5 6 7 8 public static void main (String[] args) { ServiceLoader<Serializer> serviceLoader = ServiceLoader.load(Serializer.class); Iterator<Serializer> iterator = serviceLoader.iterator(); while (iterator.hasNext()) { Serializer serializer= iterator.next(); System.out.println(serializer.getClass().getName()); } }
JDK 对 SPI 的加载实现存在一个较为突出的小缺点,无法按需加载实现类,通过 ServiceLoader.load 加载时会将文件中的所有实现都进行实例化,如果想要获取具体某个具体的实现类需要进行遍历判断。
Dubbo SPI 和 Java SPI 区别? Dubbo 就是通过 SPI 机制加载所有的组件。不过,Dubbo 并未使用 Java 原生的 SPI 机制,而是对其进行了增强,使其能够更好的满足需求。
配置文件改为键值对形式,可以获取任一实现类,懒加载,而无需加载所有实现类,节约资源;
增加了缓存来存储实例,提高了读取的性能;
Dubbo SPI 还提供了默认值的指定方式,@SPI(“xxx”)指定
增加了对扩展点 IOC 和 AOP 的支持,一个扩展点可以直接 setter 注入其
它扩展点。
XRpc 的 SPI 实现 SPI 注解 定义 SPI 注解
1 2 3 4 5 6 7 8 9 10 11 @Documented @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface SPI { String value () default "default " ; }
被@SPI 注解的接口为扩展类型,SPI 注解含有默认值,可选择默认扩展实现
获取对应接口的扩展加载器实例 当我们想要获取特定的扩展类实例时,调用如下
1 2 3 4 5 ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension("zk" ); ExtensionLoader.getExtensionLoader(ServiceDiscovery.class) .getExtension("zk" );
下面是获取对应接口的扩展加载器实例的具体逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 ** * 扩展类加载器实例缓存 */ private static final Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();public static <S> ExtensionLoader<S> getExtensionLoader (Class<S> type) { if (type == null ) { throw new IllegalArgumentException("Extension type should not be null." ); } if (!type.isInterface()) { throw new IllegalArgumentException("Extension type must be an interface." ); } if (type.getAnnotation(SPI.class) == null ) { throw new IllegalArgumentException("Extension type must be annotated by @SPI" ); } ExtensionLoader<S> extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type); if (extensionLoader == null ) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<S>(type)); extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type); } return extensionLoader; }
加载器指的就是 ExtensionLoader,为了减少对象的开销, 屏蔽了加载器的构造函数,提供了一个静态方法来获取加载器。EXTENSION_LOADERS 是一个 Map,缓存了各种类型的加载器。获取的时候先从缓存获取,缓存不存在则去实例化,节省资源
扩展类懒加载 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();public T getExtension (String name) { if (StrUtil.isBlank(name)) { log.warn("Extension name is null or empty, load the default Extension" ); return getDefaultExtension(); } Holder<Object> holder = cachedInstances.get(name); if (holder == null ) { cachedInstances.putIfAbsent(name, new Holder<>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null ) { synchronized (holder) { instance = holder.get(); if (instance == null ) { instance = createExtension(name); holder.set(instance); } } } return (T) instance; }
一个接口如果有很多实现类,而我们只需要其中一个的时候,就会产生其他不必要的实现类。 例如 Dubbo 的序列化接口,实现类就有 fastjson、gson、hession2、jdk、kryo、protobuf 等等,通常我们只需要选择一种序列化方式。 这里,我们根据名字来实例化需要加载的扩展类。同样,使用一个 cachedInstances 缓存已经实例化的单例扩展类。
扩展类的创建 当获取扩展类不存在缓存时,会加锁创建单例,并放入到缓存中 实例化的流程如下:
从配置文件中,加载该接口所有的实现类的 Class 对象,并放到缓存中。
根据要获取的扩展名字,找到对应的 Class 对象。
调用 clazz.newInstance() 实例化。(Class 需要有无参构造函数)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private T createExtension (String name) { Class<?> clazz = getAllExtensionClasses().get(name); if (clazz == null ) { throw new RuntimeException("No such extension of name " + name); } T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null ) { try { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } catch (Exception e) { log.error(e.getMessage()); } } return instance; } private Map<String, Class<?>> getAllExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null ) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null ) { classes = new HashMap<>(); loadDirectory(classes); cachedClasses.set(classes); } } } return classes; }
默认扩展类 在使用 @SPI 的时候可以指定一个默认的实现类名,例如 @SPI(“zk”)。 这样当获取扩展名留空没有配置的时候,就会直接获取默认扩展,减少了配置的量
1 2 3 4 5 6 7 8 9 10 private final String defaultNameCache;private ExtensionLoader (Class<T> type) { this .type = type; SPI annotation = type.getAnnotation(SPI.class); defaultNameCache = annotation.value(); } public T getDefaultExtension () { return getExtension(defaultNameCache); }
SPI 自适应扩展 在 Dubbo 中,SPI 配置有两种。一种是固定的系统级别的配置,在 Dubbo 启动之后就不会再改了。还有一种是运行时的配置,可能对于每一次的 RPC,这些配置都不同。
对于固定的配置,在配置 Config 中写死,在调用时从 ExtensionLoader 中拿到对应的扩展类,这样的话,虽然可以支持可插拔的第三方实现,但是在应用启动时,到底用哪个扩展类就已经确定了
对于运行时配置,Dubbo 提供了自适应扩展,也可被理解为扩展代理类,其就是 Extension 的代理,它实现了扩展点接口。在调用扩展点的接口方法时,会根据实际的参数来决定要使用哪个扩展。
在 Dubbo 中,因为 dubbo 是 url 驱动,即服务的配置信息都是通过&拼接在 url 之后,换句话说,当 Provider 收到调用请求时,其相关配置是通过查 url 后的参数获得;这样做的目的是, Consumer 在注册中心拿到相应服务的 url 后,可以根据自身的配置对请求 url 再次进行拼接(修改)。因此,对于 Dubbo 而言,每一次的 RPC 调用的参数都是未知的,只有在运行时,根据这些参数才能做出正确的决定。
获取自适应扩展类
1 2 3 4 5 6 public T getAdaptiveExtension () { InvocationHandler handler = new AdaptiveInvocationHandler<>(type); return (T) Proxy.newProxyInstance(ExtensionLoader.class.getClassLoader(), new Class<?>[]{type}, handler); }
适配扩展类其实是一个代理类,接下来来看看这个代理类 AdaptiveInvocationHandler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class AdaptiveInvocationHandler <T > implements InvocationHandler { private final Class<T> clazz; public AdaptiveInvocationHandler (Class<T> clazz) { this .clazz = clazz; } @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if (args.length==0 ){ return method.invoke(proxy,args); } URL url=null ; for (Object arg : args) { if (arg instanceof URL) { url = (URL) arg; break ; } } if (url == null ) { return method.invoke(proxy, args); } Adaptive adaptive = method.getAnnotation(Adaptive.class); if (adaptive == null ) { return method.invoke(proxy, args); } String extendNameKey = adaptive.value(); String extendName; if (URLKeyConst.PROTOCOL.equals(extendNameKey)) { extendName = url.getProtocol(); } else { extendName = url.getParam(extendNameKey, method.getDeclaringClass() + "." + method.getName()); } ExtensionLoader<T> extensionLoader = ExtensionLoader.getExtensionLoader(clazz); T extension = extensionLoader.getExtension(extendName); return method.invoke(extension, args); } }
扩展代理类的流程如下
从方法参数中拿到 URL 参数,拿不到就直接执行方法
获取配置 Key。从 @Adaptive#value() 拿扩展名的配置 key,如果拿不到就直接执行方法
获取扩展名。判断配置 key 是不是协议,如果是就拿协议类型,否则拿 URL 后面的参数。 例如 URL 是:zk://localhost:2181?type=eureka
如果 @Adaptive(“protocol”),那么扩展名就是协议类型:zk
如果 @Adaptive(“type”),那么扩展名就是 type 参数:eureka
最后根据扩展名获取扩展 extensionLoader.getExtension(extendName)