单线程模型

单线程模型

所有的 IO 操作都在同一个 NIO 线程上面完成,NIO 线程需要处理客户端的 TCP 连接,同时读取客户端 Socket 的请求或者应答消息以及向客户端发送请求或者应答消息。如下图:

基础版本

单线程模型

所有的 I/O 操作都在同一个 NIO 线程上面完成。可以看到有多个客户端连接到 Reactor,Reactor 内部有一个 dispatch(分发器)。有连接请求后,Reactor 会通过 dispatch 把请求交给 Acceptor 进行处理,有 IO 读写事件之后,又会通过 dispatch 交给具体的 Handler 进行处理。此时一个 Reactor 既然负责处理连接请求,又要负责处理读写请求,一般来说处理连接请求是很快的,但是处理具体的读写请求就要涉及到业务逻辑处理了,相对慢太多了。Reactor 正在处理读写请求的时候,其他请求只能等着,只有等处理完了,才可以处理下一个请求。

该模型的处理时序图如下:

Reactor 单线程模型时序图

单线程模型只适用并发量比较小的应用场景。当一个 NIO 线程同时处理上万个连接时,处理速度会变慢,会导致大量的客户端连接超时,超时之后如果进行重发,更加会加重了 NIO 线程的负载,最终会有大量的消息积压和处理超时,NIO 线程会成为系统的瓶颈。对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适。实际当中基本不会采用单线程模型。

代码案例

public class Main {

    public static void main(String[] args) {
        Reactor reactor = new Reactor(9090);
        reactor.run();
    }
}

public class Reactor implements Runnable {

    ServerSocketChannel serverSocketChannel;
    Selector selector;

    // 在构造方法中,注册了连接事件,并且在 selectionKey 对象附加了一个 Acceptor 对象,这是用来处理连接请求的类。
    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            // 注册了连接事件
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            // 并且在 selectionKey 对象附加了一个 Acceptor 对象,这是用来处理连接请求的类
            // 发生了连接事件后,Reactor 类的 dispatcher 方法拿到了 Acceptor 附加对象,调用了 Acceptor 的 run 方法
            selectionKey.attach(new Acceptor(selector, serverSocketChannel));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Reactor类实现了Runnable接口,并且实现了run方法,在run方法中,监听各种事件,有了事件后,调用dispatcher方法
    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // 在 dispatcher 方法中,拿到了 selectionKey 附加的对象,随后调用 run 方法,注意此时是调用 run 方法,并没有开启线程,只是一个普通的调用而已。
    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}

public class Acceptor implements Runnable {
    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    // 在 Acceptor 的 run 方法中又注册了读事件,然后在 selectionKey 附加了一个 WorkHandler 对象;
    // Acceptor 的 run 方法执行完毕后,就会继续回到 Reactor 类中的 run 方法,负责监听事件。
    @Override
    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("有客户端连接上来了," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            // 当客户端写事件发生后,Reactor 又会调用 dispatcher 方法,此时拿到的附加对象是WorkHandler,所以又跑到了 WorkHandler 中的run方法。
            selectionKey.attach(new WorkerHandlerThreadPool(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// WorkHandler就是真正负责处理客户端写事件的了
public class WorkHandler implements Runnable {
    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socketChannel.read(byteBuffer);
            String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress() + "发来的消息是:" + message);
            //System.out.println("sleep 10s");
            //Thread.sleep(10000);

            socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

对应的测试用客户端代码如下:

public class Client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket();
            socket.connect(new InetSocketAddress("localhost", 9090));
            new Thread(() -> {
                while (true) {
                    try {
                        InputStream inputStream = socket.getInputStream();
                        byte[] bytes = new byte[1024];
                        inputStream.read(bytes);
                        System.out.println(new String(bytes, StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            while (true) {
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String s = scanner.nextLine();
                    socket.getOutputStream().write(s.getBytes());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/*
    有客户端连接上来了,/127.0.0.1:64571
    有客户端连接上来了,/127.0.0.1:64577
    /127.0.0.1:64571发来的消息是:123
    /127.0.0.1:64577发来的消息是:456
    /127.0.0.1:64571发来的消息是:我是第一个人
    /127.0.0.1:64577发来的消息是:我是第二个人
*/
上一页
下一页