SpringBoot+WebSocket+Netty实现消息推送

2023-11-06

实现思路

  1. 前端使用webSocket与服务端创建连接的时候,将用户ID传给服务端
  2. 服务端将用户ID与channel关联起来存储,同时将channel放入到channel组中
  3. 如果需要给所有用户发送消息,直接执行channel组的writeAndFlush()方法
  4. 如果需要给指定用户发送消息,根据用户ID查询到对于的channel,然后执行writeAndFlush()方法
  5. 前端获取到服务端推送的消息之后,将消息内容展示到文本域中

实现步骤

一、引入依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>
<dependency>
   <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.2.3</version>
</dependency>
二、编写NettyConfig

定义一个channel组,管理所有的channel,再定义一个map,管理用户与channel的对应关系

public class NettyConfig {

    /**
     * 定义一个channel组,管理所有的channel
     * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放用户与channel的对应消息,用于给指定用户发送消息
     */
    private static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();

    private NettyConfig() {}

    /**
     * 获取channel组
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * 获取用户 channel map
     */
    public static ConcurrentHashMap<String, Channel> getUserChannelMap() {
        return userChannelMap;
    }
}
三、具体实现业务的WebSocketHandler
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    // 一旦连接,第一个被执行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        logger.info("handlerAdded 被调用: " + ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    // 读取数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        logger.info("服务器收到消息:{}", msg.text());
        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        NettyConfig.getUserChannelMap().put(uid, ctx.channel());
        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<Object> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);
        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器连接成功"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        logger.info("handlerRemoved 被调用: " + ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.info("异常: {}", cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    // 删除用户与channel的对应关系
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getUserChannelMap().remove(userId);
    }

}

@Sharable注解用来说明ChannelHandler是否可以在多个channel直接共享使用

四、创建NettyServer

定义两个EventLoopGroup,BossGroup辅助客户端的TCP连接请求,workGroup负责与客户端之前的读写操作,需要说明的是,需要开启一个新的线程来执行NettyServer,要不然会阻塞主线程,到时候就无法调用项目的其他controller接口了

@Component
@RequiredArgsConstructor
public class NettyServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    // webSocket协议名
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:58080}")
    private int port;

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    private String webSocketPath;

    private final WebSocketHandler webSocketHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    // 启动
    @SneakyThrows
    private void start() {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup辅助客户端的tcp连接请求,workGroup负责与客户端之前的读写操作
        bootstrap.group(bossGroup, workGroup);
        // 设置NIO类型的channel
        bootstrap.channel(NioServerSocketChannel.class);
        // 设置监听端口
        bootstrap.localAddress(new InetSocketAddress(port));
        // 连接达到时会创建一个通道
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                // 流水线管理通道中的处理程序(Handle),用来处理业务
                // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // 以块的方式来写的处理器
                ch.pipeline().addLast(new ChunkedWriteHandler());
                /*
                  1.http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合
                  2.这就是为什么,当浏览器发送大量数据时,就会发送多次http请求
                 */
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                /*
                  1.对应websocket,它的数据是以帧(frame)的形式传递
                  2.浏览器请求时 ws://localhost:58080/xxx 表示请求的url
                  3.核心功能是将http协议升级为ws协议,保持长连接
                 */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, Boolean.TRUE, 65536 * 10));
                // 自定义handler,处理业务逻辑
                ch.pipeline().addLast(webSocketHandler);
            }
        });
        // 配置完成,开始绑定server,通过调用sync同步方法阻塞知道绑定成功
        ChannelFuture channelFuture = bootstrap.bind().sync();
        logger.info("Server started and listen on: {}", channelFuture.channel().localAddress());
        // 对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
    }

    @SneakyThrows
    @PreDestroy
    public void destroy() {
        if (Objects.nonNull(bossGroup)) {
            bossGroup.shutdownGracefully().sync();
        }
        if (Objects.nonNull(workGroup)) {
            workGroup.shutdownGracefully().sync();
        }
    }

    @PostConstruct
    public void init() {
        // 需要开启一个新的线程来执行Netty server 服务器
        new Thread(this::start).start();
    }
}
五、编写前端页面
<!DOCTYPE html>
<html lang="en">
<head>
	<meta charset="UTF-8">
	<title>Title</title>
