Java网络编程系列之基于BIO的多人聊天室设计与实现

x33g5p2x  于2021-12-07 转载在 Java  
字(8.4k)|赞(0)|评价(0)|浏览(391)

BIO模型

传统的BIO模型(同步阻塞IO模型)+线程池(多线程)模式:适合活动连接次数不是特别高。该模式是1:1,即每次连接每个线程。

处理步骤:客户端发送请求,接收器Acceptor每接收一个请求,就创建一个新线程,处理完成之后,再通过输出流返回到客户端,然后销毁线程。

缺陷:一个客户端请求,就对应一个线程,客户端的请求和服务端的线程就成为1:1的比例,当请求过多的时候,线程越来越多,最后导致JVM的内存被大量的占用,堆栈溢出,发生异常。

多人聊天室功能概述

多人聊天室设计UML建模之时序图

  • 1.服务器端: 通过ServerSocket对象,绑定端口,调用accept函数,等待客户端连接
  • 2.服务器端维护一个map集合,通过每个客户端的端口号,来唯一识别每一个客户端对象
  • 2.当有客户端连接成功后,通过ChatHandler创建一个新的线程用以处理当前客户端的连接
  • 4.ChatHandler负责将当前连接成功的客户端放入当前在线用户集合中,然后保持与当前客户端的线程连接,直到当前客户端主动退出连接
  • 5.客户端ChatClient通过服务器ip和端口与之建立连接,然后等待接收服务器发送过来的消息
  • 6.同时客户端创建一个单独的线程UserInputHandler,负责发送消息,当客户端这边输入quit指令的时候,表示客户端要退出连接

服务端代码实现

