Netty功能实现:实现心跳检测

x33g5p2x  于2021-11-30 转载在 其他  
字(4.8k)|赞(0)|评价(0)|浏览(309)

netty实现心跳检测

检测逻辑:

1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。

需求设计:

1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接

服务端编写

public class HreatBeatServer {
    public static void main(String[] args) {

        //创建两个Reactor 构建主从 Reactor 模型
        //用于处理 连接和读写事件 , 无限循环组(线程池)
        //管理 channel 监听事件
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 我们需要一个服务端引导程序来开启服务端。
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //将主从 Reactor 入参,设置当前参数
        //这个方法返回的事对象本身,我们可以点出其他方法, 这种返回类型为对象自身 提供了 链式编程的方式
        serverBootstrap.group(bossGroup, workerGroup)
                //我们需要设置 channel 的 类型
                //对应的是 netty NIO BIO
                //NioServerSocketChannel <== ServerSocketChannel <== ServerSocket
                .channel(NioServerSocketChannel.class)
                //设置当前通道的处理器,使用Netty提供的日志打印处理器
                .handler(new LoggingHandler(LogLevel.INFO))
                //定义客户端连接处理的使用
                //此方法需要设置参数 ChannelInitializer 通道初始化器
                //初始化 要处理客户端 通道, 所以泛型设置为 SocketChannel
                //此类 为抽象类 需要实现其抽象方法 initchannel (alt+enter)快捷键
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //通过channel 获取管道 pipeline
                        // 通道代表我们连接的角色, 管道代表处理业务得逻辑管理
                        // 管道相当于 链表,可以将不同的处理器连接起来,管理处理器的顺序
                        // 使用时 常常使用的事尾插法, addList 将加入到尾部
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        /* * 使用心跳检测处理器 * 读空闲 写空闲 读写空闲 的超时时间 * 最后一个参数是 时间的单位 * IdleStateHandler发现有空闲的时候 会触发 IdleStateEvent时间 * 他会把事件推送给下一个 handler的指定方法 userEventTriggered 去处理 * */
                        pipeline.addLast(new IdleStateHandler(5, 10, 20, TimeUnit.SECONDS));

                        socketChannel.pipeline().addLast(new HreatBeatServerHandler());
                    }
                });
        System.out.println("服务端初始化完成");
        // 设置并启动端口号,但需要使用sync 异步启动
        try {
            ChannelFuture future = serverBootstrap.bind(2020).sync();
            // 将关闭通道的方式也设置为异步的
            // 阻塞Finally中的代码执行
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅方式都关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

IdleStateHandler , 是netty提供的处理器

1)超过多长时间没有读 readerIdleTime
2) 超过多长时间没有写 writerIdleTime
3) 超过多长时间没有读和写 allIdleTime

底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。

处理器编写

public class HreatBeatServerHandler extends SimpleChannelInboundHandler<String> {
    private int times;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        if ("I am alive".equals(msg)) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("over", CharsetUtil.UTF_8));
        }
    }

    //处理心跳检测事件的方法
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        String eventDesc = null;
        switch (event.state()) {
            case READER_IDLE:
                eventDesc = "读空闲";
                break;
            case WRITER_IDLE:
                eventDesc = "写空闲";
                break;
            case ALL_IDLE:
                eventDesc = "读写空闲";
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "发生超时事件--" + eventDesc);
        times++;
        if (times > 3) {
            System.out.println("空闲次数超过三次 关闭连接");
            ctx.writeAndFlush("you are out");
            ctx.channel().close();
        }
        //super.userEventTriggered(ctx, evt);

    }
}

其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类

客户端编写

客户端不断循环给服务端发消息确认存活的期间 线程睡眠 模拟失去心跳场景

package com.hyc.netty.Hreatbeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Random;

public class HreatbeatClient {
    public static void main(String[] args) {
        //客户端只需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        // 客户端启动的对象
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        socketChannel.pipeline().addLast(new HreatbeatClientHandler());
                    }
                });
        System.out.println("客户端初始化完成");
        try {
            ChannelFuture future = bootstrap.connect("127.0.0.1", 2020).sync();
            String data = "I am alive";
            while (future.channel().isActive()) {
                //模拟空闲状态
                int num = new Random().nextInt(10);
                Thread.sleep(num * 1000);
                future.channel().writeAndFlush(data);
            }
            //future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }

    static class HreatbeatClientHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            System.out.println("server data:" + s);
            if ("you are out".equals(s)) {
                System.out.println("关闭");
                channelHandlerContext.channel().close();
            }

        }
    }
}

客户端随机线程睡眠 一旦接受到 服务端返回的you are out代表空闲次数超过了 3次 则关闭客户端连接

相关文章