Netty学习笔记
官方文档:https://netty.io/
Reactor模型
将关注的 I/O 事件注册到多路复用器上,一旦有 I/O 事件触发,将事件分发到事件处理器中,执行就绪 I/O 事件对应的处理函数中。模型中有三个重要的组件:
- 多路复用器:由操作系统提供接口,Linux 提供的 I/O 复用接口有select、poll、epoll 。
- 事件分离器:将多路复用器返回的就绪事件分发到事件处理器中。
- 事件处理器:处理就绪事件处理函数。
Reactor 有 3 种模型实现:
- 单 Reactor 单线程模型
- 单 Reactor 多线程模型
- 多 Reactor 多线程模型
单 Reactor 单线程模型
这是最基础的单 Reactor 单线程模型。
Reactor 线程,负责多路分离套接字。
- 有新连接到来触发
OP_ACCEPT
事件之后, 交由 Acceptor 进行处理。 - 有 IO 读写事件之后,交给 Handler 处理。
Acceptor 主要任务是构造 Handler 。
- 在获取到 Client 相关的 SocketChannel 之后,绑定到相应的 Handler 上。
- 对应的 SocketChannel 有读写事件之后,基于 Reactor 分发,Handler 就可以处理了。
注意,所有的 IO 事件都绑定到 Selector 上,由 Reactor 统一分发。
该模型适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。
单 Reactor 多线程模型
相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到 IO 的读写事件之后,交由线程池来处理,这样可以减小主 Reactor 的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。
多 Reactor 多线程模型
第三种模型比起第二种模型,是将 Reactor 分成两部分:
- mainReactor 负责监听 ServerSocketChannel ,用来处理客户端新连接的建立,并将建立的客户端的 SocketChannel 指定注册给 subReactor 。
- subReactor 维护自己的 Selector ,基于 mainReactor 建立的客户端的 SocketChannel 多路分离 IO 读写事件,读写网络数据。对于业务处理的功能,另外扔给 worker 线程池来完成。
Netty架构
- Core:核心部分,是底层的网络通用抽象和部分实现。
- Extensible Event Model :可拓展的事件模型。Netty 是基于事件模型的网络应用框架。
- Universal Communication API :通用的通信 API 层。Netty 定义了一套抽象的通用通信层的 API 。
- Zero-Copy-Capable Rich Byte Buffer :支持零拷贝特性的 Byte Buffer 实现。
- Transport Services:传输( 通信 )服务,具体的网络传输的定义与实现。
- Socket & Datagram :TCP 和 UDP 的传输实现。
- HTTP Tunnel :HTTP 通道的传输实现。
- In-VM Piple :JVM 内部的传输实现。
- Protocol Support :协议支持。Netty 对于一些通用协议的编解码实现。例如:HTTP、Redis、DNS 等等。
重点模块
- 本图省略非主要依赖。例如,
handler-proxy
对codec
有依赖,但是并未画出。 - 本图省略非主要的项目。例如,
resolver
、testsuite
、example
等等。
common
common
项目,该项目是一个通用的工具类项目,几乎被所有的其它项目依赖使用,它提供了一些数据类型处理工具类,并发编程以及多线程的扩展,计数器等等通用的工具类。
buffer
buffer
项目,实现了 Netty 架构图中的 Zero-Copy-Capable Rich Byte Buffer,该项目下是 Netty 自行实现的一个 Byte Buffer 字节缓冲区。该包的实现相对于 JDK 自带的 ByteBuffer 有很多优点:无论是 API 的功能,使用体验,性能都要更加优秀。它提供了**一系列( 多种 )**的抽象定义以及实现,以满足不同场景下的需要。
transport
transport
项目,实现了 Netty 架构图中 Transport Services、Universal Communication API 和 Extensible Event Model 等多部分内容,该项目是网络传输通道的抽象和实现。它定义通信的统一通信 API ,统一了 JDK 的 OIO、NIO ( 不包括 AIO )等多种编程接口。
另外,它提供了多个子项目,实现不同的传输类型。例如:transport-native-epoll
、transport-native-kqueue
、transport-rxtx
、transport-udt
和 transport-sctp
等等。
codec
codec
项目,该项目是协议编解码的抽象与部分实现:JSON、Google Protocol、Base64、XML 等等。
另外,它提供了多个子项目,实现不同协议的编解码。例如:codec-dns
、codec-haproxy
、codec-http
、codec-http2
、codec-mqtt
、codec-redis
、codec-memcached
、codec-smtp
、codec-socks
、codec-stomp
、codec-xml
等等。
handler
handler
项目,该项目是提供内置的连接通道处理器( ChannelHandler )实现类。例如:SSL 处理器、日志处理器等等。
example
example
项目,该项目是提供各种 Netty 使用示例。
其它模块
all
:All In One 的pom
声明。bom
:Netty Bill Of Materials 的缩写 。microbench
:微基准测试。resolver
:终端( Endpoint ) 的地址解析器。resolver-dns
tarball
:All In One 打包工具。testsuite
:测试集。testsuite-autobahhn
testsuite-http2
testsuite-osgi
核心组件
Netty 有如下几个核心组件:
- Bootstrap & ServerBootstrap
- Channel
- ChannelFuture
- EventLoop & EventLoopGroup
- ChannelHandler
- ChannelPipeline
Bootstrap & ServerBootstrap
这 2 个类都继承了AbstractBootstrap,因此它们有很多相同的方法和职责。它们都是启动器,能够帮助 Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序。相比使用者自己从头去将 Netty 的各部分组装起来要方便得多,降低了使用者的学习和使用成本。它们是我们使用 Netty 的入口和最重要的 API ,可以通过它来连接到一个主机和端口上,也可以通过它来绑定到一个本地的端口上。
它们和其它组件之间的关系是它们将 Netty 的其它组件进行组装和配置,所以它们会组合和直接或间接依赖其它的类。
Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。
- 通常使用
#connet(...)
方法连接到远程的主机和端口,作为一个 Netty TCP 客户端。 - 也可以通过
#bind(...)
方法绑定本地的一个端口,作为 UDP 的一端。 - 仅仅需要使用一个 EventLoopGroup 。
ServerBootstrap 往往是用于启动一个 Netty 服务端。
- 通常使用
#bind(...)
方法绑定本地的端口上,然后等待客户端的连接。 - 使用两个 EventLoopGroup 对象( 当然这个对象可以引用同一个对象 ):第一个用于处理它本地 Socket 连接的 IO 事件处理,而第二个责负责处理远程客户端的 IO 事件处理。
Channel
Channel 是 Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 Channel 的 EventLoop 。
EventLoop && EventLoopGroup
Netty 基于事件驱动模型,使用不同的事件来通知我们状态的改变或者操作状态的改变。它定义了在整个连接的生命周期里当有事件发生的时候处理的核心抽象。
Channel 为Netty 网络操作抽象类,EventLoop 负责处理注册到其上的 Channel 处理 I/O 操作,两者配合参与 I/O 操作。
EventLoopGroup 是一个 EventLoop 的分组,它可以获取到一个或者多个 EventLoop 对象,因此它提供了迭代出 EventLoop 对象的方法。
- 一个 EventLoopGroup 包含一个或多个 EventLoop ,即 EventLoopGroup : EventLoop =
1 : n
。 - 一个 EventLoop 在它的生命周期内,只能与一个 Thread 绑定,即 EventLoop : Thread =
1 : 1
。 - 所有有 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,从而保证线程安全,即 Thread : EventLoop =
1 : 1
。 - 一个 Channel 在它的生命周期内只能注册到一个 EventLoop 上,即 Channel : EventLoop =
n : 1
。 - 一个 EventLoop 可被分配至一个或多个 Channel ,即 EventLoop : Channel =
1 : n
。
当一个连接到达时,Netty 就会创建一个 Channel,然后从 EventLoopGroup 中分配一个 EventLoop 来给这个 Channel 绑定上,在该 Channel 的整个生命周期中都是有这个绑定的 EventLoop 来服务的。
ChannelFuture
Netty 为异步非阻塞,即所有的 I/O 操作都为异步的,因此,我们不能立刻得知消息是否已经被处理了。Netty 提供了 ChannelFuture 接口,通过该接口的 #addListener(...)
方法,注册一个 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果。
ChannelHandler
ChannelHandler ,连接通道处理器,我们使用 Netty 中最常用的组件。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelHandler 有两个核心子类 ChannelInboundHandler 和 ChannelOutboundHandler,其中 ChannelInboundHandler 用于接收、处理入站( Inbound )的数据和事件,而 ChannelOutboundHandler 则相反,用于接收、处理出站( Outbound )的数据和事件。
- ChannelInboundHandler 的实现类还包括一系列的 Decoder 类,对输入字节流进行解码。
- ChannelOutboundHandler 的实现类还包括一系列的 Encoder 类,对输入字节流进行编码。
ChannelDuplexHandler 可以同时用于接收、处理入站和出站的数据和时间。
ChannelHandler 还有其它的一系列的抽象实现 Adapter ,以及一些用于编解码具体协议的 ChannelHandler 实现类。
ChannelPipeline
ChannelPipeline 为 ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。一个数据或者事件可能会被多个 Handler 处理,在这个过程中,数据或者事件经流 ChannelPipeline ,由 ChannelHandler 处理。在这个处理过程中,一个 ChannelHandler 接收数据后处理完成后交给下一个 ChannelHandler,或者什么都不做直接交给下一个 ChannelHandler。
- 当一个数据流进入 ChannelPipeline 时,它会从 ChannelPipeline 头部开始,传给第一个 ChannelInboundHandler 。当第一个处理完后再传给下一个,一直传递到管道的尾部。
- 与之相对应的是,当数据被写出时,它会从管道的尾部开始,先经过管道尾部的“最后”一个ChannelOutboundHandler ,当它处理完成后会传递给前一个 ChannelOutboundHandler 。
当 ChannelHandler 被添加到 ChannelPipeline 时,它将会被分配一个 ChannelHandlerContext ,它代表了 ChannelHandler 和 ChannelPipeline 之间的绑定。其中 ChannelHandler 添加到 ChannelPipeline 中,通过 ChannelInitializer 来实现,过程如下:
- 一个 ChannelInitializer 的实现对象,被设置到了 BootStrap 或 ServerBootStrap 中。
- 当
ChannelInitializer#initChannel()
方法被调用时,ChannelInitializer 将在 ChannelPipeline 中创建一组自定义的 ChannelHandler 对象。 - ChannelInitializer 将它自己从 ChannelPipeline 中移除。
Bootstrap 详解
服务端-ServerBootstrap 示例
客户端-Bootstrap 示例
EventLoop 详解
服务端-EventLoop
// 创建两个 EventLoopGroup 对象
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 创建 boss 线程组 用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建 worker 线程组 用于进行 SocketChannel 的数据读写
// 创建 ServerBootstrap 对象
ServerBootstrap b = new ServerBootstrap();
// 设置使用的 EventLoopGroup
b.group(bossGroup, workerGroup);
- 对于 Netty NIO 服务端来说,创建两个 EventLoopGroup 。
bossGroup
对应 Reactor 模式的 mainReactor ,用于服务端接受客户端的连接。比较特殊的是,传入了方法参数nThreads = 1
,表示只使用一个 EventLoop ,即只使用一个 Reactor 。这个也符合我们上面提到的,“通常,mainReactor 只需要一个,因为它一个线程就可以处理”。workerGroup
对应 Reactor 模式的 subReactor ,用于进行 SocketChannel 的数据读写。对于 EventLoopGroup ,如果未传递方法参数nThreads
,表示使用 CPU 个数 Reactor 。这个也符合我们上面提到的,“通常,subReactor 的个数和 CPU 个数相等,每个 subReactor 独占一个线程来处理”。
- 因为使用两个 EventLoopGroup ,所以符合【多 Reactor 多线程模型】的多 Reactor 的要求。实际在使用时,
workerGroup
在读完数据时,具体的业务逻辑处理,我们会提交到专门的业务逻辑线程池,例如在 Dubbo 或 Motan 这两个 RPC 框架中。这样一来,就完全符合【多 Reactor 多线程模型】。 - 那么可能有胖友可能和我有一样的疑问,
bossGroup
如果配置多个线程,是否可以使用多个 mainReactor 呢?我们来分析一波,一个 Netty NIO 服务端同一时间,只能 bind 一个端口,那么只能使用一个 Selector 处理客户端连接事件。又因为,Selector 操作是非线程安全的,所以无法在多个 EventLoop ( 多个线程 )中,同时操作。所以这样就导致,即使bossGroup
配置多个线程,实际能够使用的也就是一个线程。 - 那么如果一定要多个 mainReactor 呢?创建多个 Netty NIO 服务端,并绑定多个端口。
客户端-EventLoop
// 创建一个 EventLoopGroup 对象
EventLoopGroup group = new NioEventLoopGroup();
// 创建 Bootstrap 对象
Bootstrap b = new Bootstrap();
// 设置使用的 EventLoopGroup
b.group(group);
- 对于 Netty NIO 客户端来说,仅创建一个 EventLoopGroup 。
- 一个 EventLoop 可以对应一个 Reactor 。因为 EventLoopGroup 是 EventLoop 的分组,所以对等理解,EventLoopGroup 是一种 Reactor 的分组。
- 一个 Bootstrap 的启动,只能发起对一个远程的地址。所以只会使用一个 NIO Selector ,也就是说仅使用一个 Reactor 。即使,我们在声明使用一个 EventLoopGroup ,该 EventLoopGroup 也只会分配一个 EventLoop 对 IO 事件进行处理。
- 因为 Reactor 模型主要使用服务端的开发中,如果套用在 Netty NIO 客户端中,到底使用了哪一种模式呢?
- 如果只有一个业务线程使用 Netty NIO 客户端,那么可以认为是【单 Reactor 单线程模型】。
- 如果有多个业务线程使用 Netty NIO 客户端,那么可以认为是【单 Reactor 多线程模型】。
- 那么 Netty NIO 客户端是否能够使用【多 Reactor 多线程模型】呢?创建多个 Netty NIO 客户端,连接同一个服务端。那么多个 Netty 客户端就可以认为符合多 Reactor 多线程模型了。
- 一般情况下,我们不会这么干。
- 当然,实际也有这样的示例。例如 Dubbo 或 Motan 这两个 RPC 框架,支持通过配置,同一个 Consumer 对同一个 Provider 实例同时建立多个客户端连接。
EventLoopGroup
EventLoopGroup 是一个 EventLoop 的分组,它可以获取到一个或者多个 EventLoop 对象,因此它提供了迭代出 EventLoop 对象的方法。
EventExecutorGroup
io.netty.util.concurrent.EventExecutorGroup
,实现 Iterable、ScheduledExecutorService 接口,EventExecutor ( 事件执行器 )的分组接口。代码如下:
// ========== 自定义接口 ==========
boolean isShuttingDown();
// 优雅关闭
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
Future<?> terminationFuture();
// 选择一个 EventExecutor 对象
EventExecutor next();
// ========== 实现自 Iterable 接口 ==========
@Override
Iterator<EventExecutor> iterator();
// ========== 实现自 ExecutorService 接口 ==========
@Override
Future<?> submit(Runnable task);
@Override
<T> Future<T> submit(Runnable task, T result);
@Override
<T> Future<T> submit(Callable<T> task);
@Override
@Deprecated
void shutdown();
@Override
@Deprecated
List<Runnable> shutdownNow();
// ========== 实现自 ScheduledExecutorService 接口 ==========
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
- 接口方法返回类型为 Future 不是 Java 原生的
java.util.concurrent.Future
,而是 Netty 自己实现的 Future 接口。 - EventExecutorGroup 自身不执行任务,而是将任务
#submit(...)
或#schedule(...)
给自己管理的 EventExecutor 的分组。至于提交给哪一个 EventExecutor ,一般是通过#next()
方法,选择一个 EventExecutor 。
AbstractEventExecutorGroup
io.netty.util.concurrent.AbstractEventExecutorGroup
,实现 EventExecutorGroup 接口,EventExecutor ( 事件执行器 )的分组抽象类。
submit
#submit(...)
方法,提交一个普通任务到 EventExecutor 中。代码如下:
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
}
- 提交的 EventExecutor ,通过
#next()
方法选择。
schedule
#schedule(...)
方法,提交一个定时任务到 EventExecutor 中。代码如下:
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return next().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return next().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
- 提交的 EventExecutor ,通过
#next()
方法选择。
execute
#execute(...)
方法,在 EventExecutor 中执行一个普通任务。代码如下:
@Override
public void execute(Runnable command) {
next().execute(command);
}
- 执行的 EventExecutor ,通过
#next()
方法选择。 - 看起来
#execute(...)
和#submit(...)
方法有几分相似,具体的差异,由 EventExecutor 的实现决定。
invokeAll
#invokeAll(...)
方法,在 EventExecutor 中执行多个普通任务。代码如下:
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return next().invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return next().invokeAll(tasks, timeout, unit);
}
- 执行的 EventExecutor ,通过
#next()
方法选择。并且,多个任务使用同一个 EventExecutor
invokeAny
#invokeAll(...)
方法,在 EventExecutor 中执行多个普通任务,有一个执行完成即可。代码如下:
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return next().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return next().invokeAny(tasks, timeout, unit);
}
- 执行的 EventExecutor ,通过
#next()
方法选择。并且,多个任务使用同一个 EventExecutor
shutdown
#shutdown(...)
方法,关闭 EventExecutorGroup 。代码如下:
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD /* 2 */, DEFAULT_SHUTDOWN_TIMEOUT /* 15 */, TimeUnit.SECONDS);
}
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
@Deprecated
public abstract void shutdown();
- 具体的
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
和#shutdown()
方法,由子类实现。
MultithreadEventExecutorGroup
io.netty.util.concurrent.MultithreadEventExecutorGroup
,继承 AbstractEventExecutorGroup 抽象类,基于多线程的 EventExecutor ( 事件执行器 )的分组抽象类