从零实现一个轻量级 RPC 框架-系列文章 Github: https://github.com/DongZhouGu/XRpc
前言 既然要调用远程方法,必然需要网络通信,通过网络来传递要调用的目标类信息及相关方法参数,和返回的调用结果。 网络传输具体实现可以使用 Socket 、NIO、Netty:
Socket:Java 中最原始、最基础的网络通信方式。但是 Socket 是阻塞 IO、性能低并且功能单一
NIO:同步非阻塞的 I/O 模型,Java 原生实现,但是用它来进行网络编程太过繁琐
Netty:基于 NIO 的 client-server(客户端服务器)框架,设计了一套优秀的 Reactor 反应器模式使用它可以快速简单地开发网络应用程序。极大地简化并简化了 TCP 和 UDP 套接字服务器等网络编程, 并且性能以及安全性等很多方面甚至都要更好。支持多种协议如 FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。
Reactor 反应器模式 Reactor 就是基于 NIO 中实现多路复用的一种模式,
单 Reactor 单线程模型
服务端的 Reactor 是一个线程,该线程会启动事件循环,并使用 Selector 实现 IO 多路复用,通过 acceptor 来获取并注册新的连接
客户端发起连接请求,Reactor 监听到了这个时间,并分发给对应的 acceptor 去处理,acceptor 负责建立到这个客户端的 SocketChannel,Reactor 使用 Selector 系统调用进行事件监听
当客户端有 IO 读写事件时,则分发给对应的 handler 进行处理
单线程 Reactor 模式中,不仅 I/O 操作在该 Reactor 线程上,连非 I/O 的业务操作也在该线程上进行处理了,这可能会大大延迟 I/O 请求的响应。所以我们应该将非 I/O 的业务逻辑操作从 Reactor 线程上卸载,以此来加速 Reactor 线程对 I/O 请求的响应。
单 Reactor 多线程模型 与单线程 Reactor 模式不同的是,添加了一个工作者线程池,并将非 I/O 操作从 Reactor 线程中移出转交给工作者线程池来执行。这样能够提高 Reactor 线程的 I/O 响应,不至于因为一些耗时的业务逻辑而延迟对后面 I/O 请求的处理。
多 Reactor 多线程模型 多 Reactor 线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个 Reactor 线程来完成。mainReactor 完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给 subReactor 线程来完成与客户端的通信,这样一来就不会因为 read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多 Reactor 线程模式在海量的客户端并发请求的情况下,还可以通过实现 subReactor 线程池来将海量的连接分发给多个 subReactor 线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。
使用 Netty 来实现 XPRC 服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 @Slf4j public class NettyServer { private Thread thread; private InetSocketAddress serverAddress = getServerAddress(); public void start () { thread = new Thread(new Runnable() { DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup( RuntimeUtil.cpus() * 2 , ThreadPoolFactoryUtil.createThreadFactory("service-handler-group" , false )); @Override public void run () { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, true ) .childOption(ChannelOption.SO_KEEPALIVE, true ) .option(ChannelOption.SO_BACKLOG, 128 ) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(15 , 0 , 0 , TimeUnit.SECONDS)); ch.pipeline().addLast(new Spliter()); ch.pipeline().addLast(new RpcDecoder()); ch.pipeline().addLast(new RpcEncoder()); ch.pipeline().addLast(serviceHandlerGroup, new NettyServerHandler()); } }); ChannelFuture f = bootstrap.bind(serverAddress.getAddress(), serverAddress.getPort()).sync(); log.info("Netty Server started on address {}" , serverAddress); f.channel().closeFuture().sync(); } catch (Exception e) { if (e instanceof InterruptedException) { log.info("Rpc server remoting server stop" ); } else { log.error("Rpc server remoting server error" , e); } } finally { try { serviceRegistry.unregisterService(serverAddress); workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } catch (Exception ex) { log.error(ex.getMessage(), ex); } } } }); thread.start(); } public void stop () { if (thread != null && thread.isAlive()) { thread.interrupt(); } } }
NioEventLoop Netty 的反应器类为:NioEventLoop,首先看到,我们创建了两个 NioEventLoopGroup,第一个通 常被称为“包工头”,负责服务器通道新连接的 IO 事件的监听。第二个 通常被称为“工人”,主要负责传输通道的 IO 事件的处理。具体来说,一种类型的 reactor 线程是 boss 线程组,专门用来接受新的连接,然后封装成 channel 对象扔给 worker 线程组;还有一种类型的 reactor 线程是 worker 线程组,专门用来处理连接的读写。不管是 boos 线程还是 worker 线程,所做的事情均分为以下三个步骤
轮询注册在 selector 上的 IO 事件
处理 IO 事件
执行异步 task
对于 boos 线程来说,第一步轮询出来的基本都是 accept 事件,表示有新的连接,而 worker 线程轮询出来的基本都是 read/write 事件,表示网络的读写事件。
ServerBootstrap 服务启动类 Bootstrap 类是 Netty 提供的一个便利的工厂类,可以通过它来完成 Netty 的客户端或服务器端的 Netty 组件的组装,以及 Netty 程序的初始 化。当然,Netty 的官方解释是,完全可以不用这个 Bootstrap 启动器。 但是,一点点去手动创建通道、完成各种设置和启动、并且注册到 EventLoop,这个过程会非常麻烦。通常情况下,还是使用这个便利的 Bootstrap 工具类会效率更高。
创建反应器线程组,并赋值给 ServerBootstrap 启动器实例
在服务器端,建议设置成两个线程组的工作模式。
设置通道的 IO 类型
Netty 不止支持 Java NIO,也支持阻塞式的 OIO(也叫 BIO,BlockIO,即阻塞式 IO)由于 NIO 的优势巨大,通 常不会在 Netty 中使用 BIO。 在 Netty 中,将有接收关系的 NioServerSocketChannel 和 NioSocketChannel,叫作父子通道。其中,NioServerSocketChannel 负 责服务器连接监听和接收,也叫父通道(Parent Channel)。对应于每 一个接收到的 NioSocketChannel 传输类通道,也叫子通道(Child Channel)。服务端使用 b.channel(NioServerSocketChannel.class)来监听
设置传输通道的配置选项
调用了 Bootstrap 的 option()选项设置方法。对于服务器的 Bootstrap 而言,这个方法的作用是:给父通道(Parent Channel)设置一些与传输协议相关的选项。如果要给子通道(Child Channel)设置一些通道选项,则需要调用 childOption()设置方法。具体的 channelOption(): https://juejin.cn/post/6982470261811445791
装配子通道的 Pipeline 流水线
每一个通道的子通道,都用一条 ChannelPipeline 流水线。它的内部有一个双向的链表。装配流水线的方式是:将业务 处理器 ChannelHandler 实例加入双向链表中。 装配子通道的 Handler 流水线调用 childHandler()方法,传递一个 ChannelInitializer 通道初始化类的实例。 在父通道成功接收一个连接, 并创建成功一个子通道后,就会初始化子通道,这里配置的 ChannelInitializer 实例就会被调用。 在 ChannelInitializer 通道初始化类的实例中,有一个 initChannel 初 始化方法,在子通道创建后会被执行到,向子通道流水线增加业务处理器。
1 2 3 4 5 6 7 8 9 10 11 12 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(15 , 0 , 0 , TimeUnit.SECONDS)); ch.pipeline().addLast(new Spliter()); ch.pipeline().addLast(new RpcDecoder()); ch.pipeline().addLast(new RpcEncoder()); ch.pipeline().addLast(serviceHandlerGroup, new NettyServerHandler()); } });
为什么仅装配子通道的流水线,而不需要装配父通道的流水线 呢?原因是:父通道也就是 NioServerSocketChannel 连接接受通道,它 的内部业务处理是固定的:接受新连接后,创建子通道,然后初始化 子通道,所以不需要特别的配置。
开始绑定服务器新连接的监听端口
1 2 3 4 ChannelFuture f = bootstrap.bind(serverAddress.getAddress(), serverAddress.getPort()).sync(); log.info("Netty Server started on address {}" , serverAddress);
bootstrap.bind()方法的功能:返回一个端口绑定 Netty 的异 步任务 channelFuture。在这里,并没有给 channelFuture 异步任务增加回 调监听器,而是阻塞 channelFuture 异步任务,直到端口绑定任务执行 完成。
自我阻塞,直到通道关闭
1 2 f.channel().closeFuture().sync();
关闭 EventLoopGroup
关闭 Reactor 反应器线程组,同时会关闭内部的 subReactor 子反应 器线程,也会关闭内部的 Selector 选择器、内部的轮询线程以及负责查 询的所有的子通道。在子通道关闭后,会释放掉底层的资源,如 TCP Socket 文件描述符等。
至此,Netty 服务端已搭建完成,其中,最为重要的是装配到Pipeline 流水线中 handler,下面我们具体介绍。
Pipline 流水线 每条通道内部都有一条流水线 pipline 来讲 Handler 装配起来来处理业务。Netty 的业务处理器流水线 ChannelPipeline 是基于*责任链设计模式 * 来设计的,内部是一个双向链表结构,能够 支持动态地添加和删除 Handler 业务处理器。 Handler 涉及的环节有:数据包解码、业务处理、目标数据编码、数据包写入通道这几个部分,那么他们在 pipline 中的添加顺序是怎样的呢? 首先,Handler 有入站和出 站两种类型操作
入站处理,触发的方向为:自底向上,Netty 的内部(如通道)到 ChannelInboundHandler 入站处理器。
出站处理,触发的方向为:自顶向下,从 ChannelOutboundHandler 出站处理器到 Netty 的内部(如通道)。
按照这种方向来分,前面数据包解码、业务处理两个环节——属 于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个 环节——属于出站处理器的工作。 入站处理器的流动次序是:从前到后。加在前面的, 执行也在前面;出站流水处理次序为从后向前,最后加入的出 站处理器,反而执行在最前面。这一点和 Inbound 入站处理次序是相反的。需要注意的点
1 2 3 4 5 6 ch.pipeline().addLast(new InBoundHandlerA()); ch.pipeline().addLast(new OutboundHandlerA()); ch.pipeline().addLast(new InBoundHandlerB()); ch.pipeline().addLast(new OutboundHandlerB()); ch.pipeline().addLast(new InBoundHandlerC()); ch.pipeline().addLast(new OutboundHandlerC());
针对 InBoundHandlerC,处理完消息发送时,
当调用 ctx.writeAndFlush(new Object())时代表 Object 从当前的 handler 流向 head 节点
当调用 ctx.channel().writeAndFlush(new Object())时代表 Object 从 tail 节点流向 head 节点。
针对 RPC 框架的 Netty Server 的 pipline 来说,执行顺序为
心跳-空闲检测 网络应用程序普遍会出现连接假死,连接假死的现象是:在某一端(服务端或者客户端)看来,底层的 TCP 连接已经断开了,但是应用程序并没有捕获到,因此会认为这条连接仍然是存在的,从 TCP 层面来说,只有收到四次握手数据包或者一个 RST 数据包,连接的状态才表示已断开。 连接假死会带来以下两大问题
对于服务端来说,因为每条连接都会耗费 cpu 和内存资源,大量假死的连接会逐渐耗光服务器的资源,最终导致性能逐渐下降,程序奔溃。
对于客户端来说,连接假死会造成发送数据超时,影响用户体验。
通常,连接假死由以下几个原因造成的
应用程序出现线程堵塞,无法进行数据的读写。
客户端或者服务端网络相关的设备出现故障,比如网卡,机房故障。
公网丢包。公网环境相对内网而言,非常容易出现丢包,网络抖动等现象,如果在一段时间内用户接入的网络连续出现丢包现象,那么对客户端来说数据一直发送不出去,而服务端也是一直收不到客户端来的数据,连接就一直耗着。
我们分别从客户端和服务端来解决这个问题服务端 利用 Netty 自带的 IdleStateHandler 实现空闲检测,服务端只需要检测一段时间内,是否收到过客户端发来的数据即可,
1 2 3 4 public IdleStateHandler (long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {}
第一个参数是隔多久检查一下读事件是否发生,如果* channelRead() 方法超过 readerIdleTime 时间未被调用则会触发一个 READER_IDLE *的 IdleStateEvent 事件;
第二个参数是隔多久检查一下写事件是否发生,writerIdleTime 写空闲超时时间设定,如果 write() 方法超过 writerIdleTime 时间未被调用则会WRITER_IDLE 的* IdleStateEvent* 事件;
第三个参数是全能型参数,隔多久检查读写事件;
第四个参数表示当前的时间单位。
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.READER_IDLE) { log.info("idle check happen, so close the connection" ); ctx.close(); } } else { super .userEventTriggered(ctx, evt); } }
服务端当 15 秒内没有读到数据(客户端发来的心跳),则出发 userEventTriggered 事件,关闭假死的 channel 连接。
客户端 客户端同样添加 IdleStateHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 ch.pipeline().addLast(new IdleStateHandler(0 , 5 , 0 , TimeUnit.SECONDS)); 当5 秒内没有主动远程调用,也就是没有写事件发生时候,触发userEventTriggered主动写并发送心跳数据包 @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.WRITER_IDLE) { log.info("write idle happen [{}]" , ctx.channel().remoteAddress()); Channel channel = nettyClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress()); RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setCodec(SerializerTypeEnum.PROTOSTUFF.getCode()); rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode()); rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE); rpcMessage.setData(RpcConstants.PING); channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } else { super .userEventTriggered(ctx, evt); } }
###
自定义协议 无论是使用 Netty 还是原始的 Socket 编程,基于 TCP 通信的数据包格式均为二进制,协议指的就是客户端与服务端事先商量好的,每一个二进制数据包中每一段字节分别代表什么含义的规则。对于 XRPC 来说,使用了消息头+消息体 的方式制定私有协议。其格式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+----+---+ | magic code |version | full length | messageType| codec|compress| RequestId | +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+ | | | body | | | | ... ... | +-------------------------------------------------------------------------------------------------------+ 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型) 1B compress(压缩类型) 1B codec(序列化类型) 4B requestId(请求的Id) body(object类型数据)
字段解释 1. magic(魔数) 是通信双方协商的一个暗号,4 个字节,定义在 RpcConstants._MAGIC_NUMBER_。 魔数的作用是用于服务端在接收数据时先解析出魔数做正确性对比。如果和协议中的魔数不匹配,则认为是非法数据,可以直接关闭连接或采取其他措施增强系统安全性。 注意:这只是一个简单的校验,如果有安全性方面的需求,需要使用其他手段,例如 SSL/TLS。 魔数的思想在很多场景中都有体现,如 Java Class 文件开头就存储了魔数 OxCAFEBABE,在 JVM 加载 Class 文件时首先就会验证魔数对的正确性。2. version(版本) 为了应对业务需求的变化,可能需要对自定义协议的结构或字段进行改动。不同版本的协议对应的解析方法也是不同的。所以在生产级项目中强烈建议预留协议版本这个字段。3. full length(消息长度) 记录了整个消息的长度,这个字段是报文拆包的关键。4. messageType(消息类型)
1 2 3 4 5 6 7 byte REQUEST_TYPE = 1 ;byte RESPONSE_TYPE = 2 ;byte HEARTBEAT_REQUEST_TYPE = 3 ;byte HEARTBEAT_RESPONSE_TYPE = 4 ;
5. compress(压缩类型) 序列化的字节流,还可以进行压缩,使得体积更小,在网络传输更快,但是同时会消耗 CPU 资源。 如果使用压缩效果好的序列化器,可以考虑不使用压缩
1 2 3 4 5 6 DUMMY((byte ) 0 , "dummy" ), GZIP((byte ) 1 , "gzip" ), UNZIP((byte ) 2 , "unzip" );
7. serialize(序列化类型) 通过这个类型来确定使用哪种序列化方式,将字节流序列化成对应的对象。 序列化类型定义如下:
1 2 3 HESSIAN((byte ) 1 , "hessian" ), KRYO((byte ) 2 , "kryo" ), PROTOSTUFF((byte ) 3 , "protostuff" );
8. requestId(请求的 Id) 每个请求分配好请求 Id,这样响应数据的时候,才能对的上。使用 4 字节的 int 类型9. body body 里面放具体的数据,通常来说是请求的参数 request、响应的结果 response,再经过序列化、压缩后的字节数组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 @AllArgsConstructor @NoArgsConstructor @Getter @Setter @Builder @ToString public class RpcMessage { private byte messageType; private byte codec; private byte compress; private int requestId; private Object data; } @AllArgsConstructor @NoArgsConstructor @Getter @Builder @ToString public class RpcRequest implements Serializable { private static final long serialVersionUID = 2176648719840392878L ; private String requestId; private String className; private String methodName; private Class<?>[] parameterTypes; private Object[] parameters; private String version; } @AllArgsConstructor @NoArgsConstructor @Getter @Setter @Builder @ToString public class RpcResponse <T > implements Serializable { private static final long serialVersionUID = 715745410605631233L ; private String requestId; private Integer code; private String message; private T data; }
编解码与粘包拆包
TCP/IP 协议,在用户数据量非常小的情况下,极端情况下,一个字节,该 TCP 数据包的有效载荷非常低,传递 100 字节的数据,需要 100 次 TCP 传送,100 次 ACK,在应用及时性要求不高的情况下,将这 100 个有效数据拼接成一个数据包,那会缩短到一个 TCP 数据包,以及一个 ack,有效载荷提高了,带宽也节省了 非极端情况,有可能两个数据包拼接成一个数据包,也有可能一个半的数据包拼接成一个数据包,也有可能两个半的数据包拼接成一个数据包 拆包和粘包是相对的,一端粘了包,另外一端就需要将粘过的包拆开,举个栗子,发送端将三个数据包粘成两个 TCP 数据包发送到接收端,接收端就需要根据应用协议将两个数据包重新组装成三个数据包,还有一种情况就是用户数据包超过了 mss(最大报文长度),那么这个数据包在发送的时候必须拆分成几个数据包,接收端收到之后需要将这些数据包粘合起来之后,再拆开
编码 Encode 编码器相对比较简单,按照协议定义的长度和值进行设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public ByteBuf encode (RpcMessage rpcMessage, ByteBuf out) { try { out.writeBytes(RpcConstants.MAGIC_NUMBER); out.writeByte(RpcConstants.VERSION); out.writerIndex(out.writerIndex() + 4 ); byte messageType = rpcMessage.getMessageType(); out.writeByte(messageType); out.writeByte(rpcMessage.getCodec()); out.writeByte(CompressTypeEnum.GZIP.getCode()); out.writeInt(rpcMessage.getRequestId()); byte [] bodyBytes = null ; int fullLength = RpcConstants.HEAD_LENGTH; if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE && messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) { String codecName = SerializerTypeEnum.getName(rpcMessage.getCodec()); log.info("encode name: [{}] " , codecName); Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class) .getExtension(codecName); bodyBytes = serializer.serialize(rpcMessage.getData()); String compressName = CompressTypeEnum.getName(rpcMessage.getCompress()); Compress compress = ExtensionLoader.getExtensionLoader(Compress.class) .getExtension(compressName); bodyBytes = compress.compress(bodyBytes); fullLength += bodyBytes.length; } if (bodyBytes != null ) { out.writeBytes(bodyBytes); } int writeIndex = out.writerIndex(); out.writerIndex(MAGIC_LENGTH + VERSION_LENGTH); out.writeInt(fullLength); out.writerIndex(writeIndex); } catch (Exception e) { log.error("Encode request error!" , e); } return out; }
解码 decode 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public Object decode (ByteBuf in) { int fullLength = in.readInt(); byte messageType = in.readByte(); byte codecType = in.readByte(); byte compressType = in.readByte(); int requestId = in.readInt(); RpcMessage rpcMessage = RpcMessage.builder() .codec(codecType) .requestId(requestId) .compress(compressType) .messageType(messageType).build(); if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) { rpcMessage.setData(RpcConstants.PING); return rpcMessage; } if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) { rpcMessage.setData(RpcConstants.PONG); return rpcMessage; } int bodyLength = fullLength - RpcConstants.HEAD_LENGTH; byte [] bs = new byte [bodyLength]; in.readBytes(bs); String compressName = CompressTypeEnum.getName(compressType); Compress compress = ExtensionLoader.getExtensionLoader(Compress.class) .getExtension(compressName); bs = compress.decompress(bs); String codecName = SerializerTypeEnum.getName(rpcMessage.getCodec()); Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class) .getExtension(codecName); Object object = serializer.deserialize(messageTypeMap.get(messageType), bs); rpcMessage.setData(object); return rpcMessage; }
Netty 拆包器 使用最为常用的基于长度域拆包器 LengthFieldBasedFrameDecoder 只要自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。
1 new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
maxFrameLength:指定包的最大长度,如果超过,直接丢弃 lengthFieldOffset: 描述长度的字段在第几个字节 lengthFieldLength:length 字段本身的长度(几个字节) lengthAdjustment:包的总长度调整,去掉 lengthFieldOffset+lengthFieldLength initialBytesToStrip: 跳过的字节数,之前的几个参数,已经足够识别出整个数据包了。但是很多时候,调用者只关心包的内容,包的头部完全可以丢弃掉,initialBytesToStrip 就是用来告诉 Netty,识别出整个数据包之后,截掉 initialBytesToStrip 之前的数据 因此,这里我们的拆包参数为
1 new LengthFieldBasedFrameDecoder(RpcConstants.MAX_FRAME_LENGTH, 5 , 4 , -9 , 0 );
因为我们还需要检测 魔数 和 版本号,所以 initialBytesToStrip=0,不能去除,当 魔数 和 版本号不符合规定时,拒绝非本协议连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @Slf4j public class Spliter extends LengthFieldBasedFrameDecoder { public Spliter () { this (MAX_FRAME_LENGTH, MAGIC_LENGTH + VERSION_LENGTH, FULL_LENGTH_LENGTH, -(MAGIC_LENGTH + VERSION_LENGTH + FULL_LENGTH_LENGTH), 0 ); } public Spliter (int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { super (maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); } @Override protected Object decode (ChannelHandlerContext ctx, ByteBuf in) throws Exception { Object decoded = super .decode(ctx, in); ByteBuf frame = (ByteBuf) decoded; if (frame.readableBytes() >= RpcConstants.HEAD_LENGTH) { if (!checkMagicNumberAndVersion(in)){ ctx.channel().close(); return null ; } } return decoded; } private boolean checkMagicNumberAndVersion (ByteBuf in) { int len = RpcConstants.MAGIC_NUMBER.length; byte [] bytes = new byte [len]; in.readBytes(bytes); for (int i = 0 ; i < len; i++) { if (bytes[i] != RpcConstants.MAGIC_NUMBER[i]) { log.error("Unknown magic code: " + Arrays.toString(bytes)); return false ; } } byte version = in.readByte(); if (version != RpcConstants.VERSION) { log.error("version isn't compatible" + version); return false ; } return true ; } }
服务端业务 Handler 服务器端需要隔离 EventLoop(Reactor)线程和业务 线程。所以需要使用独立的、异步的线程任务去执行用户验证 的逻辑;而不在 EventLoop 线程中去执行用户验证的逻辑。 在默认情况下,Netty 的一个 EventLoop 实例会开启 2 倍 CPU 核数的内部线程。通常情况下,一个 Netty 服务器端会有几万或者 几十万的连接通道。也就是说,一个 EventLoop 内部线程会负责处理着 几万个或者上十万个通道连接的 IO 处理,而耗时的入站/出站处理越 多,就越会拖慢整个线程的其他 IO 处理,最终导致严重的性能问题。 因此这里我们是用一个独立的异步任务处理队列去处理业务逻辑。
1 2 3 4 5 DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup( RuntimeUtil.getProcessorCount() * 2 , ThreadUtil.newNamedThreadFactory("service-handler-group" , false ) ); ch.pipeline().addLast(serviceHandlerGroup, new NettyServerHandler());
具体的 handler 业务逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class NettyServerHandler extends SimpleChannelInboundHandler <RpcMessage > { private final ServiceProvider serviceProvider; public NettyServerHandler () { serviceProvider = SingletonFactory.getInstance(ServiceProvider.class); } @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { log.info("server receive msg: [{}] " , rpcMessage); byte messageType = rpcMessage.getMessageType(); if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) { rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE); rpcMessage.setData(RpcConstants.PONG); } else { RpcRequest rpcRequest = (RpcRequest) rpcMessage.getData(); Object result = handle(rpcRequest); log.info(String.format("server get result: %s" , result.toString())); rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE); if (ctx.channel().isActive() && ctx.channel().isWritable()) { RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId()); rpcMessage.setData(rpcResponse); } else { RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL); rpcMessage.setData(rpcResponse); log.error("not writable now, message dropped" ); } } ctx.writeAndFlush(rpcMessage).addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { future.channel().close(); log.error("Fail!! Send response for request " + rpcMessage.getRequestId()); } else { log.info("Send response for request " + rpcMessage.getRequestId()); } } }); } } private Object handle (RpcRequest request) { String className = request.getClassName(); String version = request.getVersion(); String serviceKey = ServiceUtil.makeServiceKey(className, version); Object serviceBean = serviceProvider.getService(serviceKey); if (serviceBean == null ) { log.error("Can not find service implement with interface name: {} and version: {}" , className, version); } return invokeCglib(request, serviceBean); }
解析 request 请求服务,在服务端注册服务的时候本地缓存一个服务 Map,从 Map 中找到服务,使用反射调用,并将结果返回,构造 response,并写入到 channal,最后 encode 发送
使用 Netty 来实现 XPRC 客户端 整体架构逻辑 当用户要调用一个远程服务时,给该服务添加@RpcAutowired 注解,那么该服务将自动被替换为其动态代理类,代理中包含从调用-构造 RPC request- 获取连接 channel-编码-发送, 收到回复-拆包-解码-与发送的 request 关联 response-返回调用结果
动态代理逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 @Slf4j public class ObjectProxy <T > implements InvocationHandler { private Class<T> clazz; private String version; public ObjectProxy (Class<T> clazz, String version) { this .clazz = clazz; this .version = version; } public static <T> T createService (Class<T> interfaceClass, String version) { return (T) Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new ObjectProxy<T>(interfaceClass, version) ); } @SneakyThrows @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { log.info("client invoked method: [{}]" , method.getName()); RpcRequest rpcRequest = RpcRequest.builder() .methodName(method.getName()) .parameters(args) .parameterTypes(method.getParameterTypes()) .className(method.getDeclaringClass().getName()) .requestId(UUID.randomUUID().toString()) .version(version) .build(); CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) NettyClient.getInstance().sendRequest(rpcRequest); RpcResponse<Object> rpcResponse = completableFuture.get(); this .check(rpcResponse, rpcRequest); return rpcResponse.getData(); } private void check (RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) { if (rpcResponse == null ) { throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, "interfaceName" + ":" + rpcRequest.getMethodName()); } if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) { throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, "interfaceName" + ":" + rpcRequest.getMethodName()); } if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) { throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, "interfaceName" + ":" + rpcRequest.getMethodName()); } } }
发送请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public Object sendRequest (RpcRequest rpcRequest) { CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>(); InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest); Channel channel = getChannel(inetSocketAddress); if (channel.isActive()) { pendingRpcRequests.put(rpcRequest.getRequestId(), resultFuture); RpcMessage rpcMessage = RpcMessage.builder().data(rpcRequest) .codec(SerializerTypeEnum.HESSIAN.getCode()) .compress(CompressTypeEnum.GZIP.getCode()) .requestId(REQUEST_ID.getAndIncrement()) .messageType(RpcConstants.REQUEST_TYPE).build(); channel.writeAndFlush(rpcMessage).addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { if (future.isSuccess()) { log.info("client send message: [{}]" , rpcMessage); } else { future.channel().close(); resultFuture.completeExceptionally(future.cause()); log.error("Send failed:" , future.cause()); } } }); } else { throw new IllegalStateException(); } return resultFuture; }
Channel 复用与重连 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @SneakyThrows public Channel getChannel (InetSocketAddress inetSocketAddress) { Channel channel = channelProvider.get(inetSocketAddress); if (channel == null ) { CompletableFuture<Channel> completableFuture = new CompletableFuture<>(); channel = doConnect(completableFuture,inetSocketAddress, MAX_RETRY).get(); channelProvider.set(inetSocketAddress, channel); } return channel; } @SneakyThrows public CompletableFuture<Channel> doConnect (CompletableFuture<Channel> completableFuture,InetSocketAddress inetSocketAddress, int retry) { bootstrap.connect(inetSocketAddress).addListener(future -> { if (future.isSuccess()) { log.info("The client has connected [{}] successful!" , inetSocketAddress.toString()); completableFuture.complete(((ChannelFuture) future).channel()); } else if (retry == 0 ) { log.error("the number of retries expired, connect fail. address:" , inetSocketAddress); } else { int now = MAX_RETRY - retry + 1 ; int delay = 1 << now; log.warn("connect fail, attempt to reconnect. retry:" + now); bootstrap.config().group().schedule(() -> doConnect(completableFuture,inetSocketAddress, retry - 1 ), delay, TimeUnit.SECONDS); } }); return completableFuture; }
Client 的 Pipline 流水线 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Slf4j public class NettyClient { private final ServiceDiscovery serviceDiscovery; private final ChannelProvider channelProvider; private final Bootstrap bootstrap; private final EventLoopGroup eventLoopGroup; private final PendingRpcRequests pendingRpcRequests; private static volatile NettyClient instance = null ; public static NettyClient getInstance () { if (instance == null ) { synchronized (NettyClient.class) { if (instance == null ) { instance = new NettyClient(); } } } return instance; } public NettyClient () { eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000 ) .option(ChannelOption.SO_KEEPALIVE, true ) .option(ChannelOption.TCP_NODELAY, true ) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(0 , 5 , 0 , TimeUnit.SECONDS)); ch.pipeline().addLast(new RpcEncoder()); ch.pipeline().addLast(new Spliter()); ch.pipeline().addLast(new RpcDecoder()); ch.pipeline().addLast(new NettyClientHandler()); } }); this .channelProvider = SingletonFactory.getInstance(ChannelProvider.class); this .serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension("zk" ); this .pendingRpcRequests = SingletonFactory.getInstance(PendingRpcRequests.class); } }
Client 接受响应的逻辑处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Override protected void channelRead0 (ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception { log.info("client receive msg: [{}]" , rpcMessage); byte messageType = rpcMessage.getMessageType(); if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) { log.info("heart receive[{}]" , rpcMessage.getData()); } else if (messageType == RpcConstants.RESPONSE_TYPE) { RpcResponse<Object> rpcResponse = (RpcResponse<Object>) rpcMessage.getData(); pendingRpcRequests.complete(rpcResponse); } } public class PendingRpcRequests { public static final Map<String, CompletableFuture<RpcResponse<Object>>> PENDING_RESPONSE_FUTURES = new ConcurrentHashMap<>(); public void put (String requestId, CompletableFuture<RpcResponse<Object>> future) { PENDING_RESPONSE_FUTURES.put(requestId, future); } public void complete (RpcResponse<Object> rpcResponse) { CompletableFuture<RpcResponse<Object>> future = PENDING_RESPONSE_FUTURES.remove(rpcResponse.getRequestId()); if (null != future) { future.complete(rpcResponse); } else { throw new IllegalStateException(); } } }
如何将调用的结果 response 和请求 request 绑定呢? 1、 通过 channel 的 Attributekey 绑定
通 CompletableFuture 包装返回结,使用 request 和 response 统一的 ID 作为 key,服务端收到请求之后,将 RequestId 原封不动写到响应结果中。客户端收到响应结果后,拿出 RequestId 找到对应的 Future 并写入结果。
参考: Netty 入门与实战:仿写微信 IM 即时通讯系统 Netty、Redis、Zookeeper 高并发实战