Implementing a Lightweight RPC Framework from Scratch: Article Series
Github: https://github.com/DongZhouGu/XRpc

Preface

When a client needs to call a remote service, we want the whole process to be transparent to the user: a developer using the framework should invoke a remote service exactly as if it were a local one. To achieve this we use a dynamic proxy to enhance the interface methods. When an interface method is called, the framework transparently substitutes a proxy object that encapsulates the complex work of network communication, encoding and decoding, and so on. This chapter explains how that is implemented.
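
Before diving into the framework code, here is a minimal, generic illustration of the JDK dynamic proxy mechanism this chapter relies on (not XRpc code; the Greeter interface is made up purely for this example). Every call on the proxy object is routed into a single invoke method, which is exactly where an RPC framework can hide serialization and network I/O:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyPrimer {
    // A made-up business interface, standing in for a remote service contract.
    interface Greeter {
        String greet(String name);
    }

    public static void main(String[] args) {
        Greeter greeter = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] methodArgs) {
                        // A real RPC client would build a request here, send it over the
                        // network, and block (or register a callback) for the response.
                        return "intercepted call to " + method.getName();
                    }
                });
        // Prints "intercepted call to greet"
        System.out.println(greeter.greet("world"));
    }
}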

Invocation Flow and Encapsulation

[Figure: invocation flow diagram]
The figure above shows the flow of a complete invocation. The dynamic proxy part is the core code of the XRPC-Client module.

The Dynamic Proxy Factory

First, we abstract a dynamic proxy factory class, encapsulated as ProxyFactory.

@Setter
@Accessors(chain = true)
@Slf4j
public class ProxyFactory {
    private Register register;
    private NettyClient nettyClient;
    private LoadBalance loadBalance;

    private FaultTolerantInvoker faultTolerantInvoker;
    private int retryTime = 3;
    private String compress;
    private String serializer;

    private Map<String, Object> objectCache = new HashMap<>();

    /**
     * Get the dynamic proxy for the requested service interface.
     */
    public <T> T getProxy(Class<T> interfaceClass, String version) {
        return (T) objectCache.computeIfAbsent(interfaceClass.getName() + version, clz ->
                Proxy.newProxyInstance(
                        interfaceClass.getClassLoader(),
                        new Class<?>[]{interfaceClass},
                        new ObjectProxy<T>(interfaceClass, version)
                )
        );
    }

    private class ObjectProxy<T> implements InvocationHandler {
        private Class<T> clazz;
        private String version;

        public ObjectProxy(Class<T> clazz, String version) {
            this.clazz = clazz;
            this.version = version;
        }

        /**
         * The client's main logic: build and send the request, and bind the response to its request.
         */
        @SneakyThrows
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            log.info("client invoked method: [{}]", method.getName());
            // Build the request object
            RpcRequest rpcRequest = RpcRequest.builder()
                    .methodName(method.getName())
                    .parameters(args)
                    .parameterTypes(method.getParameterTypes())
                    .className(method.getDeclaringClass().getName())
                    .requestId(UUID.randomUUID().toString())
                    .version(version)
                    .build();
            String rpcServiceName = rpcRequest.getClassName();
            String version = rpcRequest.getVersion();
            String serviceKey = ServiceUtil.makeServiceKey(rpcServiceName, version);
            // Look up all server addresses registered for this rpcService
            List<String> serviceUrlList = register.lookupService(serviceKey);
            // Load balancing
            String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
            log.info("Successfully found the com.dzgu.xprc.service address:[{}]", targetServiceUrl);
            // Wrap the request into an RpcMessage
            RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest)
                    .codec(SerializerTypeEnum.getCode(serializer))
                    .compress(CompressTypeEnum.getCode(compress))
                    .requestId(REQUEST_ID.getAndIncrement())
                    .messageType(RpcConstants.REQUEST_TYPE).build();
            // Send the request to the server via Netty, under the configured fault-tolerance strategy
            RpcResponse<Object> rpcResponse = null;
            if (faultTolerantInvoker instanceof RetryInvoker) {
                RetryInvoker.DEFAULT_RETRY_TIMES = retryTime;
            }
            rpcResponse = faultTolerantInvoker.doinvoke(nettyClient, rpcMessage, targetServiceUrl);
            this.check(rpcResponse, rpcRequest);
            return rpcResponse.getData();
        }

        private void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {
            if (rpcResponse == null) {
                throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, "interfaceName" + ":" + rpcRequest.getMethodName());
            }

            if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
                throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, "interfaceName" + ":" + rpcRequest.getMethodName());
            }

            if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
                throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, "interfaceName" + ":" + rpcRequest.getMethodName());
            }
        }
    }
}
  1. getProxy returns the dynamic proxy ObjectProxy for the requested interface (see the usage sketch after this list).
  2. When an interface method is called, the invoke method of the InvocationHandler implementation carries out the call logic:
    1. Build the RpcRequest object used for network communication.
    2. Get the network addresses of the target service from the registry (or its local cache).
    3. Select one address with the load-balancing strategy.
    4. Send the request to the server over Netty, under the configured fault-tolerance strategy.
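
For reference, wiring the factory by hand and calling the proxy would look roughly like the sketch below. In the actual project the factory fields are presumably populated by the Spring integration rather than manually, and the serializer/compressor names "kryo" and "gzip" are only assumed here:

// Hand-wired sketch; register, nettyClient and loadBalance are assumed to be
// already-constructed instances of the types used by ProxyFactory.
public static HelloService buildHelloServiceProxy(Register register,
                                                  NettyClient nettyClient,
                                                  LoadBalance loadBalance) {
    ProxyFactory proxyFactory = new ProxyFactory()
            .setRegister(register)
            .setNettyClient(nettyClient)
            .setLoadBalance(loadBalance)
            .setFaultTolerantInvoker(new RetryInvoker())
            .setRetryTime(3)
            .setSerializer("kryo")      // assumed serializer name
            .setCompress("gzip");       // assumed compressor name
    // The returned proxy is used exactly like a local bean:
    // helloService.hello(new Hello("hello", "hello rpc"));
    return proxyFactory.getProxy(HelloService.class, "1.0");
}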

Fault-Tolerance Strategies

Two simple fault-tolerance strategies are implemented, fail-fast and retry; both are shown below.
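
Both strategies implement a common single-method interface, FaultTolerantInvoker. Its definition is not reproduced in this article; inferred from the two implementations below, it presumably looks like this:

public interface FaultTolerantInvoker {
    RpcResponse<Object> doinvoke(NettyClient nettyClient, RpcMessage rpcMessage, String targetServiceUrl);
}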

@Slf4j
public class RetryInvoker implements FaultTolerantInvoker {
    /**
     * Default number of retries
     */
    public static int DEFAULT_RETRY_TIMES = 3;

    @Override
    public RpcResponse<Object> doinvoke(NettyClient nettyClient, RpcMessage rpcMessage, String targetServiceUrl) {
        for (int i = 0; i < DEFAULT_RETRY_TIMES; i++) {
            try {
                RpcResponse<Object> result = nettyClient.sendRequest(rpcMessage, targetServiceUrl);
                if (result != null) {
                    return result;
                }
            } catch (RpcException ex) {
                log.error("invoke error. retry times=" + i, ex);
            }
        }
        throw new RpcException(SERVICE_INVOCATION_FAILURE);
    }
}

public class FailFastInvoker implements FaultTolerantInvoker {
    @Override
    public RpcResponse<Object> doinvoke(NettyClient nettyClient, RpcMessage rpcMessage, String targetServiceUrl) {
        return nettyClient.sendRequest(rpcMessage, targetServiceUrl);
    }
}
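
Which strategy (and how many retries) applies is simply a matter of what is configured on the ProxyFactory, for example:

// Retry up to 5 times; ObjectProxy.invoke copies retryTime into RetryInvoker.DEFAULT_RETRY_TIMES.
proxyFactory.setFaultTolerantInvoker(new RetryInvoker()).setRetryTime(5);

// Or fail immediately on the first error.
proxyFactory.setFaultTolerantInvoker(new FailFastInvoker());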

Within the fault-tolerance strategies, the actual work is sending the request message over Netty, implemented in NettyClient.sendRequest:

public RpcResponse<Object> sendRequest(RpcMessage rpcMessage, String targetServiceUrl) {
    String[] socketAddressArray = targetServiceUrl.split(":");
    String host = socketAddressArray[0];
    int port = Integer.parseInt(socketAddressArray[1]);
    InetSocketAddress remoteaddress = new InetSocketAddress(host, port);
    // Build the future that will carry the result back to the caller
    CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
    // Channel reuse: fetch a previously established (or reconnected) Netty Channel
    Channel channel = getChannel(remoteaddress);
    RpcResponse<Object> rpcResponse = null;
    try {
        // Put the request into the map of pending requests: the key is the unique
        // request ID, the value is the future used for the asynchronous completion
        pendingRpcRequests.put(((RpcRequest) rpcMessage.getData()).getRequestId(), resultFuture);
        // Send the request
        channel.writeAndFlush(rpcMessage).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    log.info("client send message: [{}]", rpcMessage);
                } else {
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                    log.error("Send failed:", future.cause());
                }
            }
        });
        // Block waiting for the result; when the Netty client receives the matching
        // response it calls future.complete(response) to finish the exchange
        // TODO asynchronous invocation
        rpcResponse = resultFuture.get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
        log.error("send request error: " + e.getMessage());
        throw new RpcException("send request error:", e);
    } finally {
        pendingRpcRequests.remove(((RpcRequest) rpcMessage.getData()).getRequestId());
    }
    return rpcResponse;
}

Invocation Modes

Synchronous invocation is implemented here with the JDK's CompletableFuture.
[Figure: synchronous invocation based on CompletableFuture]
Note that future.get still blocks the calling thread until the response arrives.
A callback could be used instead to achieve truly asynchronous invocation; this was not implemented at first and is added in the update below.
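
For completeness: on the receiving side, the client's Netty inbound handler looks the pending future up by requestId and completes it, which is what unblocks future.get above. XRpc's actual handler is not shown in this article, so the class below is only a sketch of that idea (the class name and constructor are assumptions, and it assumes only response messages reach this handler):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class RpcResponseHandler extends SimpleChannelInboundHandler<RpcMessage> {
    // The same pending-request map that sendRequest writes into.
    private final Map<String, CompletableFuture<RpcResponse<Object>>> pendingRpcRequests;

    public RpcResponseHandler(Map<String, CompletableFuture<RpcResponse<Object>>> pendingRpcRequests) {
        this.pendingRpcRequests = pendingRpcRequests;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected void channelRead0(ChannelHandlerContext ctx, RpcMessage msg) {
        RpcResponse<Object> response = (RpcResponse<Object>) msg.getData();
        // Bind the response back to its request by requestId and wake up the waiting caller.
        CompletableFuture<RpcResponse<Object>> future = pendingRpcRequests.remove(response.getRequestId());
        if (future != null) {
            future.complete(response);
        }
    }
}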

Update: Asynchronous Invocation

We can define our own RpcFuture and invoke a user-supplied callback when it is completed; that gives us asynchronous invocation. When the remote method is executed, an empty response is returned immediately, and once the result from the server arrives, the callback function is invoked, producing the asynchronous effect. The idea is simple, but one question remains: how do we hand the RpcFuture to the service caller so that the user can attach the callback logic themselves?
Following Dubbo's approach, we extract a simple RpcContext class holding a static localCallback variable of type ThreadLocal. Each thread therefore has its own private ResponseCallback object; as long as the callback is set and the call is issued from the same thread, the problem is solved.

public class RpcContext {
    private static ThreadLocal<ResponseCallback> localCallback = new ThreadLocal<>();

    public static void setCallback(ResponseCallback callback) {
        localCallback.set(callback);
    }

    public static ResponseCallback getCallback() {
        return localCallback.get();
    }
}

Concretely, let's look at an example:

@RpcAutowired(version = "1.0", isAsync = true)
private HelloService helloServiceAsync;


// Register the callback for the current thread before issuing the call,
// because sendAsyncRequest reads it from RpcContext at send time.
RpcContext.setCallback(new ResponseCallback() {
    @Override
    public void callBack(RpcResponse<Object> result) {
        System.out.println("----Async--requestId:" + result.getRequestId() + "--data:" + result.getData());
    }

    @Override
    public void onException(RpcResponse<Object> result, Exception e) {
    }
});
helloServiceAsync.hello(new Hello("hello", "hello async"));

private RpcResponse<Object> sendAsyncRequest(Channel channel, RpcFuture resultFuture, RpcMessage rpcMessage) {
    RpcResponse<Object> rpcResponse = null;
    String requestId = ((RpcRequest) rpcMessage.getData()).getRequestId();
    ResponseCallback callback = RpcContext.getCallback();
    resultFuture.setResponseCallback(callback);
    try {
        pendingRpcRequests.put(requestId, resultFuture);
        // Send the request
        channel.writeAndFlush(rpcMessage).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    log.info("client send message: [{}]", rpcMessage);
                } else {
                    future.channel().close();
                    log.error("Send failed:", future.cause());
                }
            }
        });
        // Return an empty response body immediately; the real result arrives via the callback
        rpcResponse = RpcResponse.success(null, requestId);
    } catch (Exception e) {
        pendingRpcRequests.remove(requestId);
        log.error("send request error: " + e.getMessage());
        throw new RpcException("send request error:", e);
    }
    return rpcResponse;
}
  • First, an isAsync field is added to the @RpcAutowired annotation to mark a reference as asynchronous (a sketch of the annotation follows this list).
  • RpcContext is a ThreadLocal-based holder of transient per-call state. When invoking the service, we attach to the thread-local variable a callback object extending the abstract class ResponseCallback, implementing callBack and onException.
  • When the message is sent over Netty, the callback is read from the context with ResponseCallback callback = RpcContext.getCallback(); and attached to this request's rpcFuture with resultFuture.setResponseCallback(callback);.
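
The annotation itself is not listed in this article; based on its usage above, it presumably looks like this (a sketch, the authoritative definition lives in the XRpc repository):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAutowired {
    String version() default "1.0";

    boolean isAsync() default false;
}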

Previously we used CompletableFuture to bind a response to its request; now we implement our own RpcFuture.

public class RpcFuture implements Future<RpcResponse<Object>> {
    private RpcResponse<Object> response;

    private ResponseCallback responseCallback;
    private CountDownLatch countDownLatch;

    public RpcFuture() {
        countDownLatch = new CountDownLatch(1);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        // Done once complete() has released the latch
        return countDownLatch.getCount() == 0;
    }

    /**
     * Block until the result is available
     */
    @Override
    public RpcResponse<Object> get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return response;
    }

    @Override
    public RpcResponse<Object> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (countDownLatch.await(timeout, unit)) {
            return response;
        }
        return null;
    }

    public void complete(RpcResponse<Object> response) {
        this.response = response;
        countDownLatch.countDown();
        if (responseCallback != null) {
            responseCallback.success(response);
        }
    }

    public void setResponseCallback(ResponseCallback responseCallback) {
        this.responseCallback = responseCallback;
    }
}

Here a CountDownLatch provides the blocking get behaviour that CompletableFuture gave us before.
At the same time, the complete method triggers the asynchronous callback logic.
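
As a small, self-contained illustration of the two paths (hypothetical code, assuming RpcResponse.success(data, requestId) is the same factory method used in sendAsyncRequest above):

import java.util.concurrent.TimeUnit;

public class RpcFutureDemo {
    public static void main(String[] args) throws Exception {
        RpcFuture future = new RpcFuture();

        // Simulate the Netty handler delivering the matching response on another thread.
        new Thread(() -> future.complete(RpcResponse.success(null, "req-1"))).start();

        // Synchronous path: get() blocks until complete() releases the latch.
        // If a ResponseCallback had been attached via setResponseCallback, complete()
        // would also hand it over to the AsyncCallBackExecutor at that moment.
        RpcResponse<Object> result = future.get(10, TimeUnit.SECONDS);
        System.out.println("requestId = " + result.getRequestId());
    }
}

The ResponseCallback that complete() invokes is defined as follows: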

@Slf4j
public abstract class ResponseCallback {

    public void success(RpcResponse<Object> response) {
        AsyncCallBackExecutor.execute(() -> {
            log.debug("AsyncReceiveHandler success, response:{}", response);
            if (response.getCode() != null && response.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
                try {
                    callBack(response);
                } catch (Exception e) {
                    onException(response, e);
                }
            } else {
                onException(response, new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE));
            }
        });
    }

    /**
     * Override this method with the business logic to run once the result arrives asynchronously.
     *
     * @param result the response returned by the server
     */
    public abstract void callBack(RpcResponse<Object> result);

    /**
     * Exceptions can be handled inside callBack itself, or this method can be
     * overridden as a catch-all handler.
     *
     * @param result the response (possibly unsuccessful)
     * @param e      the exception that occurred
     */
    public abstract void onException(RpcResponse<Object> result, Exception e);
}

AsyncCallBackExecutor is the thread pool that executes the asynchronous callbacks:

@Slf4j
public class AsyncCallBackExecutor {
    private static final int worker = 4;

    private static class ThreadPoolExecutorHolder {
        static {
            log.info("call back executor work count is " + worker);
        }

        private final static ThreadPoolExecutor callBackExecutor = new ThreadPoolExecutor(
                worker, worker, 2000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                ThreadPoolFactoryUtil.createThreadFactory("XRPC-Client-AsyncCallBackExecutor", false));
    }

    public static void execute(Runnable runnable) {
        ThreadPoolExecutorHolder.callBackExecutor.execute(runnable);
    }
}

The overall flow is shown below.
[Figure: overall client invocation flow diagram]