从零实现一个轻量级 RPC 框架-系列文章
Github: https://github.com/DongZhouGu/XRpc

前言

客户端在调用远程服务时,怎么知道服务端是否有要调用的服务呢,如果有,服务端的地址是什么呢?
因此,在 RPC 框架中,服务注册与发现是非常重要的一个部分。在 Provider 也就是服务端启动时,需要将自己的 IP 地址和 RPC 接口写到配置表中;Consumer 也就是客户端在请求远程服务时,会获取该服务的 IP 地址。这个配置表就叫做注册中心
image.png

注册中心的要求

1. 存储
可以简单地将注册中心理解为一个存储系统,存储着服务与服务提供方的映射表。一般注册中心对存储没有太多特别的要求,甚至夸张一点,你可以基于数据库来实现一个注册中心。
2. 高可用
注册中心一旦挂掉,Consumer 将无法获取 Provider 的地址,整个微服务将无法运转。因此
当然 Consumer 可以添加本地缓存,从某种角度上看,是允许注册中心短暂挂掉的。
3. 健康检查
Provider 向注册中心注册服务之后,注册中心需要定时向 Provider 发起健康检查,当 Provider 宕机的时候,注册中心能更快发现 ,从而将宕机的 Provider 从注册表中移除。
这特性数据库、Redis 都不具有,因此他们不适合做注册中心。
4. 监听状态
当服务增加、减少 Provider 的时候,注册中心除了能及时更新,还要能主动通知 Consumer,以便 Consumer 能快速更新本地缓存,减少错误请求的次数。
这一特性同样数据库、Redis 都不具有。
目前主流的注册中心有:Zookeeper、Eureka、Nacos、Consul 等。

一致性协议(CAP)

CAP 即:

  • Consistency(一致性)对于不同节点的请求,要么给出包含最新的修改响应、要么给出一个出错响应。
  • Availability(可用性)对于每个请求都会给出一个非错响应,有可能是旧数据
  • Partition tolerance(分区容忍性)系统中节点间出现网络分区时,系统仍然能够正常响应请求。

这三个性质对应了分布式系统的三个指标:
而 CAP 理论说的就是:一个分布式系统,不可能同时做到这三点。这是因为在分布式系统总,网络故障和节点宕机是常态,因此网络分区是一定会出现的

注册中心实现的差异

Spring Cloud 有四个注册中心:Eureka、Zookeeper、Nacos 以及 Consul。注册中心必需的功能,肯定要有服务注册、服务发现、健康检查、集群节点间的心跳机制等等,集群架构要么是平级结构、要么是主从结构。节点之间必须考虑服务的数据信息同步,而一旦是主从结构,就必须考虑主节点 leader 的选举等等功能。这几种注册中心的比较如下

比较项 *Eureka * *zookeeper * *Nacos * Consul
集群结构 平级 主从 支持平级和主从 主从
集群角色 主人 Leader、follower observer leader、follower、candidate server-leader、server 以及 client
是否可以及时知道服务状态变化 不能及时知道 会及时知道 不能及时知道 不能及时知道
一致性协议(CAP) 注重可用性(AP) 注重一致性(CP) 支持 CP 和 AP-如何实现 注重一致性(CP)
雪崩保护 没有 没有
社区是否活跃 Eureka2.0 不再维护了 持续维护 持续维护 持续维护
管理端 有现成的 eureka 管理端 没有现成的管理端 有现成的管理端 有现成的管理端
负载均衡策略 使用 ribbon 实现 一般可以直接采用 RPC 的负载均衡 权重/metadata/Selector Fabio
权限控制 使用 ACL 实现节点权限控制 RBAC-用户、角色、权限 ACL
Spring Cloud 集成 支持 支持 支持 支持
健康检查 Client Beat Keep Alive TCP/HTTP/MYSQL/Client Beat TCP/HTTP/gRPC/Cmd
自动注销实例 支持 支持 支持 不支持
访问协议 HTTP TCP HTTP/DNS HTTP/DNS
是否可用作配置中心
多数据中心 不支持 不支持 不支持 支持
跨注册中心同步 不支持 不支持 支持 支持
Dubbo 集成 不支持 支持 支持 不支持
K8S 集成 支持 支持 支持 支持

