netty入门(1)

x33g5p2x  于2021-08-23 转载在 Java  
字(4.9k)|赞(0)|评价(0)|浏览(257)

一 Netty核心组件介绍

1.1、 channel

channel 是一个通道,我们通常说其是一个NIO的构造

1.2、回调

回调本质是一个方法,方法中的参数指向另一个方法的引用;

1.3 、Futrure

通知机制,当方法执行结束时会发一个通知消息;

1.4ChannelHandler

通道处理事件,即一般就是我们的处理业务逻辑的地方;常用的通道处理类 ChannelInboundHandler,SimpleChannelInboundHandler,ChannelHandlerAdapter;不同的通道处理类适配不同的适配器;如下图的处理类或者适配器就是我们常用的类;

二 入门应用

首先需要引入 netty依赖

       <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.55.Final</version>
        </dependency>

2.1、 服务端

  1. 首选 我们 需要创建一个reactor模型的线程组,这里我们选择的是NIO异步线程
  2. 其次我们创建一个服务引导类,即服务端辅助启动器ServerBootstrap;在 ServerBootstrap 我们添加线程组NioEventLoopGroup和事件处理ChildChannelHandler;然后将 ServerBootstrap 绑定方法传入的参数 端口,执行 sync同步阻塞;
  3. 等待同步阻塞完成后,我们调用通道的closeFuture方法和sync将 线程阻塞,直到 处理器执行完成;
  4. 最后调用线程组的shutdownGracefully 方法释放资源;
public void bind(int port) throws Exception{

        // 配置线程组 实质是 reactor线程组
        NioEventLoopGroup parentGroup = new NioEventLoopGroup();
        // 启动 NIO
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 启动类
        serverBootstrap.group(parentGroup)
                .channel(NioServerSocketChannel.class)// 相当于 ServerSocketChannel
                .option(ChannelOption.SO_BACKLOG,1024)//TCP参数
                .childHandler(new ChildChannelHandler());// 处理事件
        // 绑定端口 同步阻塞等待同步成功 channelFuture 异步操作通知回调
        ChannelFuture channelFuture = serverBootstrap
                .bind(port)
                .sync();
        // 同步阻塞等待服务监听端口关闭
        channelFuture
                .channel()
                .closeFuture()
                .sync();
        // 关闭资源
        parentGroup.shutdownGracefully();

    }

其中的关键就是 new ChildChannelHandler(), 当来一个端口时就会新建一个处理类,保证了监听多个端口的可能性;

    /**
     * @Author lsc
     * <p>通道初始化 </p>
     * @Param
     * @Return
     */
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            //管道(Pipeline)持有某个通道的全部处理器
            ChannelPipeline pipeline = socketChannel.pipeline();
            // 添加处理器
            pipeline.addLast(new NettyServerHandler());
        }
    }

然后我们看下处理器 ChildChannelHandler,核心方法有三个

  1. channelRead,读取通道的消息;
  2. channelReadComplete 读取消息完毕后,执行的回调;
  3. exceptionCaught 异常出现执行的回调;
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 转为字节缓冲区
        ByteBuf buf = (ByteBuf)msg;
        // 字节数组
        byte[] bytes = new byte[buf.readableBytes()];
        // 缓冲区数据读入字节数组
        buf.readBytes(bytes);
        // 编码转为字符串
        String body = (new String(bytes, "UTF-8"));
        System.out.println(" get the data from client : " + body);
        // 构造响应数据
        String responseData = "那天刚刚好遇见你";
        //  数据写入缓冲区
        ByteBuf resp = Unpooled.copiedBuffer(responseData.getBytes());
        // 写入数据响应
        ChannelFuture channelFuture = ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 写入 seocketChannel
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 异常关闭资源句柄
        ctx.close();
    }
}

2.2 、客户端

  1. 客户端我们也是创建 NIO线程组 NioEventLoopGroup;
  2. 使用 Bootstrap 进行辅助启动,在通道初始化的时候传入处理器 NettyClientHandler;
  3. bootstrap 连接时不止绑定了端口,还要绑定ip;
  4. 最后阻塞处理器执行完成后关闭资源
    public void connect(int port, String host) throws InterruptedException {

        // 创建线程组
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        // netty启动辅助类
        Bootstrap bootstrap = new Bootstrap();
        //
        bootstrap.group(nioEventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 处理IO事件
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        //
                        pipeline.addLast(new NettyClientHandler());
                    }
                });
        // 异步操作
        ChannelFuture connect = bootstrap.connect(host, port).sync();
        // 关闭客户端
        connect.channel().closeFuture().sync();
        // 退出线程组
        nioEventLoopGroup.shutdownGracefully();
    }

我们再来看下处理器NettyServerHandler

处理器的构造都是大同小异

  1. exceptionCaught 异常回调;
  2. channelActive 当与服务端连接成功后被调用的回调;
  3. channelRead: 通道读取消息的回调;
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public NettyClientHandler() {
        super();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("Unexpected exception from downstream : [{}]" ,cause.getMessage());
    }

    /* *
     * @Author lsc
     * <p>触发回调 </p>
     * @Param [ctx]
     * @Return void
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        byte[] bytes = "关注公众号知识追寻者回复netty获取本教程源码".getBytes();
        // 创建节字缓冲区
        ByteBuf message = Unpooled.buffer(bytes.length);
        // 将数据写入缓冲区
        message.writeBytes(bytes);
        // 写入数据
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 消息转为 字节缓冲区
        ByteBuf buf = (ByteBuf)msg;
        // 创建字节数组
        byte[] bytes = new byte[buf.readableBytes()];
        // 获得响应的数据写入字节数组
        buf.readBytes(bytes);
        // 字节数组转为字符串a
        String body = new String(bytes, "UTF-8");
        // 打印
        System.out.println("get the data from server: "+body);
    }

}

2.3 测试

启动服务端,监听8080端口

    public static void main(String[] args) throws Exception {
        NettyServer nettyServer = new NettyServer();
        // 连接的ip d端口
        nettyServer.bind(8080);
    }

服务端启动成功

启动客户端,连接服务端,绑定监听端口

    public static void main(String[] args) throws Exception {
        NettyClient nettyClient = new NettyClient();
        // 连接的ip d端口
        nettyClient.connect(8080,"127.0.0.1");
    }

客户端启动成功,与服务端连接后会收到服务端发的消息

由于服务端与客户端连接成功后,客户端会激活channelActive 方法,故 服务端也收到一条消息;

想获取本套教程源码和后续内容 关注 公众号 知识追寻者 , 后台回复 netty 获取源码

相关文章

微信公众号

最新文章

更多