本来想用websocket做一个消息推送 可是分布式环境下不支持session共享因为服务器不同
所以采用 rabbitMQ+webSocket实现分布式消息推送
生产者将消息 发送给 rabbitMQ 的 virtual-host:/(顶极路由) 再由它路由到交换机 最终由交换机通过路由键指定具体的管道
消费者监听指定的管道获取消息
最终将获取的消息 交给 webSocket 被@OnMessage注解标识的方法
每次消费一条消息交给 被@OnMessage注解标识的方法 返回给前台
实现分布式实时推送
1.配置rabbitMQ
消息生产者
1.1pom.xml
1
2
3 org.springframework.boot
4 spring-boot-starter-amqp
5
1 server:2 port: 50023
4 spring:5 rabbitmq:6 host: localhost7 #账号密码 默认有的8 username: guest9 password: guest10 #rbbitmq虚拟主机路径11 virtual-host: /12 #rabbitmq的端口号 也是默认的13 port: 5672
1 @SpringBootApplication2 @MapperScan(basePackages = "com.supplychain.dao")3 @EnableRabbit/**开启rabbitmq*/
4 public classThumbsupServer5002_App {5
6 public static voidmain(String[]args){7
8 SpringApplication.run(ThumbsupServer5002_App.class,args);9
10 }11
12 /**消息的转换器13 * 设置成json 并放入到Spring中14 **/
15 @Bean16 publicMessageConverter messageConverter(){17
18 return newJackson2JsonMessageConverter();19
20 }21 }
测试发送消息
1 @RunWith(SpringRunner.class)2 @SpringBootTest3 public classThumbsupServer5002_AppTest {4
5
6 @Autowired7 privateRabbitTemplate rabbitTemplate;8
9 @Test10 public voidcontextLoads() {11
12 UserTest userTest = new UserTest("hao", "651238730@qq.com");13
14 /**1.指定发送的交换机15 * 发送的消息会先发送给 virtual-host: /(顶级路由) 再由它到交换机16 * 由交换机通过路由键指定给具体的管道17 *18 * 2.路由键19 * 有的交换机需要路由键 有的不需要(发送给交换机的消息会被发送给所有管道)20 *21 * 3.发送的消息22 * 如果是对象的话必须实现序列化接口因为网络传输只能传二进制23 *24 **/
25 rabbitTemplate.convertAndSend("userTest-exchange", "userTest-key", userTest);26 }27
28 }
2.消息消费者
同样是pom.xml需要引入rabbitMQ依赖
1
2
3 org.springframework.boot
4 spring-boot-starter-amqp
5
同样需要配置application.yml
1 spring:2 rabbitmq:3 host: 127.0.0.1
4 #账号密码 默认有的5 username: guest6 password: guest7 #rbbitmq虚拟主机路径8 virtual-host: /
9 #rabbitmq的端口号 也是默认的10 port: 5672
11 listener:12 simple:13 acknowledge-mode: manual #手动接受数据14 #max-concurrency: 10#最大并发15 #prefetch: 1 #限流
同样主启动类中需要开启RabbitMQ
1 @SpringBootApplication2 @EnableRabbit3 public classMessageServer5003_App {4
5 public static voidmain(String[]args){6
7 SpringApplication.run(MessageServer5003_App.class,args);8
9 }10
11 /**这里也需要设置消息转换类型12 * 和发送的消息类型一定要对应13 * 不然对象接受json启动主程序类时就会报错14 **/
15 @Bean16 publicMessageConverter messageConverter(){17
18 return newJackson2JsonMessageConverter();19
20 }21
22 }
下面到了整合的环节了
1 @ServerEndpoint(value = "/websocket")2 @Component3 public classWebSocketServer {4
5 //静态变量 用于记录当前在线连接数 应该把它设计成线程安全
6 private static int onlineCount=0;7
8 /**Concurrent包下的 写时复制Set 用它作于存储客户端对应的MyWebSocket对象*/
9 private static CopyOnWriteArraySet webSocketSet= new CopyOnWriteArraySet();10
11
12 /**与某个客户端的链接会话,需要通过它来给客户端发送数据*/
13
14 privateSession session;15 /**
16 * 参数1:Message 可以获得消息的内容字节 还可以获得消息的其他属性17 * 参数2:可以写确定接受的参数类型比如User18 * 参数3:Channel 通道19 * com.rabbitmq.client.Channel必须是这个包下20 * 通过这个参数可以拒绝消息21 * 让rabbitmq再发给别的消费者22 *23 * 使用@RabbitListener 可以绑定交换机 路由键 管道24 *25 */
26 @RabbitListener(bindings =@QueueBinding(27 value = @Queue(value = "userTest-queue",durable = "true"),28 exchange = @Exchange(name = "userTest-exchange",durable = "true",type = "direct"),29 key = "userTest-key"
30 )31 )32 @RabbitHandler//注解意思:如果有消息过来 需要消费的时候才会调用该方法
33 /**如果已知传递的参数是 UserTest对象可以通过该注解34 * 消息头需要用map接受35 * 既然是手动接受消息 就需要设置channel36 **/
37 public void receiveUserMessage(@Payload UserTest userTest, @Headers Map headers, Channel channel) throwsIOException {38 //sendMessage(message.toString());
39 System.out.println("UserTest对象"+userTest);40 onMessage(userTest.toString());//调用消息方法将数据船体给他41
42 Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);43 //手动接受并告诉rabbitmq消息已经接受了 deliverTag记录接受消息 false不批量接受
44 channel.basicAck(deliveryTag,false);45
46 /**
47 * basicReject()48 * 参数1: 消息标签49 * 参数2: true 将消息从新放入队列 false 接受到并将消息抛弃50 *51 *52 try {53 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);54 System.out.println(message);55 } catch (IOException e) {56 e.printStackTrace();57 }58 */
59
60 }61
62 /**服务器端推送消息*/
63 public voidsendMessage(String message){64 try{65 System.out.println("session可否显示出来"+session);66 this.session.getBasicRemote().sendText(message);67 } catch(IOException e) {68 e.printStackTrace();69 }70 }71
72 /**
73 * 连接建立成功调用的方法74 **/
75 @OnOpen76 public voidonOpen(Session session){77 this.session=session;78 webSocketSet.add(this);79 System.out.println("有新的连接加入!当前在线人数为"+getOnlineCount());80 System.out.println(session);81 }82
83 /**
84 * 连接关闭调用的方法85 **/
86 @OnClose87 public voidonClose(){88 /**从安全Set中 移除当前连接对象*/
89 webSocketSet.remove(this);90 subOnlineCount();91 System.out.println("有一连接关闭!当前在线人数为"+getOnlineCount());92 }93
94
95
96 @OnMessage97 public voidonMessage(String message){98
99 System.out.println("来自客户端的消息:"+message);100
101 for(WebSocketServer webSocketServer:webSocketSet){102 webSocketServer.sendMessage(message);103 }104
105 }106
107
108 public static intgetOnlineCount() {109 returnonlineCount;110 }111
112 public static synchronized voidaddOnlineCount() {113 WebSocketServer.onlineCount++;114 }115
116 public static synchronized voidsubOnlineCount() {117 WebSocketServer.onlineCount--;118 }119
120
121
122 }
websocket前端
websocket是html5提出的协议属于双工通信 前端发送一次请求告诉服务器需要将http协议升级成tcp长连接
后面服务端直接给前端推送消息就可以了 从以前的一次请求一次响应 服务端被动式 变成 一次请求服务端可以无限响应
1
2 varsocket;3 console.log(typeofsocket)4 if(typeof(WebSocket)=="undefined"){5 alert("您的浏览器不支持WebSocket");6 }else{7 alert("您的浏览器支持WebSocket");8
9 socket=newWebSocket("ws://localhost:5003/websocket");10
11 socket.onopen=function() {12 console.log("Socket 已打开");13 };14
15 //获得消息事件
16 socket.onmessage= function(msg) {17 console.log(msg.data);18 //发现消息进入 调后台获取
19 //getCallingList();
20 };21
22 //关闭事件
23 socket.onclose= function() {24 console.log("Socket已关闭");25 };26 //发生了错误事件
27 socket.οnerrοr= function() {28 alert("Socket发生了错误");29 };30 /**31 $(window).unload(function(){32 socket.close();33 });34 */
35 }36