Netty的使用-spring boot整合篇

x33g5p2x  于2022-02-12 转载在 Spring  
字(10.8k)|赞(0)|评价(0)|浏览(548)

1、什么是Netty?

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty是基于nio的,它封装了jdk的nio,让我们使用起来更加方法灵活。

2、它有什么特点?

  • 高并发:Netty 是一款基于 NIO(Nonblocking IO,非阻塞IO)开发的网络通信框架,对比于 BIO(Blocking I/O,阻塞IO),它的并发性能得到了很大提高。
  • 传输快:Netty 的传输依赖于零拷贝特性,尽量减少不必要的内存拷贝,实现了更高效率的传输。
  • 封装好:Netty 封装了 NIO 操作的很多细节,提供了易于使用调用接口。

3、为什么并发高

两张图让你了解BIO和NIO的区别

  • 阻塞IO的通信方式
    (由于图片莫名的无法上传,请读者自行百度😅)
  • 非阻塞IO的通信方式
    (由于图片莫名的无法上传,请读者自行百度😅)
  • BIO,同步阻塞IO,阻塞整个步骤,如果连接少,他的延迟是最低的,因为一个线程只处理一个连接,适用于少连接且延迟低的场景,比如说数据库连接。而且一个连接一个线程,客户端有连接请求时服务器端就需要启动一个线程进行处理。线程开销大。
  • NIO,同步非阻塞IO,阻塞业务处理但不阻塞数据接收,适用于高并发且处理简单的场景,比如聊天软件。
  • 多路复用IO,他的两个步骤处理是分开的,也就是说,一个连接可能他的数据接收是线程a完成的,数据处理是线程b完成的,他比BIO能处理更多请求。
  • 信号驱动IO,这种IO模型主要用在嵌入式开发。
  • 异步IO,他的数据请求和数据处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。

4、为什么传输速度快

传输快是因为,netty的读写都是在堆内存上进行的,netty接收数据时,它会在堆内存之外开辟一块内存,

数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

5、netty和websocket区别

  • 作用不同:Tomcat 是 Servlet 容器,可以视为 Web 服务器,而 Netty 是异步事件驱动的网络应用程序框架和工具用于简化网络编程,例如TCP和UDP套接字服务器。
  • 协议不同:Tomcat 是基于 http 协议的 Web 服务器,而 Netty 能通过编程自定义各种协议,因为 Netty 本身自己能编码/解码字节流,所有 Netty 可以实现,HTTP 服务器、FTP 服务器、UDP 服务器、RPC 服务器、WebSocket 服务器、Redis 的 Proxy 服务器、MySQL 的 Proxy 服务器等等。

6、Netty中的Channel

①Channel

如上图所示:

  • 用户端连接成功后,将新建一个channel与该用户进行绑定
  • channel从EventLoopGroup获得一个EventLoop,并注册到该EventLoop,channel生命周期内都和该EventLoop在一起(注册时获得selectionKey)
  • channel同用户端进行网络连接、关闭和读写,生成相对应的event(改变selectinKey信息),触发eventloop调度线程进行执行。
  • 如果是读事件,执行线程调度pippline来处理用户业务逻辑
  • Channel包含四种状态:注册->活跃->非活跃->非注册

②ChannelPipeline和ChannelHandler

ChannelPipeline和ChannelHandler用于channel事件的拦截和处理,Netty使用类似责任链的模式来设计ChannelPipeline和ChannelHandler ChannelPipeline相当于ChannelHandler的容器,channel事件消息在ChannelPipeline中流动和传播,相应的事件能够被ChannelHandler拦截处理、传递、忽略或者终止,如下图所示:

③ChannelHandler

ChannelHandler负责I/O事件或者I/O操作进行拦截和处理,用户可以通过ChannelHandlerAdapter来选择性的实现自己感兴趣的事件拦截和处理。

7、Netty ChannelHandler生命周期

方法描述
handlerAdded握手建立(Handler被加入Pipeline时触发,仅触发一次)
channelRegistered分配连接线程(channel成功注册到EventLoopGroup的EventLoop)
channelActive通道活跃,可以使用(连接就绪)
channelRead有数据可读时触发
channelReadComplete有数据可读,并且读完时触发
channelInactive客户端主动断开连接时触发,底层TCP连接断开,通道不活跃
channelUnregistered连接关闭后,释放绑定的EventLoop线程
handlerRemoved握手取消(handler被Pipeline移除时触发)

