Reactor模型的Java NIO实现

实现Reactor模型可分为以下三种:

  • 单线程模型
  • 单Reactor多线程模型
  • 主从Reactor多线程模型。

单线程模型

Reactor单线程模型,指的是所有的IO操作都在同一个线程上面完成,线程的职责如下:

  • 作为NIO服务端,接收客户端的TCP连接;

  • 作为NIO客户端,向服务端发起TCP连接;

  • 读取通信对端的请求或者应答消息;

  • 向通信对端发送消息请求或者应答消息。

由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处理所有IO相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor接收客户端的TCP连接请求消息,链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上进行消息解码。用户线程可以通过消息编码通过NIO线程将消息发送给客户端。

Server端
public class Reactor1 {  
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(1234));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (selector.select() > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = acceptServerSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    System.out.println("accept from "+socketChannel.socket().getInetAddress().toString());
                  //  LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable() && key.isValid()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int count = socketChannel.read(buffer);
                    if (count <= 0) {
                        socketChannel.close();
                        key.cancel();
                        System.out.println("Received invalide data, close the connection");
                        //LOGGER.info("Received invalide data, close the connection");
                        continue;
                    }
                    System.out.println("Received message"+new String(buffer.array()));
                    //LOGGER.info("Received message {}", new String(buffer.array()));
                }
                keys.remove(key);
            }
        }
    }
}
Client
public class Client1 {

    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel;
        socketChannel = SocketChannel.open();
        //socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("localhost", 1234));
        Date now = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");//可以方便地修改日期格式
        String str = dateFormat.format( now );
        byte[] requst = str.getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(requst.length);
        buffer.put(requst);
        buffer.flip();
        try {
            while (buffer.hasRemaining()) {
                socketChannel.write(buffer);
            }
        }catch (IOException e) {
            e.printStackTrace();
        }
        socketChannel.close();
    }
}

对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下:

一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送; 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈; 可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。 为了解决这些问题,演进出了Reactor多线程模型。

单Reactor多线程模型

经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。当获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。 http://static.cyblogs.com/20180720174300266.png

Reactor多线程模型的特点:

  1. 有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;

  2. 网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;

  3. 1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。

在绝大多数场景下,Reactor多线程模型都可以满足性能需求;

服务端的实现
public class Reactor2 {  
    private static ExecutorService pool = Executors.newFixedThreadPool(100);
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(1234));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while(true) {
            if(selector.selectNow() < 0){
                continue;
            }
            Set<SelectionKey> sets = selector.selectedKeys();
            Iterator<SelectionKey> keys = sets.iterator();
            while(keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if(key.isAcceptable()) {
                    ServerSocketChannel Serverchannel = (ServerSocketChannel) key.channel();
                    SocketChannel channel = Serverchannel.accept();
                    channel.configureBlocking(false);
                    System.out.println("accept from "+channel.socket().getInetAddress().toString());
                    channel.register(selector, SelectionKey.OP_READ);
                }else if(key.isValid()&&key.isReadable()) {
                    pool.submit(new Processor(key));
                }
            }
        }
    }
}
class Processor implements Callable {  
    SelectionKey key;

    public Processor(SelectionKey key) {
        this.key = key;
    }

    @Override
    public Object call() throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int count = socketChannel.read(buffer);
        if (count <  0) {
            key.cancel();
            socketChannel.close();

            System.out.println("Received invalide data, close the connection");
            return null;
        }else if(count==0) {
            return null;
        }
            System.out.println("Received message"+new String(buffer.array()));
            System.out.println("current thread"+Thread.currentThread().toString());
        return null;
    }
}

在极个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。

多个Reactor模式(主从Reactor)

Netty中使用的Reactor模式,引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。

多个Reactor模式架构图 http://static.cyblogs.com/2018072113140256.png

public class MainReactor {  
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(1234));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        int coreNum = Runtime.getRuntime().availableProcessors();
        FollowerReactor[] followers = new FollowerReactor[coreNum];
        for(int i=0; i<coreNum; i++) {
            followers[i] = new FollowerReactor();
        }

        int index = 0;
        while(selector.select()>0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            for(SelectionKey key:keys) {
                keys.remove(key);
                if(key.isValid()&&key.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = serverSocketChannel1.accept();
                    socketChannel.configureBlocking(false);
                    System.out.println("Accept request:" + socketChannel.socket().getInetAddress());
                    FollowerReactor follower = followers[++index%coreNum];
                    follower.register(socketChannel);
                    //follower.wakeUp();
                }
            }
        }

    }
}

上面的代码是主Reactor,子Reactor根据前机器可用核数的两倍(与Netty默认的子Reactor个数一致)。对于每个成功连接的SocketChannel,通过round robin的方式交给不同的子Reactor。子Reactor的代码如下:

public class FollowerReactor {  
    private Selector selector;
    private static  ExecutorService service =Executors.newFixedThreadPool(
            2*Runtime.getRuntime().availableProcessors());
    public void register(SocketChannel socketChannel) throws ClosedChannelException {
        socketChannel.register(selector, SelectionKey.OP_READ);
    }
    public void wakeUp() {
    }
    public FollowerReactor() throws IOException {
        selector = Selector.open();
        select();
    }
    public void wakeup() {
        this.selector.wakeup();
    }
    public void select() {
        service.submit(() -> {
            while(true) {
                if(selector.select(500)<=0) {
                    continue;
                }
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while(iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if(key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        SocketChannel channel = (SocketChannel) key.channel();
                        int count = channel.read(buffer);
                        if(count<0) {
                            channel.close();
                            key.cancel();
                            System.out.println(channel+"->red end !");
                            continue;
                        }else if(count==0) {
                            System.out.println(channel+",size is 0 !");
                            continue;
                        }else{
                            System.out.println(channel+",message is :"+new String(buffer.array()));

                        }
                    }
                }
            }
        });

    }
}

在子Reactor中创建了一个静态的线程池,且线程池的大小为机器核数的两倍,每个字Reactor包换一个Selector实例,同事每次创建一个子Reactor都提交一个任务到线程池,阻塞到selector方法,直到新的channel注册到该Selector上,才继续执行。

参考地址

如果大家喜欢我的文章,可以关注个人订阅号。欢迎随时留言、交流。如果想加入微信群的话一起讨论的话,请加管理员简栈文化-小助手(lastpass4u),他会拉你们进群。

简栈文化服务订阅号