f10@t's blog

Netty框架学习

字数统计: 5.9k阅读时长: 25 min
2023/01/07

本篇学习一下Netty框架,Netty是一个Java实现的、非常流行的NIO网络编程框架,很多知名项目如a阿里的Dubbo、Spark、ElasticSearch、Hadoop、蚂蚁金服的SOFABolt、谷歌的gRPC等等。学习前建议先掌握Java中NIO相关API的使用,并理解Unix操作系统中I/O多路复用模型的理论。

components

什么是Netty?为了解决什么问题?

Netty

引用一下官方的描述:

Netty is an NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.

Netty是一个NIO客户机-服务器框架,它可以快速方便地开发网络应用程序,如协议服务器和客户端。它大大简化和简化了网络编程,如TCP和UDP套接字服务器。

Netty本身其实是一个基于Java NIO开发的框架,帮助我们快速开发网络应用而不需要在处理通信流程上花心思。也正因为这样,他也不需要啥其他的依赖,就靠JDK自身就行。Netty 3.x版本最低要求为JDK 5而Netty 4.x的最低要求是JDK 6。

解决什么问题呢?

我自己的理解。看似是个简单的封装,实际上Netty是有很多的创新点的。首先,他使得我们免于编写大量关于Java NIO代码的,这部分涉及到熟悉常用类、多线程编程、JDK 1.7及之前中的epoll bug等等,这大大降低了我们开发的复杂度。其次,他的性能是要比Java原生的性能要更好的,具体来说:

  • 更高的吞吐量以及更低的延迟
  • 更低的资源消耗
  • 更少的内存复制(Zero-Copy技术)

那他是怎么解决的呢?我自己的理解总结一句话是这样的:

Netty是基于多路I/O复用机制实现了同步非阻塞I/O模型,此外,通过运用多线程技术,实现了伪异步的能力。

正儿八经的,Netty的出现的理由,除了我上面提到的,就是官方自己的陈述:

...... However, a general purpose protocol or its implementation sometimes does not scale very well. It is like how we don't use a general purpose HTTP server to exchange huge files, e-mail messages, and near-realtime messages such as financial information and multiplayer game data. What's required is a highly optimized protocol implementation that is dedicated to a special purpose. ......

The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable high-performance and high-scalability protocol servers and clients.

In other words, Netty is an NIO client server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.

'Quick and easy' does not mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences learned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

大俗话翻译一下:

...... 然而,通用协议及其实现,如HTTP,他的可扩展性不强。比如我们也不是整天起一个HTTP服务器,然后跟别人交换文件、邮件、实时信息等。为啥子?具体问题具体讨论嘛,我们需要根据具体的需求去设计、去优化得到一个定制化的协议,不然要FTP、SMTP啥的干啥?......

Netty是一个基于异步事件驱动的网络应用框架,旨在提供可快速开发的、可维护的、高性能、高可扩展的服务器和客户端。

换言之,Netty是一个使用了NIO的客户端服务器框架爱,能让你快速开发网络应用,比如TCP、UDP套接字开发,肥肠好用。

虽然但是并不是说Netty快、简单,就代表着它可维护性差、性能差,Netty吸取了诸多前辈协议的实现经验,比如FTP、SMTP、HTTP和其他五花八门的协议。总而言之,很牛。

深入浅出——整体架构与基本概念

Netty整体架构

下面我们先深入浅出的看一下Netty是怎么设计的。过程如下:

  • 首先,我们需要了解一下什么是Reactor模式
  • 然后看看Netty大体架构设计、流程以及常用类
  • 看一个简单的Demo,实现时间服务器的功能

不得不提的Reactor模式

Reactor意思是反应堆,这个倒不是Java的常见24种设计模式,只是一种I/O模式。wiki定义如下:

1
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

Reactor模式是一种事件驱动的I/O模式,核心分为服务句柄(service handlers)和请求句柄(reuest handlers),前者可以处理并发到达的多个输入,并通过后者同步地**将请求派发(dispatch)给不同的请求句柄。其实有点类似生产者消费者模型,只不过没有缓存,而是直接派发了。可以用下图理解:

Reactor模式

其中(我们带入Java语境),Service Handler本质是一个Selector,轮询可用Socket,并将I/O事件分配给Request Handler,在线程池中以非阻塞的方式处理用户请求的线程,。下图会更加清晰(来自Java之父 Doug Lea,地址)

经典服务模式

那中间这个Service Handler我们就把他叫做Reactor,是不是就和一个小的反应堆一样在那里工作?Reactor模式具体来说有三种:

  • 单Reactor单线程模式

    在该模式下,单个Reactor线程利用Selector进行工作,当监听到客户端连接时,将利用dispatch函数,首先通过acceptor接收一个客户端,再通过handler进行处理客户端的读写。而acceptor和handler的执行都是在同一个线程中的,而非新建了一个线程(如下代码,直接run而并非新建了Thread)。

    这个模式可以满足一些小容量的场景。但是难以满足高负载、大并发的场景,即可扩展性差

    从上面这个模式可以看到,若handler的流程过长,则会拖累reactor,严重会导致大量客户端连接超时的问题。此外,若该单线程除了问题,则会导致单点失效的问题。因此我们可以

    1. 加快handler的执行,如使用多线程执行不同的非阻塞IO。
    2. 使用多个Reactor,不同Reacor之间负载均衡。

    上面这两个思路都可以提高我们的可扩展性(Scalability),即可以容纳更大的流量。那其实另外两个模式就分别走的是上面这两个思路了。

  • 单Reactor多线程模式

    相较于第一种模式,可以看到,我们在下面增加了一个线程池来处理handler,这个还是比较好理解的。

    绝大数场景下,该模式都可以满足性能要求。但极特殊场景下,由于是单个Reactor线程负责监听和处理客户端连接,因此例如百万客户端并发连接、或存在认证等时间开销较大的流程。那单个Reactor就不够了。就得引入多个Reactor了。

  • 多Reactor多线程模式

这也是Netty经常采用的Reacor模式了,官方也推荐使用该模型。观察一下有两个特点:

  1. 有线程池,即多线程处理客户端请求。
  2. 有两个Reactor,所以也有人把这个叫主从Reactor模式1+M+N 线程模式,每一个Reactor都可以是一个线程池。是业界成熟的服务器程序设计模式

Netty架构概览

Netty整体可以分为三层:服务编排层、事件调度层、网络通信层。如图所示:

Netty三层架构

分别职责如下:

  • 服务编排层:负责组装各类服务,是Netty的核心业务处理链(可以看到Pipeline字样)
  • 事件调度层:通过指定的Reactor线程模型,对活动事件进行处理,具体交由相关Handler完成
  • 网络通信层:监听网络的读写和连接请求,负责将网络层的数据读取到内存缓冲区中。

ok上面叭叭叭一堆类,其实最下面的网络通信层除了ServerBootstrap,看过我文章的应该已经熟悉了。

简单说一下,ServerBootstrap类的创建是Netty服务端启动的第一步,即通过Builder模式构建一个服务端类,配置其相关参数,并绑定端口启动。网络通信层就这些内容了。我们从下到上,再看看另外两层:

事件调度层

上面说到,该层的职责是利用Reactor线程模型对活动事件进行处理,他的具体实现是与我们采用的Reactor线程模型相关的,比如你用了单Reactor单线程呀、单Reactor多线程呀等那三种。具体来说,是通过ServerBootstrap类指定的。

核心用到的类有两个:EventLoopEventLoopGroup。是不是感觉有点TheadThreadGroup那味?差不多。下图是这两个类的关系图:

EventLoopGroup与EventLoop

可以看到而这都继承了EventExecutorGroup接口,而该接口是由JDK提供的ScheduledExecutorServer子接口以及Iterable接口组成的。

本质上,EventLoop可以看做处理多个Channel生命周期内所有I/O时间的线程,而EventLoopGroup则是注册这些线程的线程池。而EventLoopGroup是有很多实现和用法的,这一点我们最后再讨论。总之这里知道,该层作用是根据所设定的Reactor线程模型,利用线程池技术监听和处理网络I/O事件就好了。

服务编排层

我觉得吧,这名字翻译的不太好,我以为是应用层的什么东西在编排,其实就是对信息流的拦截操作。你Servlet、Struts2啥的都有这种类似的设计。

该层负责组装各类服务,是Netty的核心业务处理链,确保网络事件的动态编排和有序传播。你是不是感觉在哪里听过?对的,类似Servlet技术的拦截过滤器javax.servlet.Filter,具体可以看我的文章:JavaEE基础 - Servlet · float's blog (lzwgiter.github.io)

Netty是一样的原理。Netty将Channel抽象为ChannelPipeline,消息在该管道中流动。然后针对这个管道,我们定义了I/O事件的拦截器ChannelHandler(感觉叫ChannelFilter岂不是更好理解),而ChannelPipeline中保存了多个这样的“拦截器"。

既然是事件的拦截器,包含哪些事件呢?由于消息都是I/O的相关消息,所以事件都是与I/O相关的。从大类上可以分为inbound事件和outbound事件。前者通常为由I/O线程触发,后者通常为用户主动发起的网络I/O操作(其实和我们发起一个TCP请求的过程很像,绑定、连接、写、读、关闭)。

  • inbound事件(*代表“ChannelHandlerContext”):
    1. *.fireChannelRegistered() -> Channel注册事件
    2. *.fireChannelActive() -> TCP链路建立成功,Channel激活事件
    3. *.fireChannelRead(Object) -> 读事件
    4. *.fireChannelReadComplete() -> 读操作完成通知事件
    5. *.fireExceptionCaught(Throwable) -> 异常通知事件
    6. *.fireUserEventTriggered(Object) -> 用户自定义事件
    7. *.fireChannelWritabilityChanged() -> Channel的可写状态变化通知事件
    8. *.fireChannelInactive() -> TCP连接关闭,链路不可用通知事件
  • outbound事件(*代表“ChannelHandlerContext”):
    1. *.bind(SocketAddress, ChannelPromise) -> 绑定本地地址事件
    2. *.connect(SocketAddress, SocketAddress, ChannelPromise) -> 连接服务端事件
    3. *.write(Object, ChannelPromise) -> 写事件
    4. *.flush() -> 刷新事件
    5. *.read() -> 读事件
    6. *.disconnect(ChannelPromise) -> 断开连接事件
    7. *.close(ChannelPromise) -> 关闭当前Channel事件

上面这些事件数量很多,但是也很好区分。Inbound那都是In了,肯定是消息进来引发的事件(注册channel、TCP握手、读、TCP关闭等),这些事件对应函数的命名都是fire开头的。outbound那肯定就是消息出去引发的事件了(写、刷新等)。

当我们用户的请求到达Netty后,首先Netty利用底层SocketChannel的read方法读取得到ByteBuf类型的数据。这一步触发了ChannelRead事件,由I/O线程(常用的如NioEventLoop)调用ChannelPipelinefireChannelRead(Object msg)方法,将消息(ByteBuf类型)传递到ChannelPipeline中。

处理后,我们再调用write函数返回请求信息,此时会触发write事件,反方向经过各个拦截器返回。

上述出站和入站的流程如图所示:

ChannelPipeline对I/O事件流的拦截和处理

通常,我们需要自己定义一个拦截器并添加到pipeline中。这个操作在Servlet中,我们需要实现javax.servlet.Filter接口,并实现对应的doFilter方法,然后添加注解@WebFilter或写xml文档就可以了。

那在Netty中(v4版本),对于上述两种不同的事件。对应地,我们需要继承不同的抽象类:

  • inbound事件: ChannelInboundHandlerAdapter,下面是定义以及部分方法,我们只需要重写感兴趣的就行了。方法注释中表明了事件,如channelActive注释中Calls {@link ChannelHandlerContext#fireChannelActive()},这和上面列出的事件是一一对应的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

/**
* Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}

/**
* Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}

/**
* Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}

/**
* Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}

/**
* Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

/**
* Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}

/**
* Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}

/**
* Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}

/**
* Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
* to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
@SuppressWarnings("deprecation")
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
  • outbound事件:ChannelOutboundHandlerAdapter,一样的,重写感兴趣的就行了。事件同样和前面列出的相同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

/**
* Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}

/**
* Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}

/**
* Calls {@link ChannelHandlerContext#disconnect(ChannelPromise)} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.disconnect(promise);
}

/**
* Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.close(promise);
}

/**
* Calls {@link ChannelHandlerContext#deregister(ChannelPromise)} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}

/**
* Calls {@link ChannelHandlerContext#read()} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}

/**
* Calls {@link ChannelHandlerContext#write(Object, ChannelPromise)} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}

/**
* Calls {@link ChannelHandlerContext#flush()} to forward
* to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}

一个简单的Netty Demo

Maven依赖如下,官网上说版本5已经弃用了,我这里用的最新的Netty 4.1.28。

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
</dependencies>

我们用Netty框架实现一个服务端的demo。首先我们回想一下,如果你用java的NIO类库,大概流程是啥?

  • 服务端

    1. 创建ServerSocketChannel,绑定地址
    2. 设置为非阻塞模式,开始监听
    3. 创建监听器Selector,注册ServerSocketChannel到该Selector,并注册感兴趣的SelectionKeySelectionKey.OP_ACCEPT
    4. 执行select(),轮询就绪的SocketChannel
    • 若有SocketChannel可用(其状态为OP_ACCEPT),则调用accept()方法接受客户端
      1. 设置新接入的客户端的SocketChannel为非阻塞模式
      2. 将其注册到Selector,感兴趣的SelectionkeySelectionKey.OP_READ
    • 若有SocketChannel可用(其状态为OP_READ),则构造ByteBuffer读取数据包。
    • 若有SocketChannel可用(其状态为OP_WRITE),说明数据包没发完,得继续发送。

是不是有点小复杂,在Netty框架下,上面的事情会简单一些:

  • 服务端
    1. 根据Reactor模式(这里是多actor多线程模型),创建两个NioEventLoopGroup分别处理监听和业务。
    2. 创建ServerBootstrap以快速配置,包括使用的线程组(监听 and 业务)、TCP相关(最大连接数等)、业务Handler。
    3. 绑定端口开始监听。
    4. 若有SocketChannel可用,自动按照我们编写的回调函数处理。此间,读取与写入均使用Netty提供的ByteBuf,相较于ByteBuffer更为方便。

ok结了,是不是相较于原生的用法简单太多了。具体C,S代码如下。

服务端

根据上面提到的流程,我们首先看一下大致流程:

Netty服务创建流程图

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
* @author lzwgiter
* @since 2023/01/09
*/
public class TimeServer {

private static class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 准备接受数据的ByteBuf,该类是Netty对应ByteBuffer提供的
ByteBuf buf = (ByteBuf) msg;
byte[] request = new byte[buf.readableBytes()];
buf.readBytes(request);
String body = new String(request, StandardCharsets.UTF_8).trim();
System.out.println("** 新的请求 ** : " + body);
String result = "QUERY".equalsIgnoreCase(body)
? new Date(System.currentTimeMillis()).toString()
: "Unknown Command";
ByteBuf response = Unpooled.copiedBuffer(result.getBytes(StandardCharsets.UTF_8));
ctx.write(response);
}
}

private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
}

public void bind(int port) throws Exception {
// Reactor模式
// 配置服务端的NIO线程组(默认线程工厂和线程数量)
EventLoopGroup upperGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置相关信息,Builder模式
// 配置Reactor模式所属线程组
bootstrap.group(upperGroup, workerGroup)
// 配置channel类
.channel(NioServerSocketChannel.class)
// 配置Channel可选项。此处为“可连接队列大小”
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f = bootstrap.bind(port).sync();

// 等待服务监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 退出,释放线程池资源
// 区别于shutdownNow(),他还会有个平静期DEFAULT_SHUTDOWN_QUIET_PERIOD,此间若有任务提交他还是会继续工作
// 可以这很优(nei)雅(juan)
upperGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int listenPort = 9999;
new TimeServer().bind(listenPort);
}
}

这里着重注意以下片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Reactor模式
// 配置服务端的NIO线程组(默认线程工厂和线程数量)
EventLoopGroup upperGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置相关信息,Builder模式
// 配置Reactor模式所属线程组
bootstrap.group(upperGroup, workerGroup)
// 配置channel类
.channel(NioServerSocketChannel.class)
// 配置Channel可选项。此处为“可连接队列大小”
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());

回顾一下前面的Reactor模式,我们首先创建了两个默认的NioEventLoopGroup,并在构造ServerBootstrap时传入,代表我们使用是多actor多线程的模型

当然如果是单actor多线程的模型,我们构造方式一样,区别代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
// Reactor模式
// 配置服务端的NIO线程组(默认线程工厂和线程数量)
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置相关信息,Builder模式
// 配置Reactor模式所属线程组
bootstrap.group(workerGroup)
// 配置channel类
.channel(NioServerSocketChannel.class)
// 配置Channel可选项。此处为“可连接队列大小”
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());

再退化一下,那单actor单线程呢?

1
2
3
4
5
6
7
8
9
10
11
// 配置服务端的NIO线程组,线程池容量为1
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置相关信息,Builder模式
// 配置Reactor模式所属线程组
bootstrap.group(workerGroup)
// 配置channel类
.channel(NioServerSocketChannel.class)
// 配置Channel可选项。此处为“可连接队列大小”
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());

是不是很简单了。

客户端

客户端构建大致流程如下:

Netty客户端构建流程

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;

/**
* @author lzwgiter
* @since 2023/01/09
*/
public class TimerClient {
/**
* 连接事件拦截器
*/
private static class ConnectHandler extends ChannelOutboundHandlerAdapter {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
System.out.println("** from ConnectHandler : " + Thread.currentThread().getName() + " 连接成功 **");
super.connect(ctx, remoteAddress, localAddress, promise);
}
}

/**
* 写事件拦截器
*/
private static class WriteHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("** from WriteHandler : " + Thread.currentThread().getName()
+ " 消息: ## " + buf.toString(StandardCharsets.UTF_8)
+ " ## 发送成功 **");
super.write(ctx, msg, promise);
}
}

/**
* 读事件拦截器
*/
private static class ResponseHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
byte[] response = new byte[buf.readableBytes()];
buf.readBytes(response);
System.out.println("** from TimeClientHandler : " + Thread.currentThread().getName()
+ " 消息: ## " + new String(response, StandardCharsets.UTF_8)
+ " ## 接收成功 **");
}
}

/**
* 将我们的handlers注册到pipeline中
*/
private static class HandlersRegister extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ConnectHandler(), new WriteHandler(), new ResponseHandler());
}
}

public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new HandlersRegister());

// 异步连接服务端
ChannelFuture f = bootstrap.connect(host, port).sync();

// 发送消息
if (f.channel().isActive()) {
byte[] request = "QUERY".getBytes(StandardCharsets.UTF_8);
ByteBuf msg = Unpooled.buffer(request.length);
msg.writeBytes(request);
f.channel().writeAndFlush(msg);
}

// 关闭
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new TimerClient().connect(9999, "127.0.0.1");
}
}

运行结果如下图:

运行结果

客户端和我分别发出了请求,终端中输出表明服务结果没问题。着重看右边的终端,可以看到连接、写、读事件都可以看到对应拦截的日志输出,这也说明我们定义的拦截器发挥作用了。

参考文献

CATALOG
  1. 1. 什么是Netty?为了解决什么问题?
    1. 1.1. Netty
    2. 1.2. 解决什么问题呢?
  2. 2. 深入浅出——整体架构与基本概念
    1. 2.1. Netty整体架构
      1. 2.1.1. 不得不提的Reactor模式
      2. 2.1.2. Netty架构概览
        1. 2.1.2.1. 事件调度层
        2. 2.1.2.2. 服务编排层
    2. 2.2. 一个简单的Netty Demo
      1. 2.2.1. 服务端
      2. 2.2.2. 客户端
  3. 3. 参考文献