NIO 中多缓冲区实战

x33g5p2x  于2022-05-23 转载在 其他  
字(4.4k)|赞(0)|评价(0)|浏览(195)

一 点睛

在 scatter-and-gather 场景下,可以将数据写入多个 Buffer 中。在 NIO 中,也能够同时操作多个缓冲区。在很多 Channel 实现类中,都提供了多个重载的 read() 和 write() 方法,下表介绍了 SocketChannel 的 read() 和 write() 方法。

1 read 的重载方法

| <br>read() 的重载方法<br> | <br>简介<br> |
| <br>int read(ByteBuffer dst)<br> | <br>将 Channel 读取到的数据存入一个 ByteBuffer 中<br> |
| <br>long read(ByteBuffer[] dsts,int offset,int length)                                  <br><br>long read(ByteBuffer[] dsts)<br> | <br>将 Channel 读取到的数据存入一个 ByteBuffer 数组中      <br> |

2 write 的重载方法

| <br>write() 的重载方法<br> | <br>简介<br> |
| <br>int write(ByteBuffer srcs)<br> | <br>将 ByteBuffer 中的数据写入 Channel 中<br> |
| <br>long write(ByteBuffer[] srcs,int offset,int length)<br><br>long write(ByteBuffer[] srcs)<br> | <br>将 ByteBuffer 数组中的所有数据写入 Channel 中<br> |

二 实战

1 服务端

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NIOServerWith2Buffers {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8888)) ;

        ByteBuffer[] buffers = new ByteBuffer[2] ;
        buffers[0]  = ByteBuffer.allocate(4) ;
        buffers[1]  = ByteBuffer.allocate(8) ;

        int bufferSum = 4 + 8 ;
        SocketChannel socketChannel =   serverSocketChannel.accept();

        while(true){
            /*
                 读取客户端的消息:
                 eachReadbytes:每次读取到的字节数
                 totalReadBytes:当前时刻,一共读取的字节数
                 如果 totalReadBytes 小于 "buffers能够容纳的最大字节数",则循环累加读取;否则,清空buffers,重新读取
             */
            int totalReadBytes = 0 ;
            while(totalReadBytes < bufferSum){
                long eachReadbytes = socketChannel.read(buffers);
                totalReadBytes += eachReadbytes ;
                System.out.println("读取到的数据大小:" + eachReadbytes);
            }
            // 如果buffers已满
            for(ByteBuffer buffer : buffers) {
                buffer.flip() ;
            }
        }
    }
}

2 客户端

package nio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @className: ChatClient
 * @description: 客户端
 * @date: 2022/5/23
 * @author: cakin
 */
public class ChatClient {
    public static void main(String[] args) {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            // 切换到非阻塞模式
            socketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            // 在客户端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:向服务端发送连接(连接就绪)。对应于服务端的OP_ACCEPT事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
            while (true) {
                selector.select();
                // selectionKeys 包含了所有通道与选择器之间的关系(请求连接、读、写)
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey selectedKey = keyIterator.next();
                    // 判断是否连接成功
                    if (selectedKey.isConnectable()) {
                        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
                        // 创建一个用于和服务端交互的 Channel
                        SocketChannel client = (SocketChannel) selectedKey.channel();
                        // 如果状态是:正在连接中...
                        if (client.isConnectionPending()) {
                            boolean isConnected = client.finishConnect();
                            if (isConnected) {
                                System.out.println("连接成功!访问的端口是:" + port);
                                // 向服务端发送一条测试消息
                                sendBuffer.put("connecting".getBytes());
                                sendBuffer.flip();
                                client.write(sendBuffer);
                            }

                            // 在“聊天室”中,对于客户端而言,可以随时向服务端发送消息(写操作),因此,需要建立一个单独写线程
                            new Thread(() -> {
                                while (true) {
                                    try {
                                        sendBuffer.clear();
                                        // 接收用户从控制台输入的内容,并发送给服务端
                                        InputStreamReader reader = new InputStreamReader(System.in);
                                        BufferedReader bReader = new BufferedReader(reader);
                                        String message = bReader.readLine();

                                        sendBuffer.put(message.getBytes());
                                        sendBuffer.flip();
                                        client.write(sendBuffer);
                                    } catch (Exception e) {
                                        e.printStackTrace();
                                    }
                                }
                            }).start();
                        }
                        // 标记通道感兴趣的事件是:读取服务端消息(读就绪)
                        client.register(selector, SelectionKey.OP_READ);
                        // 客户端读取服务端的反馈消息
                    } else if (selectedKey.isReadable()) {
                        SocketChannel client = (SocketChannel) selectedKey.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        // 将服务端的反馈消息放入 readBuffer中
                        int len = client.read(readBuffer);
                        if (len > 0) {
                            String receive = new String(readBuffer.array(), 0, len);
                            System.out.println(receive);
                        }
                    }
                }
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

三 测试

1 启动服务端,然后启动客户端,会发送测试消息"connecting",共 10 个字节。

2 客户端向服务端发送一个"helloserver",一共有 11 个字节。

3 客户端向服务端发送"byebye",一共有 6 个字节。

4 整个过程中,客户端的打印

连接成功!访问的端口是:8888

helloserver

byebye

5 整个过程中,服务端的打印如下

读取到的数据大小:10

读取到的数据大小:2

读取到的数据大小:9

读取到的数据大小:3

读取到的数据大小:3

相关文章