下面主要用 dubbo 支持的 zookeeper 和 Nacos 两种注册中心进行学习

Zookeeper

ZooKeeper 提供的名称空间与标准文件系统的名称空间非常相似。各个节点名称是由斜杠(/)分隔的一系列路径元素。ZooKeeper 命名空间中的每个 znode 节点都由路径标识。

就像拥有一个文件系统一样,该文件系统也允许文件成为目录。
需要注意的是:ZooKeeper 旨在存储协调数据:状态信息,配置,位置信息等,因此每个节点上存储的数据通常很小,在字节到千字节范围内。

Znode 节点

  • 持久节点:一旦创建,除非主动移除,否则会一直保存在 ZooKeeper。
  • 临时节点:生命周期和客户端会话绑定,会话失效,相关的临时节点被移除。
  • 持久顺序性:同时具备顺序性。
  • 临时顺序性:同时具备顺序性。

Znodes 维护一个统计数据结构,其中包括用于数据更改,ACL 更改和时间戳的版本号,以允许进行缓存验证和协调更新。

ACL(Access Control List):
记录 Znode 的访问权限列表,也就是说存储了哪些人可以访问本节点。
stat:
包含 Znode 的各种元数据,比如事务 ID、版本号、时间戳、大小等等。
child:
当前节点的子节点引用,类似于二叉树的孩子节点,当然不止我画的这么两个孩子节点。

watch 特性

zookeeper 的这种 watch 机制能够使得客户端能够实时感知到 zk 上它所需要调用的那个服务实例的新增、删除以及更新等操作,从而进行相应的服务熔断、回退后续等处理。当客户端调用了任何一种获取数据(包含 getData(),getChildren()和 exist())的操作,并将监听事件参数 watch 设置为 true,则当这个 Znode 发生改变,zookeeper 服务端就会发送变化通知到这个请求监听的的客户端。

集群架构

主从结构的,具体来说是一主多从结构,就是有一个 leader,多个 follower,以及只负责读操作、不参与选举的 observer

ZAB 协议保证一致性

ZAB 有两种基本模式:崩溃恢复和广播模式
崩溃恢复:
集群架构中是有一个 Leader 的,但是这个 leader 万一因为网络故障挂掉就会从 Follower 中投票选举一个新的 leader。当选举产生了新的 Leader,同时集群中有过半的机器与该 Leader 服务器完成了状态同步(即数据同步)之后,Zab 协议就会退出崩溃恢复模式,进入消息广播模式。

广播模式:

1)所有的事务请求必须由一个全局唯一的 leader 服务器来协调处理,leader 接收到写入数据请求(客户端发出写入数据请求给任意 Follower,Follower 将写入数据请求转发给 Leader)
2)进入广播提议的发起
3)转换成一个 事务 Proposal,并将该 Proposal 分发给集群中所有的 Follower 服务器,也就是向所有 Follower 节点发送数据广播请求
4)Follower 接到 Propose 消息,写入日志成功
5)返回 ACK 消息给 Leader
6)Leader 接到半数以上 ACK 消息,返回成功给客户端,并且广播 Commit 请求给 Follower;
7)follower 接收到消息,则提交事务。

XRPC 中的实现

注册中心工厂,用于获取注册中心

1
2
3
4
5
6
7
8
9
10
11
@SPI(value = "zookeeper")
public interface RegisterFactory {
/**
* 获取注册中心
*
* @param address 注册中心的地址。
* @return 如果协议类型跟注册中心匹配上了,返回对应的配置中心实例
*/
Register getRegister(String address);
}

注册中心的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface Register {
default void registerServiceMap(Map<String, Object> serviceMap, InetSocketAddress serverAddress) {
for (String rpcServiceName : serviceMap.keySet()) {
registerService(rpcServiceName, serverAddress);
}
}

/**
* 向注册中心注册服务
*/
void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress);

