XRPC-实现轻量级 RPC 框架

介绍

为了更深入的学习 RPC 的原理与实现过程,从零实现了一个简易的可拓展 RPC 项目。

技术点包括:网络通信框架 Netty、长连接复用、TCP 粘包 / 拆包、心跳保活、服务注册与发现(Zookeeper、Nacos)、Java 基础(注解、反射、多线程、Future、SPI 、动态代理)、自定义传输协议、多种序列化(ProtoBuf / Kyro / Hessian)、Gzip 压缩、多种负载均衡算法(轮询、随机、一致性哈希)、客户端同步 / 异步调用,集成 SpringBoot 开箱即用

在学习过程中,我也将重点整理为了博客,如果觉得有用,请点个 star 吧!感谢!!

本人能力有限,如有错误和改进欢迎提交 PR

** 文章列表:**

零、如何实现一个轻量级 RPC 框架?

一、如何用 Netty 实现高性能网络通信?

二、网络传输高效序列化协议与实现

三、服务注册与发现

四、采用动态代理去无感调用远程服务

五、使用 SPI 实现可插拔扩展设计

六、去调用哪个服务器呢?负载均衡策略

七、集成 Spring 与 SpringBoot

🔨 实现要点

  • 基于 NIO 的 Netty 网络通讯,实现 Channel 复用、心跳保活
  • 支持 ProtoBuf、Kryo、Hessian2 序列化,反序列化,经测试 Kryo 效率最高,默认 Kyro
  • 支持 Gzip 压缩,可在配置文件配置是否启用包压缩,已经压缩算法,减少数据包的大小。
  • 支持 Zookeeper 和 Nacos 的服务注册发现,启动后将服务信息发布到注册中心,客户端发现并监听服务信息。
  • 客户端实现了基于轮询、随机和一致性哈希负载均衡算法,快速失败和重试的容错策略
  • 自定义 RpcFuture,客户端支持同步和异步调用,设置回调方法,返回调用响应后执行回调。
  • 基于 SPI 的模块化管理,更加方便扩展模块,集成 Spring 通过注解注册服务,SpringBoot 自动装载配置
  • 动态代理使用 Javassist 生成代码,直接调用
  • 支持 Eureka、Consul 等注册中心
  • 调用鉴权、服务监控中心
  • 编写更完整的测试

💻 项目目录

1
以下是重要的包的简介:

|- docs:博文 Markdown 源文件以及绘图 draw.io 文件

|- xrpc-client:RPC 客户端核心
|- async: 实现了 RpcFuture,完成同步、异步回调
|- config: SpringBoot 自动配置类
|- core: Netty 客户端核心逻辑,Channel 复用,心跳保活
|- faultTolerantInvoker: 容错策略
|- loadbalance: 负载均衡算法
|- proxy: 动态代理类, 实现无感调用

|- xrpc-common: RPC 抽取出来的通用模块
|- annotation:自定义的注解,例如 @RpcService(服务提供)、@RpcAutowired(服务引用)
|- codec: Netty 编解码、TCP 粘包、拆包
|- compress: 网络传输过程中的压缩算法
|- dto: 网络传输中的 RpcMessage,Request,Response
|- extension: 增强版 JDK SPI
|- proterties: SpringBoot 的配置 Config
|- registry: 注册中心,例如 Zookeeper、Nacos 注册中心
|- serializer: 序列化算法实现

|- xrpc-server: RPC 服务端核心
|- core: Netty 服务端逻辑,注册服务,接受请求
|- invoke: 反射调用请求的方法,实现了 jdk 和 cglib

|- xrpc-test-client: 样例 demo-客户端
|- xrpc-test-server: 样例 demo-服务端

1

🚀 主要特性

下面为使用 draw.io 绘制的图,源文件位于https://github.com/DongZhouGu/XRPC/blob/master/docs/images/rpc.drawio,可供参考

RPC 调用过程

Netty 服务端 pipline

传输协议

RPC-Client 逻辑

同步调用逻辑

异步调用逻辑

使用方式

  1. 克隆本项目到本地 Maven install。
  2. 添加 maven 依赖到你的SpringBoot项目中。
1
2
3
4
5
6
7
8
9
10
11
12
13
<!--  客户端      -->
<dependency>
<groupId>com.dzgu.xrpc</groupId>
<artifactId>xrpc-client</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>

<!-- 服务端 -->
<dependency>
<groupId>com.dzgu.xrpc</groupId>
<artifactId>xrpc-server</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
  1. 默认配置项在RpcConfig类中,可以通过application.properties来覆盖需要修改的配置项。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
xrpc:
# 是否启用rpc,默认启用
enable: true
# RPC服务端口
serverPort: 18866
# 注册中心,默认zookeeper
register: zookeeper
# 注册中心地址
registerAddress: 127.0.01:8848
# 序列化算没法,默认kryo
serializer: kryo
# 压缩算法,默认gzip
compress: gzip
# 负载均衡算法,默认random
load-balance: random
# 容错策略,默认retry
retry: retry
# 重试次数,只有容错策略是retry时才有效
retry-times: 3
  1. 启动注册中心

服务端

  1. 定义服务接口
1
2
3
public interface HelloService {
String hello(Hello hello);
}
  1. 实现服务接口,并通过@RpcService注册服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RpcService(value = HelloService.class, version = "1.0")
public class HelloServiceImp1 implements HelloService{
static {
System.out.println("HelloServiceImpl1被创建");
}

@Override
public String hello(Hello hello) {
log.info("HelloServiceImpl收到: {}.", hello.getMessage());
String result = "Hello description is " + hello.getDescription();
log.info("HelloServiceImpl返回: {}.", result);
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}

客户端

同步调用

  1. 使用 @RpcAutowired 注解调用远程服务
  2. 调用接口方法
1
2
3
4
5
6
7
8
9
10
11
12
public class HelloController {
@RpcAutowired(version = "1.0")
private HelloService helloService;

public void test() throws InterruptedException {
for (int i = 0; i < 1000; i++) {
System.out.println(i+"----sync:"+helloService.hello(new Hello("hello", "hello sync")));
Thread.sleep(1000);
}
}

}

异步调用

  1. 使用 @RpcAutowired 注解调用远程服务,并且将注解的 isAsync 置为 ture
  2. 调用接口方法,并立即为RpcContext 上下文设置回调函数(集成 ResponseCallback 抽象类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class HelloController {
@RpcAutowired(version = "1.0",isAsync = true)
private HelloService helloServiceAsync;

public void testAsync() throws InterruptedException {
for (int i = 0; i < 1000; i++) {
helloServiceAsync.hello(new Hello("hello", "hello async"));
RpcContext.setCallback(new ResponseCallback() {
@Override
public void callBack(RpcResponse<Object> result) {
System.out.println("----Async--requetId:"+ result.getRequestId()+"--data:"+result.getData());
}
@Override
public void onException(RpcResponse<Object> result, Exception e) {

}
});
Thread.sleep(1000);
}
}
}

☕ 鸣谢

感谢以下项目,我们从中得到了很大的帮助: