分布式服务器转发消息,rabbitmq+websocket(SpringBoot版)实现分布式消息推送

2023-05-16

本来想用websocket做一个消息推送 可是分布式环境下不支持session共享因为服务器不同

所以采用 rabbitMQ+webSocket实现分布式消息推送

生产者将消息 发送给 rabbitMQ 的 virtual-host:/(顶极路由) 再由它路由到交换机 最终由交换机通过路由键指定具体的管道

消费者监听指定的管道获取消息

最终将获取的消息 交给 webSocket 被@OnMessage注解标识的方法

每次消费一条消息交给 被@OnMessage注解标识的方法 返回给前台

实现分布式实时推送

1.配置rabbitMQ

消息生产者

1.1pom.xml

1

2

3 org.springframework.boot

4 spring-boot-starter-amqp

5

1 server:2 port: 50023

4 spring:5 rabbitmq:6 host: localhost7 #账号密码 默认有的8 username: guest9 password: guest10 #rbbitmq虚拟主机路径11 virtual-host: /12 #rabbitmq的端口号 也是默认的13 port: 5672

1 @SpringBootApplication2 @MapperScan(basePackages = "com.supplychain.dao")3 @EnableRabbit/**开启rabbitmq*/

4 public classThumbsupServer5002_App {5

6 public static voidmain(String[]args){7

8 SpringApplication.run(ThumbsupServer5002_App.class,args);9

10 }11

12 /**消息的转换器13 * 设置成json 并放入到Spring中14 **/

15 @Bean16 publicMessageConverter messageConverter(){17

18 return newJackson2JsonMessageConverter();19

20 }21 }

测试发送消息

1 @RunWith(SpringRunner.class)2 @SpringBootTest3 public classThumbsupServer5002_AppTest {4

5

6 @Autowired7 privateRabbitTemplate rabbitTemplate;8

9 @Test10 public voidcontextLoads() {11

12 UserTest userTest = new UserTest("hao", "651238730@qq.com");13

14 /**1.指定发送的交换机15 * 发送的消息会先发送给 virtual-host: /(顶级路由) 再由它到交换机16 * 由交换机通过路由键指定给具体的管道17 *18 * 2.路由键19 * 有的交换机需要路由键 有的不需要(发送给交换机的消息会被发送给所有管道)20 *21 * 3.发送的消息22 * 如果是对象的话必须实现序列化接口因为网络传输只能传二进制23 *24 **/

25 rabbitTemplate.convertAndSend("userTest-exchange", "userTest-key", userTest);26 }27

28 }

2.消息消费者

同样是pom.xml需要引入rabbitMQ依赖

1

2

3 org.springframework.boot

4 spring-boot-starter-amqp

5

同样需要配置application.yml

1 spring:2 rabbitmq:3 host: 127.0.0.1

4 #账号密码 默认有的5 username: guest6 password: guest7 #rbbitmq虚拟主机路径8 virtual-host: /

9 #rabbitmq的端口号 也是默认的10 port: 5672

11 listener:12 simple:13 acknowledge-mode: manual #手动接受数据14 #max-concurrency: 10#最大并发15 #prefetch: 1 #限流

同样主启动类中需要开启RabbitMQ

1 @SpringBootApplication2 @EnableRabbit3 public classMessageServer5003_App {4

5 public static voidmain(String[]args){6

7 SpringApplication.run(MessageServer5003_App.class,args);8

9 }10

11 /**这里也需要设置消息转换类型12 * 和发送的消息类型一定要对应13 * 不然对象接受json启动主程序类时就会报错14 **/

15 @Bean16 publicMessageConverter messageConverter(){17

18 return newJackson2JsonMessageConverter();19

20 }21

22 }

下面到了整合的环节了

1 @ServerEndpoint(value = "/websocket")2 @Component3 public classWebSocketServer {4

5 //静态变量 用于记录当前在线连接数 应该把它设计成线程安全

6 private static int onlineCount=0;7

8 /**Concurrent包下的 写时复制Set 用它作于存储客户端对应的MyWebSocket对象*/

9 private static CopyOnWriteArraySet webSocketSet= new CopyOnWriteArraySet();10

11

12 /**与某个客户端的链接会话,需要通过它来给客户端发送数据*/

13

14 privateSession session;15 /**

16 * 参数1:Message 可以获得消息的内容字节 还可以获得消息的其他属性17 * 参数2:可以写确定接受的参数类型比如User18 * 参数3:Channel 通道19 * com.rabbitmq.client.Channel必须是这个包下20 * 通过这个参数可以拒绝消息21 * 让rabbitmq再发给别的消费者22 *23 * 使用@RabbitListener 可以绑定交换机 路由键 管道24 *25 */

26 @RabbitListener(bindings =@QueueBinding(27 value = @Queue(value = "userTest-queue",durable = "true"),28 exchange = @Exchange(name = "userTest-exchange",durable = "true",type = "direct"),29 key = "userTest-key"

30 )31 )32 @RabbitHandler//注解意思:如果有消息过来 需要消费的时候才会调用该方法

33 /**如果已知传递的参数是 UserTest对象可以通过该注解34 * 消息头需要用map接受35 * 既然是手动接受消息 就需要设置channel36 **/

37 public void receiveUserMessage(@Payload UserTest userTest, @Headers Map headers, Channel channel) throwsIOException {38 //sendMessage(message.toString());

39 System.out.println("UserTest对象"+userTest);40 onMessage(userTest.toString());//调用消息方法将数据船体给他41

42 Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);43 //手动接受并告诉rabbitmq消息已经接受了 deliverTag记录接受消息 false不批量接受

44 channel.basicAck(deliveryTag,false);45

46 /**

47 * basicReject()48 * 参数1: 消息标签49 * 参数2: true 将消息从新放入队列 false 接受到并将消息抛弃50 *51 *52 try {53 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);54 System.out.println(message);55 } catch (IOException e) {56 e.printStackTrace();57 }58 */

59

60 }61

62 /**服务器端推送消息*/

63 public voidsendMessage(String message){64 try{65 System.out.println("session可否显示出来"+session);66 this.session.getBasicRemote().sendText(message);67 } catch(IOException e) {68 e.printStackTrace();69 }70 }71

72 /**

73 * 连接建立成功调用的方法74 **/

75 @OnOpen76 public voidonOpen(Session session){77 this.session=session;78 webSocketSet.add(this);79 System.out.println("有新的连接加入!当前在线人数为"+getOnlineCount());80 System.out.println(session);81 }82

83 /**

84 * 连接关闭调用的方法85 **/

86 @OnClose87 public voidonClose(){88 /**从安全Set中 移除当前连接对象*/

89 webSocketSet.remove(this);90 subOnlineCount();91 System.out.println("有一连接关闭!当前在线人数为"+getOnlineCount());92 }93

94

95

96 @OnMessage97 public voidonMessage(String message){98

99 System.out.println("来自客户端的消息:"+message);100

101 for(WebSocketServer webSocketServer:webSocketSet){102 webSocketServer.sendMessage(message);103 }104

105 }106

107

108 public static intgetOnlineCount() {109 returnonlineCount;110 }111

112 public static synchronized voidaddOnlineCount() {113 WebSocketServer.onlineCount++;114 }115

116 public static synchronized voidsubOnlineCount() {117 WebSocketServer.onlineCount--;118 }119

120

121

122 }

websocket前端

websocket是html5提出的协议属于双工通信 前端发送一次请求告诉服务器需要将http协议升级成tcp长连接

后面服务端直接给前端推送消息就可以了 从以前的一次请求一次响应 服务端被动式 变成 一次请求服务端可以无限响应

1

2 varsocket;3 console.log(typeofsocket)4 if(typeof(WebSocket)=="undefined"){5 alert("您的浏览器不支持WebSocket");6 }else{7 alert("您的浏览器支持WebSocket");8

9 socket=newWebSocket("ws://localhost:5003/websocket");10

11 socket.onopen=function() {12 console.log("Socket 已打开");13 };14

15 //获得消息事件

16 socket.onmessage= function(msg) {17 console.log(msg.data);18 //发现消息进入 调后台获取

19 //getCallingList();

20 };21

22 //关闭事件

23 socket.onclose= function() {24 console.log("Socket已关闭");25 };26 //发生了错误事件

27 socket.οnerrοr= function() {28 alert("Socket发生了错误");29 };30 /**31 $(window).unload(function(){32 socket.close();33 });34 */

35 }36

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

分布式服务器转发消息,rabbitmq+websocket(SpringBoot版)实现分布式消息推送 的相关文章

  • 转:如何查找别人论文(计算机类文献)中实验的代码?

    最近看计算机类文献 xff0c 想看看别人论文中实验是如何做出来的 xff0c 请问如何查找别人论文中实验的代码 1 如果这论文很老 xff0c 论文里的算法在该领域有举足轻重的地位 那么网上很可能有工具包 例如我做的机器学习方向 xff0
  • Pytorch-属性统计

    引言 本篇介绍Pytorch属性统计的几种方式 统计属性 求值或位置 normmean sumprodmax min argmin argmaxkthvalue topk norm norm 与 normalize norm指的是范数 xf
  • 高性能异步爬虫

    背景 其实爬虫的本质就是client发请求批量获取server的响应数据 xff0c 如果我们有多个url待爬取 xff0c 只用一个线程且采用串行的方式执行 xff0c 那只能等待爬取一个结束后才能继续下一个 xff0c 效率会非常低 需
  • [operator]deepin 卸载自带搜狗输入法后,输入法消失

    解决这个问题我先是升级了官方的im config套件 xff0c 升级后发现并没有什么用 xff0c 然后使用以下方式 xff0c 做个记录 命令行操作 删除搜狗的残留文件 cd config rm rf SogouPY users rm
  • DPK

    一 概念 dpk文件是Delphi的包文件 xff0c 有dpk文件的组件安装比较方便 一般来说 xff0c 支持不同版本Delphi的组件会有不同的dpk文件 xff0c 一般以7结尾的dpk文件是支持Delphi 7的 如果没有支持De
  • TCP/IP协议栈概述及各层包头分析

    一 摘要 对之前几篇博文涉及到的网络通信协议进行分析 xff0c 概述出TCP IP的协议栈模型 xff0c 最后根据实例对各层包头进行分析 二 标准TCP IP协议栈模型 标准TCP IP协议是用于计算机通信的一组协议 xff0c 通常被
  • 2范数和F范数的区别

    2范数和F范数是不同的 2范数表示矩阵或向量的最大奇异值 xff0c max svd X 而 F范数表示矩阵所有元素平方和的开方根 sqrt x i j X x i j 2 转载于 https www cnblogs com yinwei
  • 网络钩子webhook

    网页开发中的网络钩子是一种通过自定义回调函数来增加或更改网页表现的方法 webhook 发布订阅模式 xff0c 与api不同的是 xff0c webhook无需发送请求即可收到监听地址发布的消息 主要用途 xff1a 更新客户端
  • free -g 说明

    free g 说明 xff1a free g 43 buffers cache 说明 xff1a buffer 写缓存 xff0c 表示脏数据写入磁盘之前缓存一段时间 xff0c 可以释放 sync命令可以把buffer强制写入硬盘 cac
  • Google Drive 里的文件下载的方法

    Google Drive 里并不提供创建直接下载链接的选项 xff0c 但是可以通过小小的更改链接形式就能把分享的内容保存到本地 例如 xff0c 一份通过 Google Drive 分享的文件链接形式为 xff1a https drive
  • 关于虚拟机VMware Tools安装中出现的无法自动安装VMCI驱动程序的问题

    问题 解决方法 根据配置文件信息找到所在的虚拟机位置 找到后缀名为vmx的文件 xff0c 右键打开方式中选择使用记事本打开 选择左上角编辑中的查找功能输入图中的查找内容后 xff0c 点击查找下一个 将其原先的TRUE值改为false即可
  • 人脸识别概念杂记

    Gabor特征 xff1a 通过Gabor变换获取的特征 Gabor变换 xff1a 是在20世纪40年代有Gabor提出的一种利用高斯函数作为窗口函数的加窗傅里叶变换 Gabor变换可以有效的获取空间和方向等视觉信息 xff0c 使得原始
  • 大麦盒子(domybox)无法进入系统解决方案!【简单几步】

    大麦无法进入系统解决方案 xff01 简单几步 前提准备 xff1a 电脑一台盒子控制台软件盒子开机并联网并且盒子和电脑处于同一个路由器下的网络 xff01 前提准备 xff1a 电脑一台盒子控制台软件盒子开机并联网并且盒子和电脑处于同一个
  • 常见开发语言擅长领域

    Python xff1a 机器学习 xff0c 数据科学还有Web开发 JavaScript xff1a Web开发 xff08 前端和后端 xff09 和游戏开发 Java xff1a 移动Android应用程序开发 xff0c 企业应用
  • H3C 维护命令

    一 xff1a 基础维护命令 1 dis version 查看版本 2 dis cu 显示实时配置 3 dis this 显示当前视图下的配置 4 dis interface 显示接口 5 dis mac address 显示mac地址表
  • ROS下利用realsense采集RGBD图像合成点云

    摘要 xff1a 在ROS kinetic下 xff0c 利用realsense D435深度相机采集校准的RGBD图片 xff0c 合成点云 xff0c 在rviz中查看点云 xff0c 最后保存成pcd文件 一 各种bug 代码编译成功
  • SQL在工作中遇到的问题

    多表查询的用法区别varchar类型的时间比大小 多表查询的用法区别 一般对于两张表的查询习惯用 select from a b where a id 61 b id 最近发现也可以使用 select from a inner join b
  • 剑指offer

    目录 第2章 面试需要的基础知识 2 3 数据结构 2 3 1 数组 xff1a 二维数组中的查找 2 3 2 字符串 xff1a 替换空格 2 3 3 链表 xff1a 从尾到头打印链表 2 3 4 树 xff1a 重建二叉树 2 3 5
  • js阻止冒泡事件发生(react)

    其实就两个函数 event MouseEvent event preventDefault span class token punctuation span span class token punctuation span span c
  • 机器学习与深度学习核心知识点总结

    来源 SIGAI 数学 1 列举常用的最优化方法 梯度下降法 牛顿法 xff0c 拟牛顿法 坐标下降法 梯度下降法的改进型如AdaDelta xff0c AdaGrad xff0c Adam xff0

随机推荐