8、Springboot整合Netty

1)引入依赖
<dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <!--不用加版本号-->
  </dependency>
2)Netty与springboot的整合配置,使用的是spring的监听者接口
@Component
public class NettybootServerInitConfig implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if(event.getApplicationContext().getParent() == null){
            WssServer.getInstance().start();
        }
    }
}
3)netty服务端主要配置类
@Component
public class WssServer {
    /**
     * 单例静态内部类
     */
    public static class SingletionWSServer {
        static final WssServer instance = new WssServer();
    }

    public static WssServer getInstance() {
        return SingletionWSServer.instance;
    }

    private EventLoopGroup mainGroup;
    private EventLoopGroup subGroup;
    private ServerBootstrap server;
    private ChannelFuture future;

    public WssServer() {
        mainGroup = new NioEventLoopGroup();
        subGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(mainGroup, subGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WssServerInitialzer()); // 添加自定义初始化处理器
    }

    public void start() {
        //端口号不要和spring的端口一样
        future = this.server.bind(8086);
        System.err.println("netty 服务端启动完毕 .....");
    }
}
public class WssServerInitialzer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
//        pipeline.addLast(new IdleStateHandler(3,4,5, TimeUnit.SECONDS));
        //websocket基于http协议,所以需要http编解码器
        pipeline.addLast(new HttpServerCodec());
        //添加对于读写大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //对httpMessage进行聚合
        pipeline.addLast(new HttpObjectAggregator(1024*64));

        // ================= 上述是用于支持http协议的 ==============

        //websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
        //比如处理一些握手动作(ping,pong)
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        //自定义handler
        pipeline.addLast(new ChatHandler());

    }
}
4)自定义的处理类
@ChannelHandler.Sharable
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    //用于记录和管理所有客户端的channel
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private WebSocketServerHandshaker handshaker;

    /*
     * 握手建立
     * 客户端创建的时候触发,当客户端连接上服务端之后,就可以获取该channel,然后放到channelGroup中进行统一管理
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("握手建立");
        Channel incoming = ctx.channel();
        clients.add(incoming);
    }

    /*
     * channelAction
     * channel 通道 action 活跃的
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().localAddress().toString() + " 通道活跃");
    }

    /*
     * 功能:读取 h5页面发送过来的信息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)
            throws Exception {
        if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作
            System.out.println("websocket接收到消息:"+msg);
            handleWebSocketFrame(ctx, msg);
        }

    }

    /*
     * channelInactive
     * channel 通道 Inactive 不活跃的
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().localAddress().toString() + " 通道不活跃!");
    }

    /*
     * 功能:读空闲时移除Channel
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent evnet = (IdleStateEvent) evt;
            // 判断Channel是否读空闲, 读空闲时移除Channel
            if (evnet.state().equals(IdleState.READER_IDLE)) {
                System.out.println("读空闲移除Channel");
                UserInfoManager.removeChannel(ctx.channel());
            }
        }
        ctx.fireUserEventTriggered(evt);
    }

    /*
     * 握手取消
     * 客户端销毁的时候触发
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("握手取消");
        Channel incoming = ctx.channel();
        clients.remove(incoming);
    }

    /**
     * 功能:服务端发生异常的操作
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /*
     * 功能:处理HTTP的代码
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws UnsupportedEncodingException {
        // 如果HTTP解码失败,返回HHTP异常
        if (req instanceof HttpRequest) {
            HttpMethod method = req.method();
            // 如果是websocket请求就握手升级
            if ("/ws".equalsIgnoreCase(req.uri())) {
                System.out.println(" req instanceof HttpRequest");
                WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                        "", null, false);
                handshaker = wsFactory.newHandshaker(req);
                if (handshaker == null) {
                    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                } else {
                    handshaker.handshake(ctx.channel(), req);
                }
            }

        }
    }

    /*
     * 处理Websocket的代码
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判断是否是关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 文本消息,不支持二进制消息
        if (frame instanceof TextWebSocketFrame) {
            // 返回应答消息
            String requestmsg = ((TextWebSocketFrame) frame).text();
            System.out.println("收到信息"+requestmsg);
            String[] array= requestmsg.split(",");
            // 将通道加入通道管理器
            UserInfoManager.addChannel(ctx.channel(),array[0]);
            if (array.length== 3) {
                // 将信息返回给h5
                String sender=array[0];
                String recevier=array[1];
                String message=array[2];
                UserInfoManager.broadcastMess(sender,recevier,message);
            }
        }
    }
}
5)channel与用户的id绑定的处理类
public class UserInfoManager {
    private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);

    private static ConcurrentMap<Channel, UserInfo> userInfos = new ConcurrentHashMap<>();
    /**
     * 登录注册 channel
     */
    public static void addChannel(Channel channel, String uid) {
        String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
        UserInfo userInfo = new UserInfo();
        userInfo.setUserId(uid);
        userInfo.setAddr(remoteAddr);
        userInfo.setChannel(channel);
        userInfos.put(channel, userInfo);
    }
    public static void removeChannel(Channel channel){
        userInfos.remove(channel);
        System.out.println("移除channel成功");
    }

    /**
     * 普通消息
     * @param message
     */
    public static void broadcastMess(String sender,String recevier,String message) {
        if (!BlankUtil.isBlank(message)) {
            try {
                rwLock.readLock().lock();
                Set<Channel> keySet = userInfos.keySet();
                for (Channel ch : keySet) {
                    UserInfo userInfo = userInfos.get(ch);
                    if (!userInfo.getUserId().equals(recevier)) {
                        continue;
                    }
                    String backmessage=sender+":"+message;
                    ch.writeAndFlush(new TextWebSocketFrame(backmessage));
                    System.out.println("客户端收到消息:"+backmessage);
                      /*responseToClient(ch,message);*/
                }
            } finally {
                rwLock.readLock().unlock();
            }
        }
    }

    public static UserInfo getUserInfo(Channel channel) {
        return userInfos.get(channel);
    }
}
6)自定义的channel实体类包含:用户id,地址,通道
private String userId;  // UID
    private String addr;    // 地址
    private Channel channel;// 通道

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }
7)工具类
public class NettyUtil {
    public static String parseChannelRemoteAddr(final Channel channel) {
        if (null == channel) {
            return "";
        }
        final SocketAddress remote = channel.remoteAddress();
        final String addr = remote != null ? remote.toString() : "";

        if (addr.length() > 0) {
            int index = addr.lastIndexOf("/");
            if (index >= 0) {
                return addr.substring(index + 1);
            }
            return addr;
        }
        return "";
    }
}
public class BlankUtil {
    public static boolean isBlank(String ...strings){
        for(String str:strings){
            if (str == null || str.trim().length() == 0){
                return true;
            }
        }
        return false;
    }
}
8)客户端
<template>
  <div>
    <div id="content" class="row-center">
      <div
        id="chat-box"
        class="row-center"
        style="height: 100px; width: 100px; border: 2px red ridge"
      ></div>
      <div id="input-box">
        <input class="chat-input" id="chat-input" placeholder="message" />
        <input id="myid" placeholder="myid" />
        <button id="login-button" @click="login()">登录</button>
        <input id="friendid" placeholder="friendid" />
        <button class="chat-button" id="send" @click="send()">发送</button>
      </div>
    </div>
  </div>
</template>
<script>
import $ from "jquery";
var socket = null;
export default {
  created() {
    var ipAddress = "127.0.0.1";
    //新建socket对象
    socket = new WebSocket("ws://" + ipAddress + ":8086/ws");
  },
  methods: {
    send() {
      var data =
        $("#myid").val() +
        "," +
        $("#friendid").val() +
        "," +
        $("#chat-input").val();
      socket.send(data);
      socket.onmessage = function (event) {
        var datas = event.data.split(",");
        console.log("服务器消息====" + datas);
        $("#chat-box").text(datas);
      };
    },
    login() {
      var data = $("#myid").val();
      // socket.onopen = function () {
      socket.send(data);
      // }
    },
  },
};

</script>
<style lang="less" scoped>
</style>

相关文章