</head>
<body>
<script>
    let socket;
    // 判断当前浏览器是否支持webSocket
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:58080/webSocket")
        // 相当于channel的read事件,ev 收到服务器回送的消息
        socket.onmessage = function (ev) {
            let rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + ev.data;
        }
        // 相当于连接开启
        socket.onopen = function (ev) {
            let rt = document.getElementById("responseText");
            rt.value = "连接开启了..."
            socket.send(
                JSON.stringify({
                    // 连接成功将,用户ID传给服务端
                    uid: "123456"
                })
            );
        }
        // 相当于连接关闭
        socket.onclose = function (ev) {
            let rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + "连接关闭了...";
        }
    } else {
        alert("当前浏览器不支持webSocket")
    }
</script>
<form onsubmit="return false">
	<textarea id="responseText" style="height: 150px; width: 300px;"></textarea>
	<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>

测试

·1. 启动服务端
在这里插入图片描述
2. 启动客户端

客户端启动之后,服务端就会收到消息,且会返回消息
在这里插入图片描述
在这里插入图片描述

  1. 使用postman测试发送消息
    在这里插入图片描述
    在这里插入图片描述
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot+WebSocket+Netty实现消息推送 的相关文章

  • Spring boot 2+日志详细日志不起作用,配合Logback、Hibernate + Weblogic

    I use 甲骨文11 x 春季启动 2 x maven weblogic 作为外部服务器 入口点 SpringBootConfiguration SpringBootApplication public class WebSpringBo
  • 在 String 值之后打印 int 值

    我有以下示例代码 int pay 80 int bonus 65 System out println pay bonus bonus pay 有人可以向我解释一下为什么我得到以下输出 145 6580 您的代码正在从左到右解释表达式 pa
  • 了解 netty 通道缓冲区和水印

    我正在尝试了解网络缓冲区和水印 作为一个测试用例 我有一个 netty 服务器 它向客户端写入数据 客户端被阻止 基本上每次读取之间有 10 秒的睡眠时间 在正常 I O 下 如果接收方被阻塞 TCP 发送方将受到限制 由于流量控制 发送速
  • Hashset - 创建 Set 后使对象相同

    如果我们在 HashSet 中添加两个不同的对象 可变的 然后通过调用 setter 更改对象的值 使它们相同 则大小仍然是 hashSet 的 2 我无法理解其原因 public static void main String args
  • Java 小程序在 Mac 上闪烁

    这个问题很奇怪 问题并非在每个平台上都会发生 我在使用 MacOSX 的 Google Chrome 中出现了这种情况 但在 Safari 中却没有出现这种情况 对于使用 Windows 的朋友来说 在 Google Chrome 上运行得
  • 在 Eclipse 3.5 上安装旧版 TestNG 插件时出现问题

    我正在尝试在 eclipse 3 5 上安装 TestNG 5 11 并获得以下信息 eclipse buildId unknown java version 1 6 0 19 java vendor Sun Microsystems In
  • 使用全局变量从内部函数获取空字符串

    请帮助我解决一些小问题 我确信你能做到 D 我试图在 firestore 文档 user cases information 上设置一个字段 其中包含一个字段 case number 首先我声明这个全局变量 private String c
  • Maven WebApp META-INF context.xml

    我正在使用 Maven 3 并且尝试在 webapp 文件夹下添加 META INF 文件夹 所以我正在尝试执行以下操作 src main webapp META INF context xml WEB INF 下面是我的 POM 文件
  • 使用 Jena 查询维基数据

    目前 Wikidata 有一个 SPARQL 端点 https query wikidata org https query wikidata org 我想使用 Jena 3 0 1 查询此网站 我使用以下代码 但收到错误消息 端点返回的
  • 在拇指上方显示修改后的 JSlider 值

    有没有一种简单的方法可以在使用某些 外观和感觉 的同时更改 JSlider 上方标签中显示的值 为了清楚起见 我正在谈论这个值 具体来说 我想显示除以 1000 的值而不是值本身 我知道如果我显示它们 我可以为刻度设置标签 但用户将不得不猜
  • 如何自动转换十六进制代码以将其用作 Java 中的 byte[]?

    我这里有很多十六进制代码 我想将它们放入 Java 中 而不需要向每个实体附加 0x 喜欢 0102FFAB 和我必须执行以下操作 byte test 0x01 0x02 0xFF 0xAB 我有很多很长的十六进制代码 有什么办法可以自动做
  • for循环中更新JLabel的问题

    我的程序的想法是从之前在其他 JFrame 中保存的列表中选择一个名称 我想在标签中一个接一个地打印所有名称 它们之间有很小的延迟 然后停在其中一个名称上 问题是lbl setText String 如果有多个则不起作用setText co
  • 如何在 Eclipse 中获得完全限定的类名?

    有没有一种快速方法可以在 Eclipse 中单击 Java 类并获取其完全限定名称 或将其复制到剪贴板 2016年6月29日编辑 正如 Jeff 所指出的 您只需要执行以下第二步 1 Double click on the class na
  • 膨胀类 android.support.design.widget.NavigationView 时出错

    我按照 NavigationView 的教程进行操作 但无法解决此错误消息 Error inflating class android support design widget NavigationView 教程链接 https www
  • 避免 @Secured 注释的重复值

    我正在尝试使用以下方法来保护我的服务方法 Secured如下 public interface IUserService Secured ROLE ROLE1 ROLE ROLE2 ResponseEntity saveUser Creat
  • Janusgraph 0.3.2 + HBase 1.4.9 - 无法设置 graph.timestamps

    我在 Docker 容器中运行 Janusgraph 0 3 2 并尝试使用运行 HBase 1 4 9 的 AWS EMR 集群作为存储后端 我可以运行 gremlin server sh 但如果我尝试保存某些内容 我会得到粘贴在下面的堆
  • Java 8 方法签名不一致

    Java 8 为我们提供了具有很长签名的新方法 如下所示 static
  • Java 中序列化的目的是什么?

    我读过很多关于序列化的文章 以及它如何如此美好和伟大 但没有一个论点足够令人信服 我想知道是否有人能真正告诉我通过序列化一个类我们真正可以实现什么 让我们先定义序列化 然后我们才能讨论它为什么如此有用 序列化只是将现有对象转换为字节数组 该
  • 如何在J2ME中获取数字的幂[重复]

    这个问题在这里已经有答案了 可能的重复 J2ME power double double 数学函数实现 https stackoverflow com questions 2076913 j2me powerdouble double ma
  • Unicode(希腊语)字符存储在数据库中,例如“??????”

    数据库中的希腊字符就像问号 我找不到解决办法 我使用 Java Swing 开发了一个应用程序 但是当我在 MySQL 中插入希腊字母时 就像问号一样 我将数据库排序规则更改为 utf8 并将列也更改为 utf8 我的项目编码设置为UTF

