通过选择器 Selector 开发高性能聊天室

x33g5p2x  于2022-05-23 转载在 其他  
字(7.2k)|赞(0)|评价(0)|浏览(204)

一 点睛

选择器(Selector)的核心作用:可以在一个选择器上注册多个通道,并且可以通过选择器切换使用这些通道。

如果不使用选择器,服务端的 I/O 代码,会给每个客户端创建一个新线程,也就是用 N 个线程去处理 N 个客户端请求。因此,如果有 1 万个客户请求,就会创建 1 万个线程,这显然是不合理地。而 NIO 处理这种问题的思路是,用一个线程处理全部请求,并通过 Selector 切换处理不同的请求通道。

二 实战

1 需求

使用 NIO 实现一个聊天室功能。要求如下:服务端启动后可以接收多个客户端连接,每个客户端都可以向服务端发送消息;服务端接受到消息后,会在控制台打印此客户端的信息,并且将此消息发给全部的客户端。

2 服务端

服务端只创建一个处理请求的线程

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * @className: ChatServer
 * @description: 服务器
 * @date: 2022/5/23
 * @author: cakin
 */
public class ChatServer {
    /*
        clientsMap:保存所有的客户端
        key:客户端的名字
        value:客户端连接服务端的Channel
     */
    private static Map<String, SocketChannel> clientsMap = new HashMap();

