单线程模型
单线程模型
所有的


所有的
该模型的处理时序图如下

单线程模型只适用并发量比较小的应用场景。当一个
代码案例
public class Main {
public static void main(String[] args) {
Reactor reactor = new Reactor(9090);
reactor.run();
}
}
public class Reactor implements Runnable {
ServerSocketChannel serverSocketChannel;
Selector selector;
// 在构造方法中,注册了连接事件,并且在 selectionKey 对象附加了一个 Acceptor 对象,这是用来处理连接请求的类。
public Reactor(int port) {
try {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
// 注册了连接事件
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 并且在 selectionKey 对象附加了一个 Acceptor 对象,这是用来处理连接请求的类
// 发生了连接事件后,Reactor 类的 dispatcher 方法拿到了 Acceptor 附加对象,调用了 Acceptor 的 run 方法
selectionKey.attach(new Acceptor(selector, serverSocketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
// Reactor类实现了Runnable接口,并且实现了run方法,在run方法中,监听各种事件,有了事件后,调用dispatcher方法
@Override
public void run() {
while (true) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispatcher(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 在 dispatcher 方法中,拿到了 selectionKey 附加的对象,随后调用 run 方法,注意此时是调用 run 方法,并没有开启线程,只是一个普通的调用而已。
private void dispatcher(SelectionKey selectionKey) {
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
public class Acceptor implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
// 在 Acceptor 的 run 方法中又注册了读事件,然后在 selectionKey 附加了一个 WorkHandler 对象;
// Acceptor 的 run 方法执行完毕后,就会继续回到 Reactor 类中的 run 方法,负责监听事件。
@Override
public void run() {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("有客户端连接上来了," + socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
// 当客户端写事件发生后,Reactor 又会调用 dispatcher 方法,此时拿到的附加对象是WorkHandler,所以又跑到了 WorkHandler 中的run方法。
selectionKey.attach(new WorkerHandlerThreadPool(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
// WorkHandler就是真正负责处理客户端写事件的了
public class WorkHandler implements Runnable {
private SocketChannel socketChannel;
public WorkHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer);
String message = new String(byteBuffer.array(), StandardCharsets.UTF_8);
System.out.println(socketChannel.getRemoteAddress() + "发来的消息是:" + message);
//System.out.println("sleep 10s");
//Thread.sleep(10000);
socketChannel.write(ByteBuffer.wrap("你的消息我收到了".getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
}
}
对应的测试用客户端代码如下:
public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", 9090));
new Thread(() -> {
while (true) {
try {
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
inputStream.read(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
while (true) {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
socket.getOutputStream().write(s.getBytes());
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/*
有客户端连接上来了,/127.0.0.1:64571
有客户端连接上来了,/127.0.0.1:64577
/127.0.0.1:64571发来的消息是:123
/127.0.0.1:64577发来的消息是:456
/127.0.0.1:64571发来的消息是:我是第一个人
/127.0.0.1:64577发来的消息是:我是第二个人
*/