ChatServer

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ChatServer
{
    private  final int SERVER_PORT=8080;
    private final String QUIT="quit";

    private ServerSocket serverSocket;
    //key是当前客户端对应的端口号,value是服务器与当前客户端之前的关联输出字符流
    private Map<Integer, Writer> connectedClients;

    public ChatServer()
    {
        //初始化map集合
        connectedClients=new ConcurrentHashMap<>();
    }

    //新增客户端
    public synchronized void addClient(Socket socket) throws IOException {
        if(socket!=null)
        {
            //添加进集合
            connectedClients.put(socket.getPort(),new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())));
            System.out.println("当前客户端["+socket.getPort()+"]已成功连接到服务器");
        }
    }

    //移除客户端
    public synchronized void removeClient(Socket socket) throws IOException {
        if(socket!=null)
        {
            //从集合中移除,并关闭相关流
            if(connectedClients.containsKey(socket.getPort()))
            {
                connectedClients.get(socket.getPort()).close();
                connectedClients.remove(socket.getPort());
                System.out.println("客户端[" + socket.getPort() + "]已断开连接");
            }
        }
    }

    //群发消息给其他客户端
    public synchronized void forwardMessage(Socket socket,String msg) throws IOException {
        //遍历集合,挨个转发消息--但是不发给自己
// connectedClients.forEach(
// (port,client)->
// {
// if(!port.equals(socket.getPort()))
// {
// System.out.println("当前客户端["+port+"]");
// //这里最好还是把异常外抛出去,因为这里使用lambda写法,因此没法抛出去
// try
// {
// client.write(msg+"\n");
// client.flush();
// }
// catch (IOException e)
// {
// e.printStackTrace();
// }
// }
// }
// );
        for (Integer id : connectedClients.keySet()) {
            if (!id.equals(socket.getPort())) {
                Writer writer = connectedClients.get(id);
                //这个地方必须要加\n,否则readline读取不到换行符,会阻塞住
                writer.write(msg+"\n");
                writer.flush();
            }
        }
    }

    //客户端是否准备好退出连接
    public boolean clientReadyToQuit(String msg)
    {
          return QUIT.equals(msg);
    }

    //关闭服务器端口
    public synchronized void close() throws IOException {
            serverSocket.close();
            System.out.println("关闭socket服务器");
    }

    //启动服务器
    public void start()
    {
        try
        {
            //绑定服务器与对应的端口
            ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
            System.out.println("服务器启动,对应的端口为: "+SERVER_PORT);
            //等待客户端连接
            while(true)
            {
                //如果没有客户端连接,这边就会阻塞住
                System.out.println("等待客户端连接...");
                Socket socket = serverSocket.accept();
                System.out.println("客户端连接中...");
                //有客户端连接后,创建ChatHandler线程
               new Thread(new ChatHandler(this,socket)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //启动服务
    public static void main(String[] args) {
        ChatServer chatServer=new ChatServer();
        chatServer.start();
    }
}

ChatHandler

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class ChatHandler implements Runnable
{
    //保存服务器对象和对应的客户端socket连接对象
    private ChatServer chatServer;
    private  Socket socket;

   ChatHandler(ChatServer chatServer,Socket socket)
   {
       this.chatServer=chatServer;
       this.socket=socket;
   }

    @Override
    public void run()
    {
        try
        {
            //保存新用户连接进在线用户集合
            chatServer.addClient(socket);

            //读取用户发送的信息
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            //判断当前用户是否发送了消息
            String msg=null;
            //bufferedReader.readLine()是阻塞式的,直到收到客户端发送过来的一条信息
            //读取直到信息是以换行符或者回车结尾,然后返回换行符之前的所有信息
            //当客户端断开连接后,此时readLine函数会返回null值
            while((msg=bufferedReader.readLine())!=null)
            {
                //检查当前用户是否准备退出
                if(chatServer.clientReadyToQuit(msg))
                {
                    break;
                }
                String clientMsg = "客户端[" + socket.getPort() + "]的消息: " + msg;
                System.out.println(clientMsg);
                //转发消息
                chatServer.forwardMessage(socket,clientMsg);
            }
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }finally
        {
            try {
                chatServer.removeClient(socket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

'

客户端代码实现

ChatClient

//客户端
public class ChatClient
{
    private final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private final int DEFAULT_SERVER_PORT = 8080;
    private final String QUIT = "quit";

    private Socket socket;
    private BufferedReader reader;
    private BufferedWriter writer;

    // 发送消息给服务器
    public void send(String msg) throws IOException
    {
        //当前当前客户端与服务器端之间的输出流是否关闭
        if (!socket.isOutputShutdown())
        {
            writer.write(msg + "\n");
            writer.flush();
        }
    }

    // 从服务器接收消息
    public String receive() throws IOException {
        String msg = null;
        //判断当前客户端与服务器端之间的输入流是否关闭
        if (!socket.isInputShutdown()) {
            msg = reader.readLine();
        }
        return msg;
    }

    // 检查用户是否准备退出
    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    public void close() {
        if (writer != null) {
            try {
                System.out.println("关闭socket");
                writer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start()
    {
        try {
            // 创建socket
            socket = new Socket(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);

            // 创建IO流
            reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );

            // 处理用户的输入
            new Thread(new UserInputHandler(this)).start();

            // 读取服务器转发的消息
            String msg = null;
            while ((msg = receive()) != null) {
                System.out.println(msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatClient chatClient = new ChatClient();
        chatClient.start();
    }
}

UserInputHandler

public class UserInputHandler implements Runnable {

    private ChatClient chatClient;

    public UserInputHandler(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    @Override
    public void run() {
        try {
            // 等待用户输入消息
            BufferedReader consoleReader =
                    new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String input = consoleReader.readLine();

                // 向服务器发送消息
                chatClient.send(input);

                // 检查用户是否准备退出
                if (chatClient.readyToQuit(input)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

伪异步IO编程模型简析

上面版本的聊天室缺陷:

  • 服务端会为每一个客户端都创建一个新的线程用来通信,这样比较耗费资源,好的做法是创建一个线程池用来使用

复习java提供的线程池操作:

代码实现

public class ChatServer
{
    private  final int SERVER_PORT=8080;
    private final String QUIT="quit";

    private ServerSocket serverSocket;
    //key是当前客户端对应的端口号,value是服务器与当前客户端之前的关联输出字符流
    private Map<Integer, Writer> connectedClients;
    //维护一个线程池对象
    private ExecutorService executorService;

    public ChatServer()
    {
        //初始化map集合
        connectedClients=new ConcurrentHashMap<>();
        //线程池对象初始化---线程池里面固定线程数量
        executorService=Executors.newFixedThreadPool(10);
    }

    //新增客户端
    public synchronized void addClient(Socket socket) throws IOException {
        if(socket!=null)
        {
            //添加进集合
            connectedClients.put(socket.getPort(),new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())));
            System.out.println("当前客户端["+socket.getPort()+"]已成功连接到服务器");
        }
    }

    //移除客户端
    public synchronized void removeClient(Socket socket) throws IOException {
        if(socket!=null)
        {
            //从集合中移除,并关闭相关流
            if(connectedClients.containsKey(socket.getPort()))
            {
                connectedClients.get(socket.getPort()).close();
                connectedClients.remove(socket.getPort());
                System.out.println("客户端[" + socket.getPort() + "]已断开连接");
            }
        }
    }

    //群发消息给其他客户端
    public synchronized void forwardMessage(Socket socket,String msg) throws IOException {
        //遍历集合,挨个转发消息--但是不发给自己
// connectedClients.forEach(
// (port,client)->
// {
// if(!port.equals(socket.getPort()))
// {
// System.out.println("当前客户端["+port+"]");
// //这里最好还是把异常外抛出去,因为这里使用lambda写法,因此没法抛出去
// try
// {
// client.write(msg+"\n");
// client.flush();
// }
// catch (IOException e)
// {
// e.printStackTrace();
// }
// }
// }
// );
        for (Integer id : connectedClients.keySet()) {
            if (!id.equals(socket.getPort())) {
                Writer writer = connectedClients.get(id);
                //这个地方必须要加\n,否则readline读取不到换行符,会阻塞住
                writer.write(msg+"\n");
                writer.flush();
            }
        }
    }

    //客户端是否准备好退出连接
    public boolean clientReadyToQuit(String msg)
    {
          return QUIT.equals(msg);
    }

    //关闭服务器端口
    public synchronized void close() throws IOException {
            serverSocket.close();
            System.out.println("关闭socket服务器");
    }

    //启动服务器
    public void start()
    {
        try
        {
            //绑定服务器与对应的端口
            ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
            System.out.println("服务器启动,对应的端口为: "+SERVER_PORT);
            //等待客户端连接
            while(true)
            {
                //如果没有客户端连接,这边就会阻塞住
                System.out.println("等待客户端连接...");
                Socket socket = serverSocket.accept();
                System.out.println("客户端连接中...");
                //有客户端连接后,创建ChatHandler线程
               //new Thread(new ChatHandler(this,socket)).start();
               //使用线程池管理客户端连接---这里excute会调用线程的start方法
                executorService.execute(new ChatHandler(this,socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //启动服务
    public static void main(String[] args) {
        ChatServer chatServer=new ChatServer();
        chatServer.start();
    }
}

相关文章