netty实现websocket推送消息

2023-11-02

前言

由于http协议为应答模式的连接,无法保持长连接于是引入了websocket套接字长连接概念,能够保持数据持久性的交互;本篇文章将告知读者如何使用netty实现简单的消息推送功能

websocket请求头

GET / HTTP/1.1
Host: 127.0.0.1:8096
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36
Upgrade: websocket
Origin: http://localhost:8056
Sec-WebSocket-Version: 13

websocket请求头 会有 Connection 升级为 Upgrade, 并且Upgrade 属性值为 websocket

引入依赖

引入netty和 引擎模板依赖

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.55.Final</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
    </dependencies>

创建WebSocketServer

创建Nio线程组,并在辅助启动器中中注入 自定义处理器;定义套接字端口为8096;

/**
 * @author lsc
 * <p> </p>
 */
@Slf4j
public class WebSocketServer {

    public void init(){
        NioEventLoopGroup boss=new NioEventLoopGroup();
        NioEventLoopGroup work=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(boss,work);
            bootstrap.channel(NioServerSocketChannel.class);
            // 自定义处理器
            bootstrap.childHandler(new SocketChannelInitializer());
            Channel channel = bootstrap.bind(8096).sync().channel();
            log.info("------------webSocket服务器启动成功-----------:"+channel);
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.info("---------运行出错----------:"+e);
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
            log.info("------------websocket服务器已关闭----------------");
        }
    }
}

SocketChannelInitializer

SocketChannelInitializer 中定义了聚合器 HttpObjectAggregator 将多个http片段消息聚合成完整的http消息,并且指定大小为65536;最后注入自定义的WebSocketHandler;

/**
 * @author lsc
 * <p> </p>
 */
public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        //设置log监听器
        ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));
        //设置解码器
        ch.pipeline().addLast("http-codec",new HttpServerCodec());
        //聚合器
        ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
        //用于大数据的分区传输
        ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
        //自定义业务handler
        ch.pipeline().addLast("handler",new WebSocketHandler());
    }
}

WebSocketHandler

WebSocketHandler 中对接收的消息进行判定,如果是websocket 消息 则将消息广播给所有通道;

/**
 * @author lsc
 * <p> </p>
 */
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object>  {

    // 存放已经连接的通道
    private  static ConcurrentMap<String, Channel> ChannelMap=new ConcurrentHashMap();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest){

            System.out.println("------------收到http消息--------------"+msg);
            handleHttpRequest(ctx,(FullHttpRequest)msg);
        }else if (msg instanceof WebSocketFrame){
            //处理websocket客户端的消息
            String message = ((TextWebSocketFrame) msg).text();
            System.out.println("------------收到消息--------------"+message);
//            ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
            // 将消息回复给所有连接
            Collection<Channel> values = ChannelMap.values();
            for (Channel channel: values){
                channel.writeAndFlush(new TextWebSocketFrame(message));
            }
        }

    }

    /**
     * @author lsc
     * <p> 处理http请求升级</p>
     */
    private void handleHttpRequest(ChannelHandlerContext ctx,
                                   FullHttpRequest req) throws Exception {

        // 该请求是不是websocket upgrade请求
        if (isWebSocketUpgrade(req)) {
            String ws = "ws://127.0.0.1:8096";
            WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(ws, null, false);
            WebSocketServerHandshaker handshaker = factory.newHandshaker(req);

            if (handshaker == null) {// 请求头不合法, 导致handshaker没创建成功
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                // 响应该请求
                handshaker.handshake(ctx.channel(), req);
            }
            return;
        }
    }

    //n1.GET? 2.Upgrade头 包含websocket字符串?
    private boolean isWebSocketUpgrade(FullHttpRequest req) {
        HttpHeaders headers = req.headers();
        return req.method().equals(HttpMethod.GET)
                && headers.get(HttpHeaderNames.UPGRADE).equals("websocket");
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //添加连接
        log.debug("客户端加入连接:"+ctx.channel());
        Channel channel = ctx.channel();
        ChannelMap.put(channel.id().asShortText(),channel);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //断开连接
        log.debug("客户端断开连接:"+ctx.channel());
        Channel channel = ctx.channel();
        ChannelMap.remove(channel.id().asShortText());
    }
}

最后将WebSocketServer 注入spring监听器,在服务启动的时候运行;

