Netty入门及实现简单通信框架

x33g5p2x  于2021-09-19 转载在 其他  
字(3.9k)|赞(0)|评价(0)|浏览(277)

1. Netty是什么

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。
说白了,Netty就是一个java高性能的网络通信框架,它运用了操作系统底层提供的NIO通信模型,实现了高性能的网络通信,目前已经被广泛运用到各种中间件中。

2.快速入门

这里直接上服务端代码,netty的代码层次简单易懂,是一个很典型的建造者模式

public static void main(String[] args) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //处理channel
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new IdleHandler());
                            p.addLast(new LoggingHandler(LogLevel.INFO));
                            //消息解码
                            p.addLast(new MyEncoder());
                            p.addLast(new MyDecoder(1024, 4, 4, 0, 0));
                            //业务处理
                            p.addLast(new ServerSingleHandler());
                            p.addLast(new ServerGroupHandler());
                        }
                    });
            //绑定端口
            ChannelFuture f = b.bind(8090).sync();
            f.channel().closeFuture().sync();
        } finally {
            //处理完剩余请求再关闭
            workerGroup.shutdownGracefully();
        }
    }

客户端的编码基本大同小异

public static void main(String[] args) throws Exception {
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE,true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new MyEncoder());
                     p.addLast(new MyDecoder(1024, 4, 4, 0, 0));
                     p.addLast(new ClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect("127.0.0.1", 8090).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }

3.Netty的Handler责任链模式

在上面代码可以看到 p.addLast()这个方法,这里就是按照前后顺序添加逻辑处理的handler,多个handler会构造成一条pipeline,也就是一条逻辑处理的责任链,对传入的信息进行顺序处理,而handler也分为inboundHandler和outboundHandler,因此在服务器初始化的时候,我们可以很轻松直白的完成出入信息的逻辑处理流程编写

4.编码和解码器

实现自定义的解码器只需要继承netty框架中自带的encoder/decoder即可,netty自带的简单编码解码器有如下几种:

固定长度的解码器FixedLengthFrameDecoder
行解码器LineBasedFrameDecoder
基于分隔符的解码器DelimiterBasedFrameDecoder
基于长度域的解码器LengthFieldBasedFrameDecoder
自定义格式编码器MessageToByteEncoder

此外还有多种自带的更复杂协议解码器,如redis、protobuf、json等
我们就以上面例子用到的编码解码器为例子,看看如何使用这些自带的类

这里我们使用 标志头(一个int)+body长度(一个int)+body实体的格式构造

public class MyEncoder extends MessageToByteEncoder<MyProtocol> {
    //重写encode方法即可
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MyProtocol myProtocol, ByteBuf byteBuf)  {
        byteBuf.writeInt(MyProtocol.header);//0x76
        byteBuf.writeInt(myProtocol.getContentLength());
        byteBuf.writeBytes(myProtocol.getContent());
    }
}

当我们已经知道消息体格式后,剩下的事情就是通过自定义的decoder把消息体解析出来,这里由于有body长度因此适用基于长度域的解码器LengthFieldBasedFrameDecoder。
这时候,我们就可以配合起来看上面例子中解码器的参数了。

根据设置,我们消息体最长为1024字节,body长度字段偏移为4字节(因为一开始的4字节是标识数0
x76),body长度字段为4字节(一个int)。也就是说,消息体header长度为8字节,解析header中的body长度后可以精确截取body,这样我们一个完整的消息体就被解析出来了。

!注意,后续业务逻辑还要注意编码格式。例如:MessageEntity messageEntity =
GsonUtil.GsonToBean(new String(myProtocol.getContent(), “UTF-8”),
MessageEntity.class);

解码器代码如下:

public class MyDecoder extends LengthFieldBasedFrameDecoder {
    public MyDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
        in = (ByteBuf) super.decode(ctx, in);
        if (in == null) {
            return null;
        }

        int header = in.readInt();
        if (header != MyProtocol.header) {
            throw new Exception("无法识别协议");
        }
        //读取length字段
        int length = in.readInt();

        if (in.readableBytes() != length) {
            throw new Exception("标记的长度不符合实际长度");
        }
        //读取body
        byte[] bytes = new byte[length];
        in.readBytes(bytes);

        return new MyProtocol(length, bytes);

    }
}

附:demo代码

相关文章

微信公众号

最新文章

更多