多线程模型

多线程模型

多线程模型与单线程模型最大的区别是有专门的一组 NIO 线程处理 IO 操作,一般使用线程池的方式实现。一个 NIO 线程同时处理多条连接,一个连接只能属于 1 个 NIO 线程,这样能够防止并发操作问题。

Reactor 模式的多线程版本

多线程模型中有一个专门的线程负责监听和处理所有的客户端连接,网络 IO 操作由一个 NIO 线程池负责,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送。1 个 NIO 线程可以同时处理 N 条链路,但是 1 个链路只对应 1 个 NIO 线程,防止发生并发操作问题。在绝大多数场景下,Reactor 多线程模型都可以满足性能需求。但是在更高并发连接的场景(如百万客户端并发连接),它的性能似乎不能满足需求,于是便出现了下面的多 Reactor(主从 Reactor)模型。

代码案例

public class Acceptor implements Runnable {
    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("有客户端连接上来了," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            // 修改此处就可以切换成多线程模型了
            selectionKey.attach(new WorkerHandlerThreadPool(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public class WorkerHandlerThreadPool implements Runnable {

    static ExecutorService pool = Executors.newFixedThreadPool(2);

    private SocketChannel socketChannel;

    public WorkerHandlerThreadPool(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            System.out.println("WorkerHandlerThreadPool thread :" + Thread.currentThread().getName());
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            socketChannel.read(buffer);
            pool.execute(new Process(socketChannel, buffer));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public class Process implements Runnable {

    private SocketChannel socketChannel;

    private ByteBuffer byteBuffer;

    public Process(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            System.out.println("sleep 10s");
            Thread.sleep(10000);
            System.out.println("process thread:" + Thread.currentThread().getName());
            String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress() + "发来的消息是:" + message);
            socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/**
    有客户端连接上来了,/127.0.0.1:64797
    有客户端连接上来了,/127.0.0.1:64800
    WorkerHandlerThreadPool thread :main
    sleep 10s
    process thread:pool-1-thread-1
    /127.0.0.1:64797发来的消息是:I am one
    WorkerHandlerThreadPool thread :main
    sleep 10s
    WorkerHandlerThreadPool thread :main
    sleep 10s
    process thread:pool-1-thread-2
    /127.0.0.1:64800发来的消息是:I am two
    process thread:pool-1-thread-1
    /127.0.0.1:64797发来的消息是:hah
*/

单 Reactor 多线程模型看起来是很不错了,但是还是有缺点:一个 Reactor 还是既然负责连接请求,又要负责读写请求,连接请求是很快的,而且一个客户端一般只要连接一次就可以了,但是会发生很多次写请求,如果可以有多个 Reactor,其中一个 Reactor 负责处理连接事件,多个 Reactor 负责处理客户端的写事件就好了,这样更符合单一职责,所以主从 Reactor 模型诞生了。

上一页
下一页