netty 编码解码(5)

x33g5p2x  于2021-08-23 转载在 Java  
字(4.0k)|赞(0)|评价(0)|浏览(319)

一 编码解码概念

编码(encode)在程序中其实就是序列化,将对象转为 字节数组,方便于网络传输;

解码(decode)在程序中实际上就是反序列化,将字节数组转为原始对象。

在jdk 自带的序列化机制需要实现 java.io.Serializable接口并生成序列化ID,就可以实现对象的序列化

但java原生的序列化机制有些缺点:

  1. 无法跨语言,扩展性差;
  2. 序列化后码流太大,性能低;

二 编码器与节码器

2.1 解码器

netty 中提供了抽象节码器类ByteToMessageDecoderReplayingDecoder继承了ByteToMessageDecoder类,是对其的一种扩展,但并非所有的 ByteBuf 操作都被支持;MessageToMessageDecoder解码器是将一种消息转为另一种消息;

MessageToMessageDecoder 中默认实现了一些解码器,我们可以直接拿来使用;比如 之前文章我们使用过的 字符串解码器StringDecoder,baes64解码器Base64Encoder;

ByteToMessageDecoder 也提供 了一些解码器,比如我们之前用过的 定长解码器FixedLengthFrameDecoder,换行解码器LineBasedFrameDecoder;

整个解码的流程是先将入站的数据通过解码器解码后传给 ChannelInboundHandlerAdapter;

2.2 编码器

与解码器对应,netty 提供了与之相对应的编码器;

ByteToMessageEncoder和MessageToMessageDecoder相对应;

MessageToByteEncoder 和 MessageToMessageDecoder 相对应;

2.3 手动实现字符串编解码

编码器实现,直接将字符串转为字节数组传输;

/**
 * @Author lsc
 * <p> </p>
 */
public class MsgEncode extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        String msg = (String) o;
        byte[] bytes = msg.getBytes(Charset.forName("UTF-8"));
        byteBuf.writeBytes(bytes);
    }
}

解码器实现, 将字节数组转为字符串;

/**
 * @Author lsc
 * <p> </p>
 */
public class MsgDeocde extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        int size = byteBuf.readableBytes();
        if (size<=2){
            System.out.println("报文长度不满足抛弃");
            return;
        }
        // 创建字节数组
        byte[] bytes = new byte[size];
        // 缓冲区数据读入字节数组
        byteBuf.readBytes(bytes);
        // 编码转为字符串
        String body = (new String(bytes, "UTF-8"));
        list.add(body);
    }
}

然后将编码器,与解码器注入 pipeline; 服务端示例如下;

/**
     * @Author lsc
     * <p>通道初始化 </p>
     * @Param
     * @Return
     */
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            // 管道(Pipeline)持有某个通道的全部处理器
            ChannelPipeline pipeline = socketChannel.pipeline();
            // 添加编码器和解码器
            pipeline.addLast(new MsgEncode());
            pipeline.addLast(new MsgDeocde());
            // 添加处理器
            pipeline.addLast(new NettyServerHandler());
        }
    }

客户端也需要注入;

public void connect(int port, String host) throws InterruptedException {

        // 创建线程组
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        // netty启动辅助类
        Bootstrap bootstrap = new Bootstrap();
        //
        bootstrap.group(nioEventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 处理IO事件
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 添加编码器和解码器
                        pipeline.addLast(new MsgEncode());
                        pipeline.addLast(new MsgDeocde());
                        // 处理器
                        pipeline.addLast(new NettyClientHandler());
                    }
                });
        // 异步操作
        ChannelFuture connect = bootstrap.connect(host, port).sync();
        // 关闭客户端
        connect.channel().closeFuture().sync();
        // 退出线程组
        nioEventLoopGroup.shutdownGracefully();
    }

客户端处理器直接发送字符串消息

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    public NettyClientHandler() {
        super();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("Unexpected exception from downstream : [{}]" ,cause.getMessage());
    }

    /* *
     * @Author lsc
     * <p>触发回调 </p>
     * @Param [ctx]
     * @Return void
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        String message = "关注公众号知识追寻者回复netty获取本教程源码";
        // 写入数据
        ctx.writeAndFlush(message);

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 打印
        System.out.println("get the data from server: "+msg);
    }
}

同理服务端处理器也是直接发送字符串消息

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" get the data from client : " + msg);
        // 构造响应数据
        String responseData = "那天刚刚好遇见你";
        ctx.writeAndFlush(responseData);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 写入 seocketChannel
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 异常关闭资源句柄
        ctx.close();
    }
}

服务启动后服务端效果示例图如下

客户端效果图示例如下

相关文章

微信公众号

最新文章

更多