SpringBoot结合Netty实现WebSocket推送功能

2023-05-16

文章目录

    • Http协议
    • WebSocket
    • 代码演示
    • 总结:

Http协议

  • 无状态,使得HTTP服务轻量级-
  • HTTP消息冗长:HTTP消息包含消息头,消息体,换行符等。且大多采用文本传输。所以HTTP消息会有很多冗余消息并且消息占用字节数大,消耗过多的带宽
  • 半双工通信:同一时刻,数据只能往同一方向传输。比如向服务器发送消息时,服务器此时不可以向客户端发送消息。(不过目前HTTP2已经支持了全双工通信)
    1、WebSocket可以由tomcat和netty进行实现,这里选择用netty进行实现

WebSocket

  • 底层采用单一TCP链接,全双工通信
  • 对代理、防火墙、路由器透明
  • 无头部消息,cookie,身份验证
  • 通过ping/pong消息保持心跳

代码演示

  • NettyBooter.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;



@Component
@Slf4j
public class NettyBooter implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {

    private static final Logger LOGGER=  LoggerFactory.getLogger(NettyBooter.class);

    @Value("${netty.websocket.port}")
    private int port;

    @Value("${netty.websocket.ip}")
    private String ip;

    @Value("${netty.websocket.path}")
    private String path;

    @Value("${netty.websocket.max-frame-size}")
    private int maxFrameSize;

    private ApplicationContext applicationContext;

    private Channel serverChannel;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

//    @Autowired
//    private AppProp appProp;
//    @Override
//    public void onApplicationEvent(ContextRefreshedEvent event) {
//
//        if(event.getApplicationContext().getParent()!=null){
//            try{
//                WebSocketServer.getInstance().start();
//            }catch (Exception e){
//
//            }
//        }
//    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.localAddress(new InetSocketAddress(this.ip, this.port));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();

                    pipeline.addLast(new HttpServerCodec());
                    // 支持异步发送大的码流(大的文件传输),但不占用过多的内存,防止java内存溢出
                    pipeline.addLast(new ChunkedWriteHandler());
                    // http 消息聚合器  512*1024为接收的最大contentlength
                    pipeline.addLast(new HttpObjectAggregator(65536));
//                    pipeline.addLast(new HeartBeatHandler());
                    pipeline.addLast(new ChannelInboundHandlerAdapter(){
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            if(msg instanceof FullHttpRequest) {
                                FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
                                String uri = fullHttpRequest.uri();
                                if (!uri.equals(path)) {
                                    // 访问的路径不是 websocket的端点地址,响应404
                                    ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
                                            .addListener(ChannelFutureListener.CLOSE);
                                    return ;
                                }
                            }
                            super.channelRead(ctx, msg);
                        }
                    });
                    pipeline.addLast(new WebSocketServerCompressionHandler());
                    pipeline.addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));
             pipeline.addLast(applicationContext.getBean(WebsocketMessageHandler.class));
                   // pipeline.addLast(new WebsocketMessageHandler());
                   //使用getBean可以注入实例
                }
            });
            Channel channel = serverBootstrap.bind().sync().channel();
            this.serverChannel = channel;
            LOGGER.info("websocket 服务启动,ip={},port={}", this.ip, this.port);
            channel.closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
        if (this.serverChannel != null) {
            this.serverChannel.close();
        }
        LOGGER.info("websocket 服务停止");
    }


}

  • WebsocketMessageHandler .java
package com.canbot.enterprise.websocket.netty;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


@Sharable
@Component
public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketMessageHandler.class);

//    @Autowired
//    DiscardService discardService;
    //维护一个group将每一个上线的机器加入
    public static ChannelGroup channelGroup;

    static {
        channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }


    //收到客户端,发送过来的数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {

        if (msg instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
            // 业务层处理数据(若是任务比较耗时,这里使用线程池可以异步处理任务)
//            this.discardService.discard(textWebSocketFrame.text());
            //将想要发送的消息推送给客户端
            sendMessage(ctx);
//            sendMessage();

        } else {
            // 不接受文本以外的数据帧类型
            ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE);
        }
    }

    //给固定的人发送消息,scheduleWithFixedDelay
    public void sendMessage(ChannelHandlerContext ctx){
//        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
//        scheduledThreadPool.scheduleWithFixedDelay(() -> {
//            // 群发
//            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器发送消息了"+System.currentTimeMillis()));
//
//        }, 0,30, TimeUnit.SECONDS);
        ctx.channel().eventLoop().scheduleWithFixedDelay(() -> {

            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器发送消息了"+System.currentTimeMillis()));

        }, 0,30, TimeUnit.SECONDS);
    }



    //向所有在线的客户端发送消息,每天定时发送消息,
    public void sendMessage(){
    
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);


        long oneDay = 24 * 60 * 60 * 1000;
        long initDelay  = getTimeMillis("15:48:00") - System.currentTimeMillis();
        initDelay = initDelay > 0 ? initDelay : oneDay + initDelay;

        scheduledThreadPool.scheduleAtFixedRate(
                ()->{
                    channelGroup.writeAndFlush(new TextWebSocketFrame("服务器主动推送消息。当前服务器时间:"+System.currentTimeMillis()));
                },
                initDelay,
                oneDay,
                TimeUnit.MILLISECONDS);

    }

    //客户端与服务器建立连接的时候触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        channelGroup.add(ctx.channel());
        sendMessage();
        LOGGER.info("链接创建:{}", ctx.channel().remoteAddress());
    }


    //客户端与服务器断开连接的时候触发
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        channelGroup.remove(ctx.channel());
        LOGGER.info("链接断开:{}", ctx.channel().remoteAddress());
    }

    private static long getTimeMillis(String time) {
        try {
            DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
            DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
            Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
            return curDate.getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return 0;

    }
}

总结:

sendMessage()实现定时向所有在线的机器推送消息,sendMessage(ctx)实现向指定机器发送延迟消息。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot结合Netty实现WebSocket推送功能 的相关文章

随机推荐