Reactor模型的Java NIO实现

实现Reactor模型可分为以下三种:

  • 单线程模型
  • 单Reactor多线程模型
  • 主从Reactor多线程模型。

单线程模型

Reactor单线程模型,指的是所有的IO操作都在同一个线程上面完成,线程的职责如下:

  • 作为NIO服务端,接收客户端的TCP连接;

  • 作为NIO客户端,向服务端发起TCP连接;

  • 读取通信对端的请求或者应答消息;

  • 向通信对端发送消息请求或者应答消息。

由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处理所有IO相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor接收客户端的TCP连接请求消息,链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上进行消息解码。用户线程可以通过消息编码通过NIO线程将消息发送给客户端。

Server端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Reactor1 {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("accept from "+socketChannel.socket().getInetAddress().toString());
// LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable() && key.isValid()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(buffer);
if (count <= 0) {
socketChannel.close();
key.cancel();
System.out.println("Received invalide data, close the connection");
//LOGGER.info("Received invalide data, close the connection");
continue;
}
System.out.println("Received message"+new String(buffer.array()));
//LOGGER.info("Received message {}", new String(buffer.array()));
}
keys.remove(key);
}
}
}
}
Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Client1 {

public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel socketChannel;
socketChannel = SocketChannel.open();
//socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost", 1234));
Date now = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");//可以方便地修改日期格式
String str = dateFormat.format( now );
byte[] requst = str.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(requst.length);
buffer.put(requst);
buffer.flip();
try {
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
}catch (IOException e) {
e.printStackTrace();
}
socketChannel.close();
}
}

对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下:

一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
为了解决这些问题,演进出了Reactor多线程模型。

单Reactor多线程模型

经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。当获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。
http://static.cyblogs.com/20180720174300266.png

Reactor多线程模型的特点:

  1. 有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;

  2. 网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;

  3. 1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。

在绝大多数场景下,Reactor多线程模型都可以满足性能需求;

服务端的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Reactor2 {
private static ExecutorService pool = Executors.newFixedThreadPool(100);
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
if(selector.selectNow() < 0){
continue;
}
Set<SelectionKey> sets = selector.selectedKeys();
Iterator<SelectionKey> keys = sets.iterator();
while(keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if(key.isAcceptable()) {
ServerSocketChannel Serverchannel = (ServerSocketChannel) key.channel();
SocketChannel channel = Serverchannel.accept();
channel.configureBlocking(false);
System.out.println("accept from "+channel.socket().getInetAddress().toString());
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isValid()&&key.isReadable()) {
pool.submit(new Processor(key));
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Processor implements Callable {
SelectionKey key;

public Processor(SelectionKey key) {
this.key = key;
}

@Override
public Object call() throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
key.cancel();
socketChannel.close();

System.out.println("Received invalide data, close the connection");
return null;
}else if(count==0) {
return null;
}
System.out.println("Received message"+new String(buffer.array()));
System.out.println("current thread"+Thread.currentThread().toString());
return null;
}
}

在极个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。

多个Reactor模式(主从Reactor)

Netty中使用的Reactor模式,引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。

多个Reactor模式架构图
http://static.cyblogs.com/2018072113140256.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MainReactor {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

int coreNum = Runtime.getRuntime().availableProcessors();
FollowerReactor[] followers = new FollowerReactor[coreNum];
for(int i=0; i<coreNum; i++) {
followers[i] = new FollowerReactor();
}

int index = 0;
while(selector.select()>0) {
Set<SelectionKey> keys = selector.selectedKeys();
for(SelectionKey key:keys) {
keys.remove(key);
if(key.isValid()&&key.isAcceptable()) {
ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel1.accept();
socketChannel.configureBlocking(false);
System.out.println("Accept request:" + socketChannel.socket().getInetAddress());
FollowerReactor follower = followers[++index%coreNum];
follower.register(socketChannel);
//follower.wakeUp();
}
}
}

}
}

上面的代码是主Reactor,子Reactor根据前机器可用核数的两倍(与Netty默认的子Reactor个数一致)。对于每个成功连接的SocketChannel,通过round robin的方式交给不同的子Reactor。子Reactor的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class FollowerReactor {
private Selector selector;
private static ExecutorService service =Executors.newFixedThreadPool(
2*Runtime.getRuntime().availableProcessors());
public void register(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(selector, SelectionKey.OP_READ);
}
public void wakeUp() {
}
public FollowerReactor() throws IOException {
selector = Selector.open();
select();
}
public void wakeup() {
this.selector.wakeup();
}
public void select() {
service.submit(() -> {
while(true) {
if(selector.select(500)<=0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if(key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(buffer);
if(count<0) {
channel.close();
key.cancel();
System.out.println(channel+"->red end !");
continue;
}else if(count==0) {
System.out.println(channel+",size is 0 !");
continue;
}else{
System.out.println(channel+",message is :"+new String(buffer.array()));

}
}
}
}
});

}
}

在子Reactor中创建了一个静态的线程池,且线程池的大小为机器核数的两倍,每个字Reactor包换一个Selector实例,同事每次创建一个子Reactor都提交一个任务到线程池,阻塞到selector方法,直到新的channel注册到该Selector上,才继续执行。

参考地址

如果大家喜欢我的文章,可以关注个人订阅号。欢迎随时留言、交流。如果想加入微信群的话一起讨论的话,请加管理员简栈文化-小助手(lastpass4u),他会拉你们进群。

简栈文化服务订阅号