    public static void main(String[] args) throws IOException {
        int[] ports = new int[]{7777, 8888, 9999};
        Selector selector = Selector.open();

        for (int port : ports) {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            ServerSocket serverSocket = serverSocketChannel.socket();
            // 将聊天服务绑定到 7777、8888和9999 三个端口上
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("服务端启动成功,端口" + port);
            // 在服务端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:接收客户端连接(接收就绪)
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        }

        while (true) {
            // 一直阻塞,直到选择器上存在已经就绪的通道(包含感兴趣的事件)
            selector.select();
            // selectionKeys包含了所有通道与选择器之间的关系(接收连接、读、写)
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            // 如果selector中有多个就绪通道(接收就绪、读就绪、写就绪等),则遍历这些通道
            while (keyIterator.hasNext()) {
                SelectionKey selectedKey = keyIterator.next();
                String receive = null;
                // 与客户端交互的通道
                SocketChannel clientChannel;
                try {
                    // 接收就绪(已经可以接收客户端的连接了)
                    if (selectedKey.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) selectedKey.channel();
                        clientChannel = server.accept();
                        // 切换到非阻塞模式
                        clientChannel.configureBlocking(false);
                        // 再在服务端的选择器上,注册第二个通道,并标识该通道所感兴趣的事件是:接收客户端发来的消息(读就绪)
                        clientChannel.register(selector, SelectionKey.OP_READ);
                        // 用“key四位随机数”的形式模拟客户端的key值
                        String key = "key" + (int) (Math.random() * 9000 + 1000);
                        // 将该建立完毕连接的 通道 保存到clientsMap中
                        clientsMap.put(key, clientChannel);
                        // 读就绪(已经可以读取客户端发来的信息了)
                    } else if (selectedKey.isReadable()) {
                        clientChannel = (SocketChannel) selectedKey.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        int result = -1;
                        try {
                            // 将服务端读取到的客户端消息,放入 readBuffer 中
                            result = clientChannel.read(readBuffer);
                            // 如果终止客户端,则 read() 会抛出 IOException 异常,可以依次判断是否有客户端退出。
                        } catch (IOException e) {
                            // 获取退出连接的 client 对应的 key
                            String clientKey = getClientKey(clientChannel);
                            System.out.println("客户端" + clientKey + "退出聊天室");
                            clientsMap.remove(clientKey);
                            clientChannel.close();
                            selectedKey.cancel();
                            continue;
                        }
                        if (result > 0) {
                            readBuffer.flip();
                            Charset charset = Charset.forName("utf-8");
                            receive = String.valueOf(charset.decode(readBuffer).array());
                            // 将读取到的客户端消息,打印在服务端的控制台
                            System.out.println(clientChannel + ":" + receive);
                            // 处理客户端第一次发来的连接测试信息
                            if ("connecting".equals(receive)) {
                                receive = "新客户端加入聊天!";
                            }
                            // 将读取到的客户消息保存在 attachment 中,用于后续向所有客户端转发此消息
                            selectedKey.attach(receive);
                            // 将通道所感兴趣的事件标识为:向客户端发送消息(写就绪)
                            selectedKey.interestOps(SelectionKey.OP_WRITE);
                        }
                        // 写就绪
                    } else if (selectedKey.isWritable()) {
                        clientChannel = (SocketChannel) selectedKey.channel();
                        // 获取发送消息从client对应的key
                        String sendKey = getClientKey(clientChannel);
                        // 将接收到的消息,拼接成“发送消息的客户端Key:消息”的形式,再广播给所有client
                        for (Map.Entry<String, SocketChannel> entry : clientsMap.entrySet()) {
                            SocketChannel eachClient = entry.getValue();
                            ByteBuffer broadcastMsg = ByteBuffer.allocate(1024);
                            broadcastMsg.put((sendKey + ":" + selectedKey.attachment()).getBytes());
                            broadcastMsg.flip();
                            eachClient.write(broadcastMsg);
                        }
                        selectedKey.interestOps(SelectionKey.OP_READ);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            selectionKeys.clear();
        }
    }

    public static String getClientKey(SocketChannel clientChannel) {
        String sendKey = null;
        //很多client在发下消息,通过for找到是哪个client在发消息,找到该client的key
        for (Map.Entry<String, SocketChannel> entry : clientsMap.entrySet()) {
            if (clientChannel == entry.getValue()) {
                //找到发送消息的client所对应的key
                sendKey = entry.getKey();
                break;
            }
        }
        return sendKey;
    }
}

3 客户端

package nio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @className: ChatClient
 * @description: 客户端
 * @date: 2022/5/23
 * @author: cakin
 */
public class ChatClient {
    public static void main(String[] args) {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            // 切换到非阻塞模式
            socketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            // 在客户端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:向服务端发送连接(连接就绪)。对应于服务端的OP_ACCEPT事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            // 随机连接到服务端提供的一个端口上
            int[] ports = {7777, 8888, 9999};
            int port = ports[(int) (Math.random() * 3)];
            socketChannel.connect(new InetSocketAddress("127.0.0.1", port));
            while (true) {
                selector.select();
                // selectionKeys 包含了所有通道与选择器之间的关系(请求连接、读、写)
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey selectedKey = keyIterator.next();
                    // 判断是否连接成功
                    if (selectedKey.isConnectable()) {
                        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
                        // 创建一个用于和服务端交互的 Channel
                        SocketChannel client = (SocketChannel) selectedKey.channel();
                        // 如果状态是:正在连接中...
                        if (client.isConnectionPending()) {
                            boolean isConnected = client.finishConnect();
                            if (isConnected) {
                                System.out.println("连接成功!访问的端口是:" + port);
                                // 向服务端发送一条测试消息
                                sendBuffer.put("connecting".getBytes());
                                sendBuffer.flip();
                                client.write(sendBuffer);
                            }

                            // 在“聊天室”中,对于客户端而言,可以随时向服务端发送消息(写操作),因此,需要建立一个单独写线程
                            new Thread(() -> {
                                while (true) {
                                    try {
                                        sendBuffer.clear();
                                        // 接收用户从控制台输入的内容,并发送给服务端
                                        InputStreamReader reader = new InputStreamReader(System.in);
                                        BufferedReader bReader = new BufferedReader(reader);
                                        String message = bReader.readLine();

                                        sendBuffer.put(message.getBytes());
                                        sendBuffer.flip();
                                        client.write(sendBuffer);
                                    } catch (Exception e) {
                                        e.printStackTrace();
                                    }
                                }
                            }).start();
                        }
                        // 标记通道感兴趣的事件是:读取服务端消息(读就绪)
                        client.register(selector, SelectionKey.OP_READ);
                        // 客户端读取服务端的反馈消息
                    } else if (selectedKey.isReadable()) {
                        SocketChannel client = (SocketChannel) selectedKey.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        // 将服务端的反馈消息放入 readBuffer中
                        int len = client.read(readBuffer);
                        if (len > 0) {
                            String receive = new String(readBuffer.array(), 0, len);
                            System.out.println(receive);
                        }
                    }
                }
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

三 测试

依次启动服务端和2个客户端,并发送消息

1 服务端打印

服务端启动成功,端口7777

服务端启动成功,端口8888

服务端启动成功,端口9999

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:52781]:connecting

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:52892]:connecting

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:52892]:你好    

java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:52781]:你来自哪里?            

2 第1个客户端打印

连接成功!访问的端口是:7777

key9664:新客户端加入聊天!

key4978:新客户端加入聊天!

key4978:你好    

你来自哪里?

key9664:你来自哪里?            

3 第2个客户端打印

连接成功!访问的端口是:7777

key4978:新客户端加入聊天!

你好

key4978:你好    

key9664:你来自哪里?            

相关文章