本篇是用Netty实现一个RPC中间件的第三篇,未来还会持续更新,代码在最后一片更新完毕之后将会上传到github、gitee代码托管平台,供大家拉取学习,并进行讨论分享,让该RPC框架持续精进。
在Netty中,数据的处理是通过处理器链(pipeline)完成的,这些处理器可以被分类为出站处理器(outbound handlers)和入站处理器(inbound handlers)。出站处理器负责处理从应用程序向网络发送的数据,而入站处理器则处理从网络接收到的应用程序数据。
本篇主要接扫通过构建自定义的处理器,让我们的消息在发送、接收阶段进行一定的逻辑处理,包括心跳处理,编解码,业务处理器等等。
愿我们在技术的浩瀚中游刃有余。
rpc_core
模块
rpc_client
模块
rpc_server
模块
netty使用了一种事件驱动的应用范式,因此数据处理的管道是一个经过处理器的事件链。事件和处理器可以关联到 xxInbound
入站 与 xxOutbound
出站 数据流。
入站和出栈事件都会经过预设的处理器链(多个处理器);
网络传输全是字节形式,而业务逻辑处理是对象形式,所以需要编码器把对象转字节,需要解码器把字节转对象;
ByteToMessageDecoder
字节转消息(对象)解码器;MessageToByteEncoder
消息(对象)转字节编码器;以客户端服务器模式介绍入站与出站处理器的事件处理过程
【图解】
客户端的处理器有:
服务端的处理器有:
补充:多个处理器封装到通道管道 ChannelPipeline;
1)需求描述:
自定义编码器和解码器实现客户端与服务器间的数据传输; 2)通道管道``ChannelPipeline` 可以封装多个处理器;其处理器执行顺序特别重要(前后关系特别重要,如入栈解码处理器要第1个执行,又如出站编码器要最后一个执行),否则客户端与服务器将无法通信(因为事件或数据要经过所有的处理器);类似于如下:
java
代码解读
复制代码
for (event event : events) { handler1(event); handler2(event); handler3(event); }
3)入站与出站处理器执行顺序:
3.1)服务器初始化器,添加处理器到管道;
Netty中的入站事件是指那些从网络流向应用程序的数据或状态变化。这些事件通常是由网络I/O操作触发的,然后由Netty框架传递给用户定义的处理器进行处理。以下是Netty中的一些典型入站事件:
在Netty中,这些事件会被传递给ChannelPipeline中的Inbound Handlers,这些Handlers按照它们在Pipeline中的顺序被调用。每个Handler都有机会处理事件,并可以选择继续传递给下一个Handler或停止传递。这允许开发者构建复杂的事件处理逻辑,同时保持代码的模块化和易于维护。
假设你需要实现一个入站处理器,该处理器用于解析接收到的消息:
整理了一份好像面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafka 面试专题
需要全套面试笔记【点击此处即可】免费获取
java
代码解读
复制代码
public class MessageDecoderHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 检查msg是否是我们需要处理的对象类型 if (msg instanceof ByteBuf) { ByteBuf in = (ByteBuf) msg; try { // 解析ByteBuf中的数据 String receivedMessage = in.toString(CharsetUtil.UTF_8); System.out.println("Received: " + receivedMessage); // 处理数据,例如转发给其他组件 } finally { // 释放ByteBuf资源 in.release(); } } else { // 如果不是我们处理的对象类型,则直接传递给下一个处理器 ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 打印异常信息 cause.printStackTrace(); // 关闭连接 ctx.close(); } }
在这个例子中,MessageDecoderHandler
会在接收到数据时解析ByteBuf中的内容,并打印出来。如果解析完成后,会释放ByteBuf资源以避免内存泄漏。
要将入站处理器添加到ChannelPipeline中,可以使用如下方法:
java
代码解读
复制代码
channel.pipeline().addLast(new MessageDecoderHandler());
通过这种方式,你可以根据需要添加多个入站处理器,并且这些处理器会按照它们在Pipeline中的顺序被调用。这种设计使得网络编程更加模块化和灵活。
在Netty中,出站处理器(Outbound Handlers)主要负责处理应用程序发起的网络请求或数据发送。出站处理器通常是按照相反的顺序执行的,即它们按照在ChannelPipeline中的逆序被调用。以下是一些常见的出站事件及其处理方式:
出站处理器通常会实现ChannelOutboundHandler
接口或其子接口。以下是如何处理这些事件的一些示例:
假设你需要实现一个出站处理器,该处理器在数据被发送前对其进行加密:
java
代码解读
复制代码
public class EncryptionOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 检查msg是否是我们需要加密的对象类型 if (msg instanceof String) { String message = (String) msg; byte[] encryptedMessage = encrypt(message); // 自定义的加密逻辑 ByteBuf encoded = ctx.alloc().buffer(); encoded.writeBytes(encryptedMessage); ctx.write(encoded, promise); } else { // 如果不是我们处理的对象类型,则直接传递给下一个处理器 ctx.write(msg, promise); } } private byte[] encrypt(String message) { // 加密逻辑 return message.getBytes(); // 示例中简单地转换为字节数组 } @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
在这个例子中,EncryptionOutboundHandler会在数据被发送之前进行加密处理,并使用ctx.write()
方法将加密后的数据传递给下一个出站处理器。如果没有需要加密的对象,则直接传递给下一个处理器。
要将出站处理器添加到ChannelPipeline中,可以使用如下方法:
java
代码解读
复制代码
channel.pipeline().addLast(new EncryptionOutboundHandler());
服务器端的RpcRequestMessageHandler
处理器
java
代码解读
复制代码
/** * RPC请求处理器 * @author XiaoSheng * @date 2024/8/21 上午11:26 */ @Slf4j @ChannelHandler.Sharable public class RpcRequestMessageHandler extends SimpleChannelInboundHandler
客户端的RpcResponseMessageHandler
处理器
java
代码解读
复制代码
/** * @author XiaoSheng * @date 2024/8/22 上午11:48 */ @Slf4j @ChannelHandler.Sharable public class RpcResponseMessageHandler extends SimpleChannelInboundHandler
通过上述代码可以发现,这里我们都是将处理器类直接继承SimpleChannelInboundHandler
类(这里也可以继承ChannelInboundHandlerAdapter
来实现自定义的类),并且通过重写channelRead0()
方法来编写业务逻辑代码。
这里也对SimpleChannelInboundHandler
类进行介绍
SimpleChannelInboundHandler
是 Netty 中的一个抽象类,用于处理入站数据(即从客户端到服务器的数据)。它是 ChannelInboundHandlerAdapter
的一个简化版本,专门用于处理某种类型的消息。它自动释放消息,以避免内存泄漏。
io.netty.channel.SimpleChannelInboundHandler
是一个``ChannelInboundHandler`,它允许只处理一种明确的消息,这个消息的类型由泛型参数指定,例如String。ChannelPipeline
中后续的``ChannelHandler`,这个特性是有帮助的。SimpleChannelInboundHandler
子类的实例的时候,调用SimpleChannelInboundHandler
的构造函数SimpleChannelInboundHandler(boolean autoRelease)
,将构造函数中的参数autoRelease
的值设置为false
。如果不设置,默认为true。SimpleChannelInboundHandler
这个是一个抽象类,一个必须子类实现的函数是channelRead0(ChannelHandlerContext ctx, I msg)
。但这个函数不是ChannelInboundHandler
中的方法,而是SimpleChannelInboundHandler
自己增加的方法。channelRead0(ChannelHandlerContext ctx, I msg)
这个方法会被channelRead(ChannelHandlerContext ctx, Object msg)
方法调用。Channel
从对端读取到消息后,会调用channelRead(ChannelHandlerContext ctx, Object msg)
方法,而channelRead(ChannelHandlerContext ctx, Object msg)
方法会调用channelRead0(ChannelHandlerContext ctx, I msg)
方法,所以SimpleChannelInboundHandler
的子类实现channelRead0(ChannelHandlerContext ctx, I msg)
方法即可。我们看看SimpleChannelInboundHandler
中channelRead(ChannelHandlerContext ctx, Object msg)
方法的实现:
java
代码解读
复制代码
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
上面源代码中,channelRead0(ctx, imsg)这条语句就是调用了被子类实现的channelRead0方法。
这个if判断语句中,autoRelease就是构造函数传入的值,如果不传的话默认为true。当传入channelRead函数的消息类型跟泛型参数中的类型一致时,这个消息可以被处理,此时release的值为true。如果传入的消息跟泛型参数中指定的类型不同时,release的值被设置为false,这个消息不会被处理,只会简单地传递给ChannelPipeline
中后续的ChannelHandler
处理,这就是所谓的只明确处理一种消息的含义。
下面再分析下if条件判断的逻辑:
java
代码解读
复制代码
if (autoRelease && release) { ReferenceCountUtil.release(msg); }
ReferenceCountUtil.release(msg)
这个函数的作用就是将msg的引用计数减少1。如果引用计数减少到0,那么就将msg释放。当然,前提是msg实现了io.netty.util.ReferenceCounted
接口。如果没有实现该接口,那么ReferenceCountUtil.release(msg)
这条语句等于什么也没有做。
SimpleChannelInboundHandler
是一个泛型类,需要指定处理的消息类型。例如,如果你想处理字符串消息,你可以指定 String
作为泛型类型。
channelRead0(ChannelHandlerContext ctx, I msg)
ctx
是 ChannelHandlerContext
对象,用于与 ChannelPipeline
交互。msg
是接收到的消息。SimpleChannelInboundHandler
处理特定类型的消息,使用泛型指定。channelRead0
方法完成后自动释放消息,避免内存泄漏。SimpleChannelInboundHandler
中的代码通常在 I/O 线程中执行,确保线程安全。exceptionCaught
方法中处理异常,避免未捕获的异常导致应用程序崩溃。通过使用 SimpleChannelInboundHandler
,你可以简化处理入站消息的代码,提高代码的可读性和可维护性。
rpc_core
模块的HeartBeatClientHandler
客户端心跳消息处理器
java
代码解读
复制代码
/** * 客户端的心跳 * @author XiaoSheng * @date 2024/8/21 上午10:40 */ @Slf4j @ChannelHandler.Sharable public class HeartBeatClientHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; IdleState idleState = event.state(); // 长时间没有写入数据,发送心跳包 if (idleState == IdleState.WRITER_IDLE) { //获取ip log.debug("发送心跳包 {}", ctx.channel().remoteAddress()); PingMessage pingMessage = new PingMessage(); pingMessage.setMessageType(PingMessage.PingMessage); pingMessage.setSequenceId(0); ctx.writeAndFlush(pingMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } return; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.debug("远程调用出错"); cause.printStackTrace(); ctx.close(); super.exceptionCaught(ctx, cause); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered"); ctx.close(); super.channelUnregistered(ctx); } }
rpc_core
模块的HeartBeatClientHandler
服务端心跳消息处理器
java
代码解读
复制代码
/** * @author XiaoSheng * @date 2024/8/21 上午11:21 */ @Slf4j @ChannelHandler.Sharable public class HeartBeatServerHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.debug("长时间没有收到消息了,断开连接"); ctx.close(); } } super.userEventTriggered(ctx, evt); } }
上述的心跳消息,使用了继承ChannelDuplexHandler
,并重写userEventTriggered(ChannelHandlerContext ctx, Object evt)
下面对ChannelDuplexHandler
进行讲解:
ChannelDuplexHandler
是 Netty 中一个非常重要的类,它继承自 ChannelInboundHandlerAdapter
和 ChannelOutboundHandlerAdapter
,用于同时处理入站和出站的事件。通常情况下,处理入站和出站事件需要分别实现 ChannelInboundHandler
和 ChannelOutboundHandler
接口,而 ChannelDuplexHandler
允许你在一个类中同时处理这两种类型的事件。
ChannelDuplexHandler
继承自 ChannelInboundHandlerAdapter
,而 ChannelInboundHandlerAdapter
又实现了 ChannelInboundHandler
接口。因此,ChannelDuplexHandler
主要用于:
以下是 ChannelDuplexHandler
中常用的方法,这些方法可以被重写来处理不同的事件:
channelRead(ChannelHandlerContext ctx, Object msg)
: 读取数据。channelActive(ChannelHandlerContext ctx)
: 处理通道激活事件(连接建立)。channelInactive(ChannelHandlerContext ctx)
: 处理通道非激活事件(连接关闭)。exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
: 处理异常。write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
: 写入数据。flush(ChannelHandlerContext ctx)
: 刷新数据。close(ChannelHandlerContext ctx, ChannelPromise promise)
: 关闭通道。userEventTriggered
方法是该类中的一个方法,用于处理用户自定义事件(user events)。这是 Netty 提供的一种机制,允许用户在管道(pipeline)中触发和处理自定义事件。
用户事件是在 ChannelHandlerContext.fireUserEventTriggered(Object event)
被调用时触发的。这些事件可以是任何你想要传递给处理器链下游的对象。通常情况下,这些事件被用来传递非标准的、应用程序特定的信息或通知。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
参数
ctx
:ChannelHandlerContext
对象,提供了与 Channel 及其管道进行交互的各种操作。evt
:用户自定义事件,通常是一个对象,可以是任何类型。抛出
Exception
:方法可以抛出异常,异常将由 Netty 的异常处理机制捕获和处理。示例二
以下是一个简单的示例,展示了如何使用 ChannelDuplexHandler
来同时处理入站和出站事件:
java
代码解读
复制代码
public class MyDuplexHandler extends ChannelDuplexHandler { // 处理入站事件:读取数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Inbound: Received message: " + msg); // 将消息传递给下一个处理器 ctx.fireChannelRead(msg); } // 处理入站事件:通道激活 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Inbound: Channel is active"); // 调用父类方法继续处理 super.channelActive(ctx); } // 处理入站事件:异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("Inbound: Exception caught: " + cause.getMessage()); // 关闭通道 ctx.close(); } // 处理出站事件:写入数据 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("Outbound: Writing message: " + msg); // 将消息传递给下一个处理器 ctx.write(msg, promise); } // 处理出站事件:刷新数据 @Override public void flush(ChannelHandlerContext ctx) throws Exception { System.out.println("Outbound: Flushing data"); // 调用父类方法继续处理 super.flush(ctx); } // 处理出站事件:关闭通道 @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("Outbound: Closing channel"); // 调用父类方法继续处理 super.close(ctx, promise); } }
示例二
java
代码解读
复制代码
/** * 客户端的心跳handler * * @author chenlei */ @Slf4j public class HeartBeatClientHandler extends ChannelDuplexHandler { /** * idlStatus写事件 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; IdleState state = event.state(); //长时间没有写入数据 发送心跳包 if (state == IdleState.WRITER_IDLE) { //获取ip log.debug("发送心跳包 {}", ctx.channel().remoteAddress()); log.error("发送心跳包 {}", ctx.channel().remoteAddress()); PingMessage message = new PingMessage(); message.setSequenceId(0); message.setMessageType(Message.PingMessage); ctx.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } return; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.debug("远程调用出错"); cause.printStackTrace(); ctx.close(); super.exceptionCaught(ctx, cause); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered"); ctx.close(); super.channelUnregistered(ctx); } }
示例三
java
代码解读
复制代码
public class MyHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof HeartbeatEvent) { // 假设 HeartbeatEvent 是自定义的心跳事件 HeartbeatEvent heartbeat = (HeartbeatEvent) evt; System.out.println("Received heartbeat: " + heartbeat); } else { super.userEventTriggered(ctx, evt); } } }
在这个例子中,我们创建了一个名为 MyHandler
的自定义处理器,它覆盖了 userEventTriggered
方法,并检查传入的事件是否为 HeartbeatEvent
类型。
super.userEventTriggered(ctx, evt);
将事件传递给下一个处理器。ChannelHandlerContext.fireUserEventTriggered(Object event)
来向处理器链中发送用户事件。继续回到我们自定义的心跳处理类中的userEventTriggered
方法;
java
代码解读
复制代码
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; IdleState idleState = event.state(); // 长时间没有写入数据,发送心跳包 if (idleState == IdleState.WRITER_IDLE) { //获取ip log.debug("发送心跳包 {}", ctx.channel().remoteAddress()); PingMessage pingMessage = new PingMessage(); pingMessage.setMessageType(PingMessage.PingMessage); pingMessage.setSequenceId(0); ctx.writeAndFlush(pingMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } return; }
这里通过结合IdleStateHandler
处理器,将在写空闲15s后触发
IdleStateHandler
调用源码如下:
最后通过定时任务对任务进行执行。
上面就是关于Netty实现RPC中出入、站处理器的内容讲解,中间也介绍了SimpleChannelInbound
、ChannelDuplexHandler
的核心内容
上一篇:Linux常用命令及组件