/**
* 取消所有本机的服务,用于关机的时候
*/
void unregisterAllMyService(InetSocketAddress inetSocketAddress);

/**
* 查找含有某个服务的所有服务端地址
*/
public List<String> lookupService(String serviceKey);

/**
* 关闭注册中心
*/
public void stop();
}

zookeeper 注册工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ZkRegistryFactory implements RegisterFactory {
private static final Map<String, ZkRegister> cache = new ConcurrentHashMap<>();

@Override
public Register getRegister(String address) {
if (cache.containsKey(address)) {
return cache.get(address);
}
ZkRegister zkRegister = new ZkRegister(address);
cache.putIfAbsent(address, zkRegister);
return cache.get(address);
}
}

zookeeper 注册中心功能实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ZkRegister implements Register {
private CuratorFramework zkClient;

public ZkRegister(String address) {
this.zkClient = CuratorUtils.getZkClient(address);
}

@Override
public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
CuratorUtils.createPersistentNode(zkClient, servicePath);
}

@Override
public void unregisterAllMyService(InetSocketAddress inetSocketAddress) {
CuratorUtils.clearRegistry(zkClient, inetSocketAddress);
zkClient.close();
}

@Override
public List<String> lookupService(String serviceKey) {
// 从注册中心 拿到该rpcService下的所有server的Address
List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, serviceKey);
if (CollUtil.isEmpty(serviceUrlList)) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, serviceKey);
}
return serviceUrlList;
}

@Override
public void stop() {
this.zkClient.close();
}
}

从注册中心拿到调用接口的服务地址时,还会走一层本地缓存,当注册中心挂的时候,本地还可以使用缓存中的数据。Zookeeper 还有一个很强的功能:监听。当监听的路径发生状态变化时,会全量更新对应的服务的本地缓存 SERVICE_ADDRESS_MAP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@Slf4j
public class CuratorUtils {
private static final int BASE_SLEEP_TIME = 1000;
private static final int MAX_RETRIES = 3;
public static final String ZK_REGISTER_ROOT_PATH = "/xrpc";
private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();
private static final Set<String> REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet();
private static CuratorFramework zkClient;

private CuratorUtils() {
}

/**
* Create persistent nodes. Unlike temporary nodes, persistent nodes are not removed when the client disconnects
*
* @param path node path
*/
public static void createPersistentNode(CuratorFramework zkClient, String path) {
try {
if (REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) {
log.info("The node already exists. The node is:[{}]", path);
} else {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
log.info("The node was created successfully. The node is:[{}]", path);
}
REGISTERED_PATH_SET.add(path);
} catch (Exception e) {
log.error("create persistent node for path [{}] fail", path);
}
}

/**
* Gets the children under a node
*
* @return All child nodes under the specified node
*/
public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) {
if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) {
return SERVICE_ADDRESS_MAP.get(rpcServiceName);
}
List<String> result = null;
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
try {
result = zkClient.getChildren().forPath(servicePath);
SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
registerWatcher(rpcServiceName, zkClient);
} catch (Exception e) {
log.error("get children nodes for path [{}] fail", servicePath);
}
return result;
}

/**
* Empty the registry of data
*/
public static void clearRegistry(CuratorFramework zkClient, InetSocketAddress inetSocketAddress) {
REGISTERED_PATH_SET.stream().parallel().forEach(p -> {
try {
if (p.endsWith(inetSocketAddress.toString())) {
zkClient.delete().forPath(p);
}
} catch (Exception e) {
log.error("clear registry for path [{}] fail", p);
}
});
log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString());
}