@Slf4j
@Component
public class ApplicationConfig implements ApplicationListener<ApplicationReadyEvent> {



    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        WebSocketServer webSocketServer = new WebSocketServer();
        webSocketServer.init();
    }
}

视图转发

编写 IndexController 对视图进行转发

/**
 * @author lsc
 * <p> </p>
 */
@Controller
public class IndexController {

    @GetMapping("index")
    public ModelAndView index(){
        ModelAndView modelAndView = new ModelAndView("socket");
        return modelAndView;
    }
}

html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>用户页面</title>
</head>
<body>
<div id="msg"></div>
<input type="text" id="text">
<input type="submit" value="send" onclick="send()">
</body>
<script>
    var msg = document.getElementById("msg");
    var wsServer = 'ws://127.0.0.1:8096';
    var websocket = new WebSocket(wsServer);
    //监听连接打开
    websocket.onopen = function (evt) {
        msg.innerHTML = "The connection is open";
    };

    //监听服务器数据推送
    websocket.onmessage = function (evt) {
        msg.innerHTML += "<br>" + evt.data;
    };

    //监听连接关闭
    websocket.onclose = function (evt) {
        alert("连接关闭");
    };

    function send() {
        var text = document.getElementById("text").value
        websocket.send(text);
    }
</script>
</html>

附上配置文件

server:
  port: 8056

spring:
  # 引擎模板配置
  thymeleaf:
    cache: false # 关闭缓存
    mode: html # 去除htm5严格校验
    prefix: classpath:/templates/ # 指定 thymeleaf 模板路径
    encoding: UTF-8 # 指定字符集编码
    suffix: .html

运行服务后

前端页面显示消息

服务端打印消息

源码获取:知识追寻者公众号回复:netty
配套教程

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

netty实现websocket推送消息 的相关文章