随机推荐

  • 响应式网页设计(Responsive Web Design)的核心原理

    聚沙成塔 每天进步一点点 专栏简介 响应式网页设计的核心原理 优点和缺点 优点 缺点 写在最后 专栏简介 前端入门之旅 探索Web开发的奇妙世界 欢迎来到前端入门之旅 感兴趣的可以订阅本专栏哦 这个专栏是为那些对Web开发感兴趣 刚刚踏入前
  • CVE-2022-26134 Confluence OGNL RCE 复现

    一 漏洞概述 Atlassian Confluence 是一款各企业广泛使用的 wiki 系统 在Atlassian Confluence Server and Data Center上存在OGNL 注入漏洞 远程攻击者在未经身份验证的情况
  • Servlet之间传递数据

    转自 http jallay iteye com blog 256004 1 如何让用户的请求数据从一个Servlet传递给另一个Servlet 第一种方式 通过超链接传递数据 第二种方式 通过表传递取参数 第三种方式 通过setAttri
  • 『数据结构』B树(B-Tree)及其变体 B+树,B*树

    原文地址 1 背景 当有大量数据储存在磁盘时 如数据库的查找 插入 删除等操作的实现 如果要读取或者写入 磁盘的寻道 旋转时间很长 远大于在 内存中的读取 写入时间 平时用的二叉排序树搜索元素的时间复杂度虽然是 O log2n O l o
  • BBR拥塞算法的简单解释

    TCP BBR的ACM论文中 开篇就引入了图1 以此来说明BBR算法的切入点 为何当前基于丢包探测的TCP拥塞控制算法还有优化空间 BBR算法的优化极限在哪儿 图1 为了理解这张图花了我整整一个晚上的时间 它使我重新审视了所有基础概念 而我
  • vue2.js初探

    今天学习了一下vue2 js 感觉很好用 一个是把相同的功能组件化了 把他定义一个标签 不用多次开发重复的代码 直接加标签就可以了 还有就是他把数据和标签的显示修改完全分开了 之前用jQuery开发 如果数据变动了 需要用jquery回调事
  • 计算机网络第八版——第一章课后题答案(超详细)

    第一章 该答案为博主在网络上整理 排版不易 希望大家多多点赞支持 后续将会持续更新 可以给博主点个关注 第二章 答案 1 01 计算机网络可以向用户提供哪些服务 解答 这道题没有现成的标准答案 因为可以从不同的角度来看 服务 首先要明确的是
  • ThreadX 内部系统时钟服务

    ThreadX中 有两个函数可以获取和设置内部系统时钟服务 tx time get 获取当前时间 tx time set 设置当前时间 tx time get 获取当前时间 原型 ULONG tx time get VOID 描述 这项服务
  • VUE安装问题

    启动应用 npm run serve 默认进入为 http localhost 8080 由于部署在虚拟化linux上 需远程访问 需将localhost修改为服务器IP 1 修改package json 新增host 0 0 0 0 2
  • 【Flutter 系列——1】Flutter环境搭建及配置这一篇就够了(Windows)

    最近正式入坑Flutter 首先从环境搭建开始 看了网上好多关于Windows环境搭建的资料 基本都是按官方文档写的 看完的感受是 还不如直接去看官方文档 官方英文文档传送门 Get Started Install on Windows 本
  • 数据要素流通视角下数据安全保障研究报告

    报告围绕数据要素流通视角下流通数据 流通活动 流通设施的安全需求 分析健全我国数据安全保障体系的推进思路 并从分类分级 流通环境 安全技术 协同共治等方面提出措施建议 为完善我国数据要素流通视角下数据安全保障提供有益参考与借鉴 关注公众号
  • WinCE5.0显卡驱动修改笔记

    WinCE5 0显卡驱动修改笔记公司前段时间让我在Geode上安装一个CE5 0 我把系统安装好之后发现显卡驱动不支持开发板的屏幕 我们的屏幕是800x480的 所以我只能自己动手写修改了一下驱动让它能够支持800x480 一下是我对驱动的
  • python报错code for hash md5 was not found解决方案

    因为开发机服务器不能上网 只能手动安装Python 但是装完后import hashlib出现异常 出现不支持sha256 sha512 md5等错误 现象如下 gt gt gt import hashlib ERROR root code
  • 排序算法之时间复杂度为O(N^2)的算法

    背景知识 排序算法算是比较基础的算法了 但是在面试过程中偶尔也会被问到 虽然很多语言都内置了排序函数 例如php的sort函数等等 但是还是有必要聊聊排序算法 这篇文章中将介绍时间复杂度为O N 2 的几个排序算法 本文基于从小到大排序讲解
  • react面试题(30个)

    1 React Native相对于原生的ios和Android有哪些优势 react native一套代码可以开发出跨平台app 减少了人力 节省了时间 避免了 iOS 与 Android 版本发布的时间差 开发新功能可以更迅速 等等 2
  • go语言后端调用以太坊rpc

    任务要求 使用golang作为后端语言 获取eth 私链 中的账户信息以及创建新的账号 1 启动geth geth identity aaron datadir data0 rpcport 8545 rpccorsdomain port 3
  • 分布式工程团队建设的十大教训

    转自 https www zybuluo com lsmn note 1059823 摘要 人才招聘 培养并促进分布式工程团队的发展并非一日之功 但是值得投资 Bruno提出了一些非常重要的见解 揭示了如何让团队全力以赴 而不管地理位置在哪
  • 初识springBoot

    springboot初学应该了解哪些 了解更多请看Spring Boot 初识 系列 会持续更新 Spring Boot 初识丨一 入门实战 Spring Boot 初识丨二 maven Spring Boot 初识丨三 starter S
  • springcloud搭建标配配置(参考)

    文章目录 架构图 shop parent 后端父项目 pom xml shop common 公共项目 pom xml CommonConstants UserInfo 用户对象 BusinessException 自定义异常 Common
  • SpringBoot+WebSocket+Netty实现消息推送

    实现思路 前端使用webSocket与服务端创建连接的时候 将用户ID传给服务端 服务端将用户ID与channel关联起来存储 同时将channel放入到channel组中 如果需要给所有用户发送消息 直接执行channel组的writeA