public static CuratorFramework getZkClient(String zookeeperAddress) {
// if zkClient has been started, return directly
if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) {
return zkClient;
}
buildZkClient(zookeeperAddress);
try {
// wait 30s until connect to the zookeeper
if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
throw new RuntimeException("Time out waiting to connect to ZK!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return zkClient;
}

/**
* 基于建造者模式的链式调用来创建客户端
*
* @param connectString 服务器列表,格式host1:port1,host2:port2,...
*/
public static void buildZkClient(String connectString) {
zkClient = CuratorFrameworkFactory.builder()
.connectString(connectString)
.retryPolicy(new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES))
.build();
zkClient.start();
}

/**
* Registers to listen for changes to the specified node
*
*/
private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception {
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start();
}
}

Nacos

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
public class NacosRegister implements Register {
private NamingService namingService;

public NacosRegister(String address) {
this.namingService = NacosUtils.getNacosClient(address);
}

@Override
public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
try {
NacosUtils.registerInstance(namingService, rpcServiceName, inetSocketAddress);
} catch (NacosException e) {
log.error("register service [{}] fail", rpcServiceName, e);
}
}

@Override
public void unregisterAllMyService(InetSocketAddress inetSocketAddress) {
NacosUtils.clearRegistry(namingService, inetSocketAddress);
}

@Override
public List<String> lookupService(String serviceKey) {
// 从注册中心 拿到该rpcService下的所有server的Address
List<String> serviceUrlList = null;
try {
serviceUrlList = NacosUtils.getAllInstance(namingService, serviceKey);
} catch (NacosException e) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, serviceKey);
}

return serviceUrlList;
}

@Override
public void stop() {
namingService=null;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@Slf4j
public class NacosUtils {
private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();
private static final Set<String> REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet();
private static NamingService namingService;

public static NamingService getNacosClient(String address) {
try {
namingService = NamingFactory.createNamingService(address);
} catch (NacosException e) {
log.error("connect to nacos [{}] fail", address);
}
return namingService;
}

/**
* 根据服务名称和地址注册服务
*
* @param rpcServiceName 服务名称
* @param address 服务地址
* @throws NacosException
*/
public static void registerInstance(NamingService namingService, String rpcServiceName, InetSocketAddress address) throws NacosException {
namingService.registerInstance(rpcServiceName, address.getHostName(), address.getPort());
REGISTERED_PATH_SET.add(rpcServiceName);
}

/**
* 根绝服务名称获取服务的所有实例
*
* @param serviceName 服务名称
* @return 服务实例集合
* @throws NacosException
*/
public static List<String> getAllInstance(NamingService namingService, String serviceName) throws NacosException {
if (SERVICE_ADDRESS_MAP.containsKey(serviceName)) {
return SERVICE_ADDRESS_MAP.get(serviceName);
}
List<Instance> allInstances = namingService.getAllInstances(serviceName);
List<String> addressList = new ArrayList<>();
for (Instance instance : allInstances) {
addressList.add(instance.getIp() + ":" + instance.getPort());
}
SERVICE_ADDRESS_MAP.put(serviceName, addressList);
registerWatcher(namingService, serviceName);
return addressList;
}

/**
* 根据服务地址清理 Nacos
*
* @param address 服务地址
*/
public static void clearRegistry(NamingService namingService, InetSocketAddress address) {
String host = address.getHostName();
int port = address.getPort();
REGISTERED_PATH_SET.stream().parallel().forEach(serviceName -> {
try {
namingService.deregisterInstance(serviceName, host, port);
} catch (NacosException e) {
log.error("clear registry for service [{}] fail", serviceName, e);
}
});
log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString());

}

/**
* 监听服务,更改时刷缓存
*/
@SneakyThrows
private static void registerWatcher(NamingService namingService, String serviceName) {
namingService.subscribe(serviceName, new EventListener() {
@SneakyThrows
@Override
public void onEvent(Event event) {
List<Instance> allInstances = namingService.getAllInstances(serviceName);
List<String> addressList = new ArrayList<>();
for (Instance instance : allInstances) {
addressList.add(instance.getIp() + ":" + instance.getPort());
}
SERVICE_ADDRESS_MAP.put(serviceName, addressList);
}
});

}
}