从零实现一个轻量级 RPC 框架-系列文章
Github: https://github.com/DongZhouGu/XRpc

前言

负载均衡在各个层级都有相应的应用。由于单机应用的性能局限性,在负载高的情况下,通常都会采用增加服务器的形式来横向扩展,通过集群和负载均衡来提高整个系统处理能力。
那么在 RPC 项目中,当服务端由集群组成,注册中心的一个 serviceKey 对应多个服务地址,那么该选择哪个进行远程调用呢?这就需要负载均衡算法。这里我们共实现了随机、轮询和一致性 Hash 三种算法

LoadBalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SPI(value = "random")
public interface LoadBalance {
String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest);

default String selectServiceAddress(List<String> serviceAddresses, RpcRequest rpcRequest) {
if (CollectionUtil.isEmpty(serviceAddresses)) {
return null;
}
if (serviceAddresses.size() == 1) {
return serviceAddresses.get(0);
}
return doSelect(serviceAddresses, rpcRequest);
}
}

使用 SPI 机制,加载配置的负载均衡策略
提供了三种负载均衡实现

  • RandomLoadBalance
  • FullRoundBalance
  • ConsistentHashLoadBalance

随机

字如其名,随机中服务节点中选择一个进行调用

1
2
3
4
5
6
7
public class RandomLoadBalance implements LoadBalance {
@Override
public String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
Random random = new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
}

轮询

按顺序进行访问

1
2
3
4
5
6
7
8
9
10
/**
* @description: 随机
*/
public class RandomLoadBalance implements LoadBalance {
@Override
public String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
Random random = new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
}

一致性哈希

  • 一致性 Hash,相同参数的请求总是发到同一提供者。
  • 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
  • 算法参见:http://en.wikipedia.org/wiki/Consistent_hashing

image.png
这里我们摘自Dubbo 博客

a、映射服务

将服务地址(ip+端口)按照一定规则构造出特定的识别码(如 md5 码),再用识别码对 2^32 取模,确定服务在 Hash 值区间对应的位置。假设有 Node1、Node2、Node3 三个服务,其映射关系如下:
image.png

b、映射请求、定位服务

在发起请求时,我们往往会带上参数,而这些参数,就可以被我们用来确定具体调用哪一个服务。假设有请求 R1、R2、R3,对它们的参数也经过计算特定识别码、取余的一系列运算之后,有如下映射关系:
image.png
从图中,我们可以看到,R1 请求映射在 0-Node1 中间,R2 请求映射在 Node1-Node2 中间,R3 请求映射在 Node2-Node3 中间。我们取服务 Hash 值大于请求 Hash 值第一个服务作为实际的调用服务。也就是说,R1 请求将调用 Node1 服务,R2 请求将调用 Node2 服务,R3 请求将调用 Node3 服务。

c、新增服务节点

假设新增服务 Node4,映射在 Node3 之前,恰巧破坏了原来的一个映射关系:
image.png
这样,请求 R3 将会实际调用服务 Node4,但请求 R1、R2 不受影响。

d、删除服务节点

假设服务 Node2 宕机,那么 R2 请求将会映射到 Node3:
image.png
原本的 R1、R3 请求不受影响。
可以看出,当新增、删除服务时,受影响的请求是有限的。不至于像简单取模映射一般,服务发生变化时,需要调整全局的映射关系

e、平衡性与虚拟节点

在我们上面的假设中,我们假设 Node1、Node2、Node3 三个服务在经过 Hash 映射后所分布的位置恰巧把环切成了均等的三分,请求的分布也基本是平衡的。但是实际上计算服务 Hash 值的时候,是很难这么巧的。也许一不小心就映射成了这个样子:
image.png
这样,就会导致大部分请求都会被映射到 Node1 上。因此,引出了虚拟节点。
所谓虚拟节点,就是除了对服务本身地址进行 Hash 映射外,还通过在它地址上做些处理(比如 Dubbo 中,在 ip+port 的字符串后加上计数符 1、2、3……,分别代表虚拟节点 1、2、3),以达到同一服务映射多个节点的目的。通过引入虚拟节点,我们可以把上图中映射给 Node1 的请求进一步拆分:
image.png
如上图所示,若有请求落在 Node3-Node1’区间,该请求应该是调用 Node1’服务,但是因为 Node1’是 Node1 的虚拟节点,所以实际调用的是 Node1 服务。通过引入虚拟节点,请求的分布就会比较平衡了

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
public class ConsistentHashLoadBalance implements LoadBalance {
private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();

@Override
public String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
// 用来识别Invoker列表是否发生变更的Hash码
int identityHashCode = System.identityHashCode(serviceAddresses);
String rpcServiceName = rpcRequest.getMethodName();
ConsistentHashSelector selector = selectors.get(rpcServiceName);
// 若不存在"接口.方法名"对应的选择器,或是Invoker列表已经发生了变更,则初始化一个选择器
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
selector = selectors.get(rpcServiceName);
}
return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));
}
}

这里有个很重要的概念:选择器——selector。这是 Dubbo 一致性 Hash 实现中,承载着整个映射关系的数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static class ConsistentHashSelector {
// 存储Hash值与节点映射关系的TreeMap
private final TreeMap<Long, String> virtualInvokers;
// 用来识别Invoker列表是否发生变更的Hash码
private final int identityHashCode;

ConsistentHashSelector(List<String> invokers, int replicaNumber, int identityHashCode) {
this.virtualInvokers = new TreeMap<>();
this.identityHashCode = identityHashCode;
// 对每个invoker生成replicaNumber个虚拟结点,并存放于TreeMap中
for (String invoker : invokers) {
for (int i = 0; i < replicaNumber / 4; i++) {
// 根据md5算法为每4个结点生成一个消息摘要,摘要长为16字节128位。
byte[] digest = md5(invoker + i);
// 随后将128位分为4部分,0-31,32-63,64-95,95-128,并生成4个32位数,存于long中,long的高32位都为0
// 并作为虚拟结点的key。
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
}

在新建 ConsistentHashSelector 对象的时候,就会遍历所有 Invoker 对象,然后计算出其地址(ip+port)对应的 md5 码,并按照配置的节点数目 replicaNumber 的值来初始化服务节点和所有虚拟节点。

这里值得注意的是:以 replicaNumber 取默认值 160 为例,假设当前遍历到的 Invoker 地址为 127.0.0.1:20880,它会依次获得“127.0.0.1:208800”、“127.0.0.1:208801”、……、“127.0.0.1:2088040”的 md5 摘要,在每次获得摘要之后,还会对该摘要进行四次数位级别的散列。大致可以猜到其目的应该是为了加强散列效果。(希望有人能告诉我相关的理论依据。)

如果找到对应的 selector,则会调用 selector 的 select 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 选择节点
*/
public String select(String rpcServiceKey) {
//生成消息摘要
byte[] digest = md5(rpcServiceKey);
//调用hash(digest, 0),将消息摘要转换为hashCode,这里仅取0-31位来生成HashCode
return selectForKey(hash(digest, 0));
}


/**
* 根据hashCode选择结点
*/
public String selectForKey(long hashCode) {
// 1、先找当前key对应的entity,若不存在,走2
// 2、找环上hash比key大,且最近的 entry
// 3、若无则返回null
Map.Entry<Long, String> entry = virtualInvokers.ceilingEntry(hashCode);
if (entry == null) {
// 若找不到,则直接返回环上第一个entry
entry = virtualInvokers.firstEntry();
}
// 返回具体invoker
return entry.getValue();
}