Netty入门案例——Netty实现心跳检测

2023-05-16

文章目录

  • 一、服务端
  • 二、客户端

一、服务端


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class MyServer {
    public static void main(String[] args) throws Exception{


        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); // 日志处理器
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //加入一个netty 提供 IdleStateHandler
                    /*
                    说明
                    1. IdleStateHandler 是netty 提供的处理空闲状态的处理器
                    2. long readerIdleTime : 表示多长时间没有读(server没有读取客户端数据), 就会发送一个心跳检测包检测是否连接
                    3. long writerIdleTime : 表示多长时间没有写(server没有发送给客户端数据), 就会发送一个心跳检测包检测是否连接
                    4. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接

                    5. 当没有读、写或者全部读写,会触发一个IdleStateEvent事件
                    6. 当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理
                        通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
                     */
                    pipeline.addLast(new IdleStateHandler(30,30,30, TimeUnit.SECONDS));
                    //向pipeline加入解码器
                    pipeline.addLast("decoder", new StringDecoder());
                    //向pipeline加入编码器
                    pipeline.addLast("encoder", new StringEncoder());
                    //加入一个对空闲检测进一步处理的handler(自定义)
                    pipeline.addLast(new MyServerHandler());
                }
            });

            //启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}


import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;

import java.nio.charset.Charset;

public class MyServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     *
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception
     * IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理
     * 通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if(evt instanceof IdleStateEvent) {

            //将  evt 向下转型 IdleStateEvent
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                  eventType = "读空闲";
                  break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
            System.out.println("服务器做相应处理..");

            //如果发生空闲,我们关闭通道
            ctx.channel().close();
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 接收到客户端消息的处理逻辑
        System.out.println("[Client]: " + msg);
        if ("heartbeat".equals(msg)) {
            // 收到客户端的心跳请求,发送心跳应答消息
            ctx.channel().writeAndFlush(Unpooled.copiedBuffer("heartbeat_ack", Charset.defaultCharset()));
        } else {
            // 收到其他消息,则按常规消息处理逻辑进行处理
            ctx.channel().writeAndFlush(Unpooled.copiedBuffer("消息已收到", Charset.defaultCharset()));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 断开连接后的处理逻辑
        System.out.println("与客户端断开连接:" + ctx.channel().remoteAddress());
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

二、客户端


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class HeartbeatClient {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 7000;
    private static final int READ_TIMEOUT = 10; // 心跳超时时间,单位:秒

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IdleStateHandler(READ_TIMEOUT, 5, 0, TimeUnit.SECONDS));
                            //向pipeline加入解码器
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            ch.pipeline().addLast(new HeartbeatClientHandler());
                        }
                    });

            ChannelFuture channelFuture = b.connect(HOST, PORT).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }

        Thread.sleep(10000);
    }

    static class HeartbeatClientHandler extends SimpleChannelInboundHandler<String> {
        private int heartbeatCount = 0;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            // 接收到服务端消息的处理逻辑
            System.out.println("[Server]: " + msg);

            // 重置心跳发送计数器
            heartbeatCount = 0;
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleState state = ((IdleStateEvent) evt).state();
                if (state == IdleState.WRITER_IDLE) {
                    // 客户端向服务端发送心跳请求
                    System.out.println("发送心跳请求");
                    ctx.writeAndFlush("heartbeat#" + ++heartbeatCount);
                } else if (state == IdleState.READER_IDLE) {
                    // 超时未接收到服务端应答消息,主动断开连接
                    System.out.println("超时未收到服务端应答消息,断开连接");
                    ctx.channel().close();
                }
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }

    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty入门案例——Netty实现心跳检测 的相关文章

随机推荐