Implementing a Lightweight RPC Framework from Scratch (article series)
Github: https://github.com/DongZhouGu/XRpc
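Preface
When a client calls a remote service, we want the process to be transparent to the user: developers using the framework should be able to call a remote service exactly as they would call a local one. To achieve this, we use a dynamic proxy to enhance the interface methods. When an interface method is invoked, the framework routes the call through the proxy, which encapsulates the complex steps such as network communication and encoding/decoding. This chapter explains how to implement that.
Call Flow and Encapsulation
The figure above shows the flow of the entire call. The dynamic proxy part is the core code of the XRPC-Client module.
Dynamic Proxy Factory Class
First, we abstract a dynamic proxy factory class and encapsulate it as ProxyFactory.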
```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import lombok.Setter;
import lombok.SneakyThrows;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
// XRpc's own classes (Register, NettyClient, RpcRequest, ...) come from the project and are omitted here.

@Setter
@Accessors(chain = true)
@Slf4j
public class ProxyFactory {
    private Register register;
    private NettyClient nettyClient;
    private LoadBalance loadBalance;
    private FaultTolerantInvoker faultTolerantInvoker;
    private int retryTime = 3;
    private String compress;
    private String serializer;

    // Proxy cache keyed by interface name + version.
    // ConcurrentHashMap, because getProxy may be called from several threads.
    private final Map<String, Object> objectCache = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> interfaceClass, String version) {
        return (T) objectCache.computeIfAbsent(interfaceClass.getName() + version, key ->
                Proxy.newProxyInstance(
                        interfaceClass.getClassLoader(),
                        new Class<?>[]{interfaceClass},
                        new ObjectProxy<T>(interfaceClass, version)));
    }

    private class ObjectProxy<T> implements InvocationHandler {
        private Class<T> clazz;
        private String version;

        public ObjectProxy(Class<T> clazz, String version) {
            this.clazz = clazz;
            this.version = version;
        }

        @SneakyThrows
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            log.info("client invoked method: [{}]", method.getName());
            // 1. Build the request object
            RpcRequest rpcRequest = RpcRequest.builder()
                    .methodName(method.getName())
                    .parameters(args)
                    .parameterTypes(method.getParameterTypes())
                    .className(method.getDeclaringClass().getName())
                    .requestId(UUID.randomUUID().toString())
                    .version(version)
                    .build();
            // 2. Look up the addresses of the service in the registry
            String serviceKey = ServiceUtil.makeServiceKey(rpcRequest.getClassName(), rpcRequest.getVersion());
            List<String> serviceUrlList = register.lookupService(serviceKey);
            // 3. Pick one address with the load-balancing strategy
            String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
            log.info("Successfully found the service address:[{}]", targetServiceUrl);
            // 4. Wrap the request in a protocol message
            //    (REQUEST_ID is a counter defined elsewhere in the project, not shown here)
            RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
                    .codec(SerializerTypeEnum.getCode(serializer))
                    .compress(CompressTypeEnum.getCode(compress))
                    .requestId(REQUEST_ID.getAndIncrement())
                    .messageType(RpcConstants.REQUEST_TYPE).build();
            // 5. Send it through the configured fault-tolerance strategy
            if (faultTolerantInvoker instanceof RetryInvoker) {
                // Note: this is a static field, so it affects every RetryInvoker
                RetryInvoker.DEFAULT_RETRY_TIMES = retryTime;
            }
            RpcResponse<Object> rpcResponse = faultTolerantInvoker.doinvoke(nettyClient, rpcMessage, targetServiceUrl);
            this.check(rpcResponse, rpcRequest);
            return rpcResponse.getData();
        }

        // Validates the response: non-null, matching request id, success code
        private void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {
            if (rpcResponse == null) {
                throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE,
                        "interfaceName" + ":" + rpcRequest.getMethodName());
            }
            if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
                throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE,
                        "interfaceName" + ":" + rpcRequest.getMethodName());
            }
            if (rpcResponse.getCode() == null
                    || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
                throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE,
                        "interfaceName" + ":" + rpcRequest.getMethodName());
            }
        }
    }
}
```
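- getProxy returns a proxy object for the given interface; calls on it are handled by ObjectProxy.
- When an interface method is called, the invoke method of InvocationHandler carries out the call logic, which includes:
  - building the network communication object RpcRequest
  - getting the network addresses of the called service from the registry cache or from the registry itself
  - selecting one address via the load-balancing strategy
  - sending the request to the server through Netty, using the configured fault-tolerance strategy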
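The dynamic proxy mechanism itself can be tried without any network. The following is a minimal, self-contained sketch and not XRpc code: HelloService, Request, and the transport function are hypothetical stand-ins for the real service interface, RpcRequest, and the Netty client. As an addition that the ProxyFactory above does not make, the handler answers methods inherited from Object locally, so that something like toString() on the proxy does not turn into a remote call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

class ProxySketch {
    /** Hypothetical service interface, used only for this illustration. */
    interface HelloService {
        String hello(String name);
    }

    /** Hypothetical, much-simplified stand-in for RpcRequest. */
    static final class Request {
        final String className;
        final String methodName;
        final Object[] args;

        Request(String className, String methodName, Object[] args) {
            this.className = className;
            this.methodName = methodName;
            this.args = args;
        }
    }

    /** Returns a proxy that turns every interface call into a Request and hands it to the transport. */
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> iface, Function<Request, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Keep equals/hashCode/toString local instead of sending them over the wire.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":   return proxy == args[0];
                    case "hashCode": return System.identityHashCode(proxy);
                    default:         return iface.getName() + "$RpcProxy"; // toString
                }
            }
            return transport.apply(
                    new Request(method.getDeclaringClass().getName(), method.getName(), args));
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        // Fake transport: echoes the method name and first argument instead of using the network.
        HelloService service = getProxy(HelloService.class,
                req -> "echo:" + req.methodName + ":" + req.args[0]);
        System.out.println(service.hello("xrpc")); // prints "echo:hello:xrpc"
    }
}
```

The caller only ever sees HelloService; everything between the method call and the returned value lives in the handler, which is where the real framework plugs in service discovery, load balancing, and Netty.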
Fault-Tolerance Strategies
Two simple fault-tolerance strategies are implemented: fail-fast and retry.
```java
import lombok.extern.slf4j.Slf4j;

// RetryInvoker.java: retry the call up to DEFAULT_RETRY_TIMES times before giving up
@Slf4j
public class RetryInvoker implements FaultTolerantInvoker {
    // Not final: ProxyFactory overwrites it with the configured retryTime
    public static int DEFAULT_RETRY_TIMES = 3;

    @Override
    public RpcResponse<Object> doinvoke(NettyClient nettyClient, RpcMessage rpcMessage, String targetServiceUrl) {
        for (int i = 0; i < DEFAULT_RETRY_TIMES; i++) {
            try {
                RpcResponse<Object> result = nettyClient.sendRequest(rpcMessage, targetServiceUrl);
                if (result != null) {
                    return result;
                }
            } catch (RpcException ex) {
                log.error("invoke error. retry times=" + i, ex);
            }
        }
        // All attempts failed (constant presumably imported statically from RpcErrorMessageEnum)
        throw new RpcException(SERVICE_INVOCATION_FAILURE);
    }
}

// FailFastInvoker.java: send once and let any exception propagate to the caller
public class FailFastInvoker implements FaultTolerantInvoker {
    @Override
    public RpcResponse<Object> doinvoke(NettyClient nettyClient, RpcMessage rpcMessage, String targetServiceUrl) {
        return nettyClient.sendRequest(rpcMessage, targetServiceUrl);
    }
}
```
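To compare the two strategies in isolation, here is a small self-contained sketch. It is not the XRpc implementation: the Invoker interface, the Supplier-based signature, and the flaky helper are all assumptions made for illustration. One deliberate difference is that the retry count is a per-instance field rather than a mutable static, so two proxies with different settings would not overwrite each other.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class FaultToleranceSketch {
    /** Hypothetical strategy interface; the real FaultTolerantInvoker takes a NettyClient and an RpcMessage. */
    interface Invoker {
        <T> T invoke(Supplier<T> call);
    }

    /** Fail-fast: one attempt, any exception goes straight to the caller. */
    static final class FailFast implements Invoker {
        @Override
        public <T> T invoke(Supplier<T> call) {
            return call.get();
        }
    }

    /** Retry: up to maxAttempts attempts, rethrowing the last failure if all of them fail. */
    static final class Retry implements Invoker {
        private final int maxAttempts;

        Retry(int maxAttempts) {
            if (maxAttempts < 1) {
                throw new IllegalArgumentException("maxAttempts must be >= 1");
            }
            this.maxAttempts = maxAttempts;
        }

        @Override
        public <T> T invoke(Supplier<T> call) {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return call.get();
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last;
        }
    }

    /** Simulated remote call that fails the first `failures` times and then succeeds. */
    static Supplier<String> flaky(int failures) {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            if (calls.incrementAndGet() <= failures) {
                throw new IllegalStateException("simulated network failure");
            }
            return "ok after " + calls.get() + " calls";
        };
    }

    public static void main(String[] args) {
        System.out.println(new Retry(3).invoke(flaky(2))); // prints "ok after 3 calls"
        try {
            new FailFast().invoke(flaky(2));
        } catch (IllegalStateException e) {
            System.out.println("fail-fast gave up: " + e.getMessage());
        }
    }
}
```

Retrying resends the same request, so it is only safe when the remote method can tolerate being executed more than once.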
The core of each fault-tolerance strategy is sending the request message through Netty:
```java
// Method of NettyClient. Uses java.net.InetSocketAddress, java.util.concurrent.CompletableFuture,
// java.util.concurrent.TimeUnit and io.netty.channel.{Channel, ChannelFuture, ChannelFutureListener}.
public RpcResponse<Object> sendRequest(RpcMessage rpcMessage, String targetServiceUrl) {
    // targetServiceUrl has the form "host:port"
    String[] socketAddressArray = targetServiceUrl.split(":");
    String host = socketAddressArray[0];
    int port = Integer.parseInt(socketAddressArray[1]);
    InetSocketAddress remoteAddress = new InetSocketAddress(host, port);

    CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
    Channel channel = getChannel(remoteAddress);
    String requestId = ((RpcRequest) rpcMessage.getData()).getRequestId();
    RpcResponse<Object> rpcResponse = null;
    try {
        // Register the future before writing, so the response handler can find it by request id
        pendingRpcRequests.put(requestId, resultFuture);
        channel.writeAndFlush(rpcMessage).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    log.info("client send message: [{}]", rpcMessage);
                } else {
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                    log.error("Send failed:", future.cause());
                }
            }
        });
        // Block the calling thread until the response arrives or 10 seconds pass
        rpcResponse = resultFuture.get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
        log.error("send request error: " + e.getMessage());
        throw new RpcException("send request error:", e);
    } finally {
        pendingRpcRequests.remove(requestId);
    }
    return rpcResponse;
}
```
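Invocation Modes
Here, the JDK's CompletableFuture is used to implement synchronous invocation.
Note that future.get still blocks the calling thread while it waits for the response.
Of course, a callback can also be used to implement truly asynchronous invocation. I had not implemented that at this point and planned to add it later.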
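The pattern behind pendingRpcRequests can be shown without Netty. The sketch below is a hypothetical, simplified model: a scheduled executor stands in for the network and echoes each request after 50 ms, and request and response are plain strings. It shows the same future being used in both modes, blocking with get for a synchronous call and attaching a continuation with thenAccept for a non-blocking one.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PendingRequestsSketch {
    // Futures waiting for a response, keyed by request id
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Stand-in for the network and the server (an assumption of this sketch)
    private final ScheduledExecutorService fakeNetwork = Executors.newSingleThreadScheduledExecutor();

    /** Called by the (simulated) I/O thread when a response arrives. */
    void onResponse(String requestId, String payload) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(payload);
        }
    }

    /** Non-blocking send: register the future first, then "write" the request. */
    CompletableFuture<String> send(String requestId, String body) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        fakeNetwork.schedule(() -> onResponse(requestId, "echo:" + body), 50, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Synchronous call: block the caller until the response arrives or one second passes. */
    String call(String requestId, String body) {
        try {
            return send(requestId, body).get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("request " + requestId + " failed", e);
        } finally {
            pending.remove(requestId); // never leave a stale entry behind
        }
    }

    int pendingCount() {
        return pending.size();
    }

    void close() {
        fakeNetwork.shutdown();
    }

    public static void main(String[] args) {
        PendingRequestsSketch client = new PendingRequestsSketch();
        try {
            System.out.println(client.call("1", "sync")); // prints "echo:sync"
            client.send("2", "async")
                    .thenAccept(r -> System.out.println("callback got " + r))
                    .join(); // join only so that main waits before shutting down
        } finally {
            client.close();
        }
    }
}
```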
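Update: Asynchronous Invocation
We define our own RpcFuture and, when it completes, invoke the callback supplied by the user; that is all it takes to make the call asynchronous. When the remote method is executed, an empty response is returned immediately. Once the result from the server arrives, the callback is invoked, which gives the asynchronous effect. The idea is quite simple, but one question remains: how do we hand the RpcFuture to the service caller so that users can add their own callback logic?
Here, following Dubbo's approach, a simple RpcContext class is extracted. It holds a static variable localCallback of type ThreadLocal, which means every thread has its own private ResponseCallback object. As long as the callback is set and then read (get) on the same thread, the problem is solved.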
```java
public class RpcContext {
    // One callback slot per thread
    private static ThreadLocal<ResponseCallback> localCallback = new ThreadLocal<>();

    public static void setCallback(ResponseCallback callback) {
        localCallback.set(callback);
    }

    public static ResponseCallback getCallback() {
        return localCallback.get();
    }
}
```
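Two properties of ThreadLocal matter for this design: a value set on one thread is invisible to every other thread, and it stays attached to the thread until it is removed. The sketch below illustrates both with a hypothetical CallbackContextSketch class, using a plain Consumer in place of ResponseCallback. The takeCallback method, which reads and clears in one step, is a suggestion of this sketch and not part of the RpcContext above; it keeps a callback from leaking into a later, unrelated call on a reused thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackContextSketch {
    // One callback slot per thread, as in RpcContext
    private static final ThreadLocal<Consumer<String>> CALLBACK = new ThreadLocal<>();

    static void setCallback(Consumer<String> callback) {
        CALLBACK.set(callback);
    }

    /** Returns the current thread's callback and clears the slot. */
    static Consumer<String> takeCallback() {
        Consumer<String> callback = CALLBACK.get();
        CALLBACK.remove();
        return callback;
    }

    /** Checks from a different thread whether a callback is visible there. */
    static boolean visibleOnAnotherThread() {
        return CompletableFuture.supplyAsync(() -> takeCallback() != null).join();
    }

    public static void main(String[] args) {
        setCallback(result -> System.out.println("got " + result));
        System.out.println("visible on another thread: " + visibleOnAnotherThread());    // false
        System.out.println("first take on caller thread: " + (takeCallback() != null));  // true
        System.out.println("second take on caller thread: " + (takeCallback() != null)); // false
    }
}
```

The practical consequence is that the callback has to be registered on the same thread that makes the remote call, and before the request is sent.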
Concretely, let's look at an example:
```java
// 1. Inject the service with isAsync = true
@RpcAutowired(version = "1.0", isAsync = true)
private HelloService helloServiceAsync;

// 2. Register the callback on the calling thread BEFORE making the call,
//    because sendAsyncRequest (below) reads it from RpcContext while sending
RpcContext.setCallback(new ResponseCallback() {
    @Override
    public void callBack(RpcResponse<Object> result) {
        System.out.println("----Async--requestId:" + result.getRequestId() + "--data:" + result.getData());
    }

    @Override
    public void onException(RpcResponse<Object> result, Exception e) {
        // handle the failure here
    }
});
helloServiceAsync.hello(new Hello("hello", "hello async"));

// 3. In the Netty client: send without waiting for the response
private RpcResponse<Object> sendAsyncRequest(Channel channel, RpcFuture resultFuture, RpcMessage rpcMessage) {
    RpcResponse<Object> rpcResponse = null;
    String requestId = ((RpcRequest) rpcMessage.getData()).getRequestId();
    // Attach the caller's callback to this request's future
    ResponseCallback callback = RpcContext.getCallback();
    resultFuture.setResponseCallback(callback);
    try {
        pendingRpcRequests.put(requestId, resultFuture);
        channel.writeAndFlush(rpcMessage).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    log.info("client send message: [{}]", rpcMessage);
                } else {
                    future.channel().close();
                    // The response will never arrive, so drop the pending entry
                    pendingRpcRequests.remove(requestId);
                    log.error("Send failed:", future.cause());
                }
            }
        });
        // Return an empty success response immediately; the real result goes to the callback
        rpcResponse = RpcResponse.success(null, requestId);
    } catch (Exception e) {
        pendingRpcRequests.remove(requestId);
        log.error("send request error: " + e.getMessage());
        throw new RpcException("send request error:", e);
    }
    return rpcResponse;
}
```
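- First, an isAsync field is added to the @RpcAutowired annotation to mark whether the call is asynchronous.
- RpcContext is a ThreadLocal-based holder for temporary state. When calling a service, we store in the thread-private variable a callback object that extends the abstract class ResponseCallback and implements callBack and onException.
- When the message is sent through Netty, the callback is read from the context with ResponseCallback callback = RpcContext.getCallback(); and attached to this request's RpcFuture with resultFuture.setResponseCallback(callback);
Previously we used CompletableFuture to bind a request to its response; now we implement our own RpcFuture.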
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RpcFuture implements Future<RpcResponse<Object>> {
    private RpcResponse<Object> response;
    private ResponseCallback responseCallback;
    // Released once, when the response arrives
    private CountDownLatch countDownLatch;

    public RpcFuture() {
        countDownLatch = new CountDownLatch(1);
    }

    // Cancellation is not supported
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    // Blocks until complete() has been called
    @Override
    public RpcResponse<Object> get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return response;
    }

    // Returns null on timeout (a strict Future would throw TimeoutException instead)
    @Override
    public RpcResponse<Object> get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (countDownLatch.await(timeout, unit)) {
            return response;
        }
        return null;
    }

    // Called when the response arrives: wakes up blocked callers, then runs the callback if any
    public void complete(RpcResponse<Object> response) {
        this.response = response;
        countDownLatch.countDown();
        if (responseCallback != null) {
            responseCallback.success(response);
        }
    }

    public void setResponseCallback(ResponseCallback responseCallback) {
        this.responseCallback = responseCallback;
    }
}
```
Here a CountDownLatch is used to implement the blocking get that CompletableFuture provided before.
In addition, the complete method implements the asynchronous callback logic:
```java
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class ResponseCallback {

    // Runs the user's callback on the callback thread pool, not on the I/O thread
    public void success(RpcResponse<Object> response) {
        AsyncCallBackExecutor.execute(() -> {
            log.debug("AsyncReceiveHandler success response:{}", response);
            if (response.getCode() != null
                    && response.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
                try {
                    callBack(response);
                } catch (Exception e) {
                    onException(response, e);
                }
            } else {
                onException(response, new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE));
            }
        });
    }

    // Called with a successful response
    public abstract void callBack(RpcResponse<Object> result);

    // Called when the response is a failure or callBack itself throws
    public abstract void onException(RpcResponse<Object> result, Exception e);
}
```
AsyncCallBackExecutor is a thread pool that runs the asynchronous callbacks:
```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AsyncCallBackExecutor {
    private static final int worker = 4;

    // Holder class: the pool is created lazily, on the first call to execute()
    private static class ThreadPoolExecutorHolder {
        static {
            log.info("call back executor work count is " + worker);
        }

        private final static ThreadPoolExecutor callBackExecutor = new ThreadPoolExecutor(
                worker, worker, 2000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                ThreadPoolFactoryUtil.createThreadFactory("XRPC-Client-AsyncCallBackExecutor", false));
    }

    public static void execute(Runnable runnable) {
        ThreadPoolExecutorHolder.callBackExecutor.execute(runnable);
    }
}
```
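Putting the last three pieces together, the following self-contained sketch shows a latch-based future whose callback is handed to an executor, so the thread that completes the future (the Netty I/O thread in the real client) does not run user code itself. SimpleFuture, onComplete, and the millisecond-based get are hypothetical simplifications, not the RpcFuture API; unlike RpcFuture, this get throws on timeout instead of returning null.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class CallbackFutureSketch {
    /** Hypothetical, simplified counterpart of RpcFuture carrying a String result. */
    static final class SimpleFuture {
        private final CountDownLatch done = new CountDownLatch(1);
        private final Executor callbackExecutor;
        private volatile String result;
        private volatile Consumer<String> callback;

        SimpleFuture(Executor callbackExecutor) {
            this.callbackExecutor = callbackExecutor;
        }

        /** Must be called before complete(), just as the callback is set before the request is sent. */
        void onComplete(Consumer<String> callback) {
            this.callback = callback;
        }

        /** Called by the completing thread: releases waiters, then schedules the callback. */
        void complete(String value) {
            result = value;
            done.countDown();
            Consumer<String> cb = callback;
            if (cb != null) {
                callbackExecutor.execute(() -> cb.accept(value));
            }
        }

        boolean isDone() {
            return done.getCount() == 0;
        }

        /** Blocking get with a timeout in milliseconds; throws if no result arrives in time. */
        String get(long timeoutMillis) {
            try {
                if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("timed out after " + timeoutMillis + " ms");
                }
                return result;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            SimpleFuture future = new SimpleFuture(pool);
            CompletableFuture<String> callbackThread = new CompletableFuture<>();
            future.onComplete(v -> callbackThread.complete(Thread.currentThread().getName()));

            future.complete("pong"); // in the real client this happens when the response is decoded
            System.out.println(future.get(1000)); // prints "pong"
            boolean offThread = !callbackThread.join().equals(Thread.currentThread().getName());
            System.out.println("callback ran off the completing thread: " + offThread); // true
        } finally {
            pool.shutdown();
        }
    }
}
```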
The overall flow is shown in the figure below.