基于 Netty 服务端快速了解核心组件

由于Netty优秀的设计和封装,开发一个高性能网络程序就变得非常简单,本文从一个简单的服务端落地简单介绍一下Netty中的几个核心组件,希望对你有帮助。

快速落地一个服务端

我们希望通过Netty快速落地一个简单的主从reactor模型,由主reactor对应的线程组接收连接交由acceptor创建连接,与之建立的客户端的读写事件都会交由从reactor对应的线程池处理:

基于此设计,我们通过Netty写下如下代码,可以看到我们做了如下几件事:

声明一个服务端创建引导类ServerBootstrap ,负责配置服务端及其启动工作。声明主从reactor线程组,其中boss可以看作监听端口接收新连接的线程组,而work则是负责处理客户端数据读写的线程组。基于上述线程池作为group的入参完成主从reactor模型创建。通过channel函数指定server channe为NioServerSocketChannel即采用NIO模型,而NioServerSocketChannel我们可以直接理解为serverSocket的抽象表示。通过childHandler方法给引导设置每一个连接数据读写的处理器handler。

最后调用bind启动服务端并通过addListener对连接结果进行异步监听:

复制public static void main(String[] args) { //1. 声明一个服务端创建引导类 ServerBootstrap serverBootstrap = new ServerBootstrap(); //2. 声明主从reactor线程组 NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); serverBootstrap.group(boss, worker)//3. 基于上述线程池创建主从reactor模型 .channel(NioServerSocketChannel.class)//server channel采用NIO模型 .childHandler(new ChannelInitializer<NioSocketChannel>() {//添加客户端读写请求处理器到subreactor中 @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 对于ChannelInboundHandlerAdapter,收到消息后会按照顺序执行即 A -> B->ServerHandler ch.pipeline().addLast(new InboundHandlerA()) .addLast(new InboundHandlerB()) .addLast(new ServerHandler()); // 处理写数据的逻辑,顺序是反着的 B -> A ch.pipeline().addLast(new OutboundHandlerA()) .addLast(new OutboundHandlerB()) .addLast(new OutboundHandlerC()); ch.pipeline().addLast(new ExceptionHandler()); } }); //绑定8080端口并设置回调监听结果 serverBootstrap.bind("127.0.0.1", 8080) .addListener(f -> { if (f.isSuccess()) { System.out.println("连接成功"); } }); }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.

对于客户端的发送的数据,我们都会通过ChannelInboundHandlerAdapter添加顺序处理,就如代码所示我们的执行顺序为InboundHandlerA->InboundHandlerB->ServerHandler,对此我们给出InboundHandlerA的代码,InboundHandlerB内容一样就不展示了:

复制public class InboundHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerA : " + ((ByteBuf)msg).toString(StandardCharsets.UTF_8)); //将当前的处理过的msg转交给pipeline的下一个ChannelHandler super.channelRead(ctx, msg); } }1.2.3.4.5.6.7.8.9.

而ServerHandler的则是:

客户端与服务端建立连接,对应客户端channel被激活,触发channelActive方法。ChannelHandlerContext 的 Channel 已注册到其 EventLoop 中,执行channelRegistered。将 ChannelHandler 添加到实际上下文并准备好处理事件后调用。

解析客户端的数据然后回复Hello Netty client :

复制private static class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channel被激活,执行channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) { System.out.println("执行channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("执行handlerAdded"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; //打印读取到的数据 System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(StandardCharsets.UTF_8)); // 回复客户端数据 System.out.println(new Date() + ": 服务端写出数据"); //组装数据并发送 ByteBuf out = getByteBuf(ctx); ctx.channel().writeAndFlush(out); super.channelRead(ctx, msg); } private ByteBuf getByteBuf(ChannelHandlerContext ctx) { ByteBuf buffer = ctx.alloc().buffer(); byte[] bytes = "Hello Netty client ".getBytes(StandardCharsets.UTF_8); buffer.writeBytes(bytes); return buffer; } //...... }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.

我们通过telnet 对应ip和端口后发现服务端输出如下内容,也很我们上文说明的一致:

复制执行handlerAdded 执行channelRegistered 端口绑定成功,channel被激活,执行channelActive1.2.3.

然后我们发送消息 1,可以看到触发所有inbound的channelRead方法:

复制InBoundHandlerA : 1 InBoundHandlerB: 1 Wed Jul 24 00:05:18 CST 2024: 服务端读到数据 -> 11.2.3.

然后我们回复hello netty client,按照添加的倒叙触发OutBoundHandler:

复制Wed Jul 24 00:05:18 CST 2024: 服务端写出数据 OutBoundHandlerC: Hello Netty client OutBoundHandlerB: Hello Netty client OutBoundHandlerA: Hello Netty client 1.2.3.4.

详解Netty中的核心组件

channel接口

channel是Netty对于底层class socket中的bind、connect、read、write等原语的封装,简化了我们网络编程的复杂度,同时Netty也提供的各种现成的channel,我们可以根据个人需要自行使用。 下面便是笔者比较常用的Tcp或者UDP中比较常用的几种channel。

NioServerSocketChannel:基于NIO选择器处理新连接。EpollServerSocketChannel:使用 linux EPOLL Edge 触发模式实现最大性能的实现。NioDatagramChannel:发送和接收 AddressedEnvelope 的 NIO 数据报通道。EpollDatagramChannel:使用 linux EPOLL Edge 触发模式实现最大性能的 DatagramChannel 实现。EventLoop接口

在Netty中,所有channel都会注册到某个eventLoop上, 每一个EventLoopGroup中有一个或者多个EventLoop,而每一个EventLoop都绑定一个线程,负责处理一个或者多个channel的事件:

这里我们也简单的给出NioEventLoop中的run方法,它继承自SingleThreadEventExecutor,我们可以大概看到NioEventLoop的核心逻辑本质就是轮询所有注册到NioEventLoop上的channel(socket的抽象)是否有其就绪的事件,然后

复制 @Override protected void run() { for (;;) { try { //基于selectStrategy轮询查看是否有就绪事件 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); //...... if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; //根据IO配比执行网络IO事件方法processSelectedKeys以及其他事件方法runAllTasks if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //...... } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.
pipeline和channelHandler以channelHandlerContext

每一个channel的事件都会交由channelHandler处理,而负责同一个channel的channelHandler都会交由pipeline一条逻辑链进行连接,这两者的的关系都会一一封装成channelHandlerContext,channelHandlerContext主要是负责当前channelHandler和与其同一条channelpipeline上的其他channelHandler之间的交互。

举个例子,当我们收到客户端的写入数据时,这些数据就会交由pipeline上的channelHandler处理,如下图所示,从第一个channelHandler处理完成之后,每个channelHandlerContext就会将消息转交到当前pipeline的下一个channelHandler处理:

假设我们的channelHandler执行完ChannelActive后,如希望继续传播则会调用fireChannelActive:

复制 @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("端口绑定成功,channel被激活,执行channelActive"); ctx.fireChannelActive() }1.2.3.4.5.

查看其内部逻辑即可知晓,它就是通过AbstractChannelHandlerContext得到pipeline的下一个ChannelHandler并执行其channelActive方法:

复制 @Override public ChannelHandlerContext fireChannelActive() { final AbstractChannelHandlerContext next = findContextInbound(); invokeChannelActive(next); return this; }1.2.3.4.5.6.
回调的思想

我们可以说回调其实是一种设计思想,Netty对于连接或者读写操作都是异步非阻塞的,所以我们希望在连接被建立进行一些响应的处理,那么Netty就会在连接建立的时候方法暴露一个回调方法供用户实现个性逻辑。

例如我们的channel连接被建立时,其底层就会调用invokeChannelActive获取我们绑定的ChannelInboundHandler并执行其channelActive方法:

复制private void invokeChannelActive() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelActive(); } }1.2.3.4.5.6.7.8.9.10.11.

于是就会调用到我们服务端ServerHandler 的channelActive方法:

复制private static class ServerHandler extends ChannelInboundHandlerAdapter { //...... @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("端口绑定成功,channel被激活,执行channelActive"); } //...... }1.2.3.4.5.6.7.8.9.10.
Future异步监听

为保证网络服务器执行的效率,Netty大部分网络IO操作都采用异步的,以笔者建立连接设置的监听器为例,当前连接成功后,就会返回给监听器一个java.util.concurrent.Future,我们就可以通过这个f获取连接的结果是否成功:

复制//绑定8080端口并设置回调监听结果 serverBootstrap.bind("127.0.0.1", 8080) .addListener(f -> { if (f.isSuccess()) { System.out.println("连接成功"); } });1.2.3.4.5.6.7.

我们步入DefaultPromise的addListener即可发现其内部就是添加监听后判断这个连接的异步任务Future是否完成,如果完成调用notifyListeners回调我们的监听器的逻辑:

复制@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { //...... //添加监听 synchronized (this) { addListener0(listener); } //连接任务完成,通知监听器 if (isDone()) { notifyListeners(); } return this; }1.2.3.4.5.6.7.8.9.10.11.12.13.14.

THE END
本站服务器由亿华云赞助提供-企业级高防云服务器