随机推荐

  • Sublime Text 2.0.1 2217 版本破解注册方法(32位)

    准备工具 十六进制编辑器 这里以WinHex为例 1 安装最新的Sublime Text 2 0 1 2217版本http www sublimetext com 2 32位 2 备份sublime text exe 然后用 WinHex十
  • IntelliJ IDEA使用教程:一个沉浸式的 IDE 工具

    IntelliJ IDEA 一个沉浸式的 IDE 工具 本文基础 IntelliJ IDEA 旗舰版或者说是 JetBrains 家所有 IDE 的旗舰版 核心概念 讲 IntelliJ IDEA 的好 带更多人入坑 说几句老实话 劝别人换
  • RS485 硬件自收发切换的实现

    RS485抗噪音抗干扰能力强 传输距离远 支持多点通信 是工控行业首选串行接口 485规定的电气特性为2线 半双工多点通信 采用两线差分信号传输数据 具有抗共模干扰的能力 由于是半双工模式 因此通讯时需要切换收发状态 目前常用的485收发切
  • Ubuntu如何重启网络

    更多课程点击此处 快速链接 专栏目录 环境搭建安装问题笔记目录 付费专栏 付费课程 购买须知 个人博客笔记导读目录 全部 网络的IP地址没了 我们只要重启网络即可 sudo systemctl restart NetworkManager
  • “vue-amap“: “^0.5.10“, 高德地图 vue版本 H5地图 实现根据地名搜索坐标,经纬度

    本文介绍高德地图在vue框架中的使用方法 H5地图通用 首先引入 vue amap 0 5 10 npm install vue amap 然后在main js里写入如下代码 import VueAMap from vue amap Vue
  • javaweb 配置系统错误页面404或500 等其他错误代码

    web xml
  • Linux网络的DHCP、FTP原理及配置

    DHCP原理与配置 DHCP服务 使用DHCP动态配置主机地址 DHCP服务 可分配的地址信息主要包括 配置DHCP服务器 FTP 用来传输文件的协议 端口号 FTP数据连接模式 实现FTP功能 实验 DHCP服务 使用DHCP动态配置主机
  • 视图在SQL中的应用

    我们之前对 SQL 中的数据表查询进行了讲解 今天我们来看下如何对视图进行查询 视图 也就是我们今天要讲的虚拟表 本身是不具有数据的 它是 SQL 中的一个重要概念 从下面这张图中 你能看到 虚拟表的创建连接了一个或多个数据表 不同的查询应
  • 锚点机制在目标检测领域的发展综述

    摘要 目标检测是计算机视觉领域的基本任务 近年来 基于深度学习的目标检测研究发展十分迅速 锚点 anchor 机制广泛应用于主流目标检测器中 多尺度的锚点是检测器解决尺度问题的有效方法 但锚点策略也存在尺寸固定 模型鲁棒性差等问题 根据优化
  • fifo介绍及fifo IP核使用(工程文件获取请参考文末)

    一 fifo简介 1 fifo first in first out的缩写 先进先出数据缓存器 与普通存储器的区别 对外接口没有地址线 由此所带来的优点是 不用处理地址信号 时序较简单 缺点是 不能像普通存储器那样自由读写某个地址的数据 只
  • 小巧玲珑:机器学习届快刀XGBoost的介绍和使用

    欢迎大家前往腾讯云技术社区 获取更多腾讯海量技术实践干货哦 作者 张萌 序言 XGBoost效率很高 在Kaggle等诸多比赛中使用广泛 并且取得了不少好成绩 为了让公司的算法工程师 可以更加方便的使用XGBoost 我们将XGBoost更
  • JVM垃圾回收器

    目录 串行垃圾回收器 Serial Collector 并发式垃圾回收器 7种经典垃圾回收器 Serial 收集器 ParNew收集器 Parallel Scavenge收集器 CMS收集器 G1收集器 小结 常用参数 按垃圾回收器的线程数
  • [管理与领导-74]:IT基层管理者 - 辅助技能 - 4- 职业发展规划 - 构建自己的个人品牌

    前言 一 什么是信任账户 在职场中受到信任是建立良好声誉和专业形象的基础 以下是一些可以帮助职场人受到信任的方法 诚实守信 始终保持诚实和可靠的行为 遵守诺言 履行承诺 不轻易背信弃义 专业素养 展现专业的知识和技能 以及对工作的敬业精神
  • 51单片机学习之-串口中断

    串口中断 SM2 多机通信控制位 0 数据直接进入SBUF 并同时使R1致1 T1 发送中断标志位 发送数据自动由硬件置1 并且同时执行中断程序 也必须在中断程序中写0 RI 接收中断标志位 收到数据自动由硬件置1 并且同时执行中断程序 也
  • 全球与中国注塑磁体市场竞争策略分析及投资前景研究报告2021-2027年版

    全球与中国注塑磁体市场竞争策略分析及投资前景研究报告2021 2027年版 2020年 全球注塑磁体市场规模达到了 亿元 预计2027年将达到 亿元 年复合增长率 CAGR 为 本报告研究全球与中国市场注塑磁体的产能 产量 销量 销售额 价
  • 华为云云服务器评测|前端开发同学的初体验部署贪吃蛇!

    文章目录 前言 初配置 初始化宝塔面板 安装Nginx 上传项目 修改nginx配置 效果展示 前言 作为一名前端同学 我的技能和日常工作主要集中在用户界面的设计和交互上 与服务器产品相关的经验相对较少 正好看到了咱们华为云开展的评测活动
  • pta冒泡排序c语言_PTA 冒泡排序

    编程实现冒泡排序函数 void bubbleSort int arr int n 其中arr存放待排序的数据 n为数组长度 1 n 1000 函数接口定义如下 对长度为n的数组arr执行冒泡排序 void bubbleSort int ar
  • 统计学习:ANOVA(方差分析)(1)

    统计学习 最近在处理数据的过程中 越发发觉自己理论知识的薄弱 因此 开始了这一系列的帖子 记录自己的理论补充过程 同时方便后来人理解 ANOVA 方差分析 方差分析 Analysis of Variance 简称ANOVA 又称 变异数分析
  • 思科配置STP生成树协议

    第一步 给所有接口开trunk 第二步 左边的三层交换机和右边的三层交换机建立vlan 并给上IP地址 第三步 下面的二层交换机创建两个vlan 第四步 左边三层交换机配置 spanning tree mode pvst 开启vlan快速生
  • netty实现websocket推送消息

    前言 由于http协议为应答模式的连接 无法保持长连接于是引入了websocket套接字长连接概念 能够保持数据持久性的交互 本篇文章将告知读者如何使用netty实现简单的消息推送功能 websocket请求头 GET HTTP 1 1 H