解决websocket集群的session共享问题

2023-12-05

在websocket中,服务端主要使用的是session打交道,但是由于session无法实现序列化,不能存储到redis这些中间存储里面,因此这里我们只能把session存储在本地的内存中,那么如果是集群的话,我们如何实现session准确的发送消息呢,其实就是session共享。在websocket中,其实是无法做到session共享的,目前通用的解决方案都是通过消息中间件,实现消息的发布与订阅,也就是每一个服务端实例都订阅某个消息队列的topic,根据对应的sessionid来判断是否在本地存储,如果在本地通过sessionid找到了session,则给客户端发送消息,如果在本地找不到对应的session,那么就直接把这条消息丢弃掉。具体的如下图所示:

这里的图来自于网上,网上大多都是基于redis做发布与订阅,在真实的环境中,我们一般用kafka或者rocketmq等。根据上面的图示,我们介绍下整个流程:

1、我们同时有A,B,C,D四个websocket服务端,同时订阅消息队列的topic: test8
2、我们发送一条消息a1到消息队列的topic:test8
3、此时A,B,C,D四个websocket服务端都会收到这条消息a1
4、A根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
5、B根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
6、C根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。
7、D根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,结果找到有对应的session,此时我们就把这条消息发送给=这个session。
8、客户端就收到了对应的消息。

一、创建一个公共的map,用来存放session

package com.websocket.utils;

import java.util.concurrent.ConcurrentHashMap;

import javax.websocket.Session;

import org.springframework.stereotype.Component;

@Component
public class OnlineSessionCache {

	private ConcurrentHashMap<Integer, Session> onlines = new ConcurrentHashMap<Integer, Session>();

	public void setUserSession(Integer userId, Session session) {
		onlines.put(userId, session);
	}

	public Session getUserSession(Integer userId) {
		return onlines.get(userId);
	}

	public void removeUserSession(Integer userId) {
		onlines.remove(userId);
	}
	
	public ConcurrentHashMap<Integer, Session> getAllSession() {
		return this.onlines;
	}

}

二、在websocket连接和关闭的时候,把session关闭掉

@OnOpen
	public void onOpen(Session session,EndpointConfig config) {

		this.session = session;
		log.info("当前session id : {}  登录进来了", session.getId());
		OnlineCalUtils.addOnlineCount();
		onlineSessionCache.setUserSession(Integer.valueOf(session.getId()), session);
		log.info("存储session了多少个session:{}", onlineSessionCache.getAllSession().size());
		log.info("有新连接加入!当前在线人数为 :{} ", getOnlineCount());
	}
@OnClose
	public void onClose() {
		OnlineCalUtils.subOnlineCount();
		log.info("有一连接关闭!当前在线人数为: {}", getOnlineCount());
		onlineSessionCache.removeUserSession(Integer.valueOf(this.session.getId()));
		log.info("当前session id : {}  退出去了");
	}

三、编写一个接口,用来给指定的用户发送消息

package com.websocket.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.websocket.model.ChatModel;
import com.websocket.producer.RocketProducer;
import com.websocket.utils.ChatModelUtils;

import lombok.extern.slf4j.Slf4j;

@RestController
@Slf4j
public class ChatMsgController {

	@Autowired
	private RocketProducer rocketProducer;
	
	@RequestMapping("/sendToSimpleUser")
	public String sendToSimpleUser(Integer fromUserId,Integer toUserId) {
		
		ChatModel model = ChatModelUtils.createNewChatModel(fromUserId, toUserId, "手动发送消息");
		rocketProducer.sendDirectMessage(model);
		
		return "成功";
	}
	
	
}

这里我们是把消息直接发送给了rocketmq里面,发送者代码如下:

package com.websocket.producer;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;

@Component
public class RocketProducer {

	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	public void sendDirectMessage(ChatModel message) {
		String msg = JSON.toJSONString(message);
        rocketMQTemplate.syncSend("test8", msg);
	}
	
}

四、编写消费者,获取mq的消息,并且发送消息给对应的session

package com.websocket.producer;

import javax.websocket.Session;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.websocket.model.ChatModel;
import com.websocket.product.SocketServerProduct;
import com.websocket.utils.OnlineSessionCache;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@RocketMQMessageListener(topic = "test8", consumerGroup = "${chat.group.groupname}")
public class RocketConsumer implements RocketMQListener<String>{

	@Autowired
	private OnlineSessionCache onlineSessionCache;

	@Autowired
	private SocketServerProduct socketServerProduct;
	
	@Value("${chat.group.groupname}")
	private String groupName;
	
	@Override
	public void onMessage(String message) {
		log.info("监听到的topic是:{}  groupname是:{}","test8",groupName);
		ChatModel model = JSON.parseObject(message, ChatModel.class);
		Integer userId = model.getToUserId();
		Session session = onlineSessionCache.getUserSession(userId);
		if (null != session) {
			log.info("找到了对应的session,准备回复消息");
			socketServerProduct.sendMessage(session, model.getMessage());
		}else {
			log.info("没有找到对应的session,准备丢弃");
		}
	}
}

以上就是一个完整的关于websocket服务端集群关于session共享的解决方案。

WebSocket服务端数据推送及心跳机制(Spring Boot + VUE)_websocket心跳机制-CSDN博客

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

解决websocket集群的session共享问题 的相关文章

  • 使用 socket.io 处理超大消息

    我有一个 nodejs 项目 它生成多个与套接字 io 通信的进程 该进程既发送数据又接收数据 有时在功能开发过程中 其他程序员可能会犯错误 导致我的套接字基础结构代码发送超过大小 X 例如 超过 500MB 的大消息 我正在寻找一种方法来
  • 调用 .disconnect() 后如何重新连接

    问题 发布手册后如何重新连接客户端到服务器 disconnect 在我当前的项目中 当用户从会话注销时 我需要断开客户端与服务器的连接 我做了一个socket disconnect 才能成功断开连接 服务器从会话中删除了用户 一段时间后 用
  • NodeJS 中的客户端 websocket 证书

    我有一个 NodeJS websocket 客户端应用程序 使用ws https www npmjs com package ws https www npmjs com package ws 这个 NodeJS 应用程序作为客户端连接到
  • 如何使用Lua脚本语言打开Web套接字?

    作为初学者 我想在基于 Linux 的服务器上使用 Lua 打开一个 Web 套接字 该服务器应允许 Android 客户端连接到它 你能给我一些用Lua打开网络套接字的示例代码吗 您两周前已经问过同样的问题并得到了回答 LUA 脚本 We
  • 如何将数据从 sinatra 应用程序中的类传递到 websocket-rack?

    我在 sinatra 应用程序中有一个 websocket rack 的工作配置 旨在用于具有多个屏幕的物理安装 有一些功能可以正常工作 消息可以通过 websocket 来回传递 我的问题是这样的 我有一个带有标准 Web 表单 即不是
  • Node.js ws 包上的正确错误处理

    我正在努力将基于 REST 的数据管道替换为基于 Websocket 的数据管道 但我无法找到所有可能出错的地方 该系统是生产系统 因此如果出现故障并且无法恢复 将会发生非常糟糕的情况 这是我到目前为止所得到的 客户端 let server
  • 无法获取 ProxyPass 用户 IP 地址

    脚本语言 websocket new WebSocket wss site com game play PHP socket socket create AF INET SOCK STREAM SOL TCP size socket rec
  • 什么是 Ruby on Rails Action 电缆适配器?

    通过 RoR 动作电缆导轨查看http edgeguides rubyonrails org action cable overview html subscription adapter http edgeguides rubyonrai
  • 目前可用于 python3 上带有 Flask 的 websocket 的最佳选择[关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 目前我看到以下库可用于将 websockets 与 Flask 一起使用 烧瓶插座 https gith
  • Websocket、Angular 2 和 JSON Web 令牌身份验证

    我的 Angular 2 应用程序 用打字稿编码 有一个简单的身份验证方案 用户登录 服务器返回 JSON Web 令牌 JWT abc123 在每次 API 调用时 应用程序都会将 JWT 发送到Authorization header
  • 使用来自 WebSocket @ServerEndpoint 的 CDI @SessionScoped bean

    在 Web 应用程序中 用户使用 servlet HTTP 会话 一些数据存储在 CDI SessionScoped beans 中 稍后在某些页面中 WebSocket 通信是在用户浏览器和服务器之间执行的 对于 GlassFish 4
  • 我可以在浏览器中启动 socket.io/websocket 服务器吗?

    之前有人问过这个问题 答案是否定的 但是现在 有了 browserify webpack 我可以像在服务器上那样编写代码吗 它会在浏览器中运行 还是有任何限制使这变得不可能 你不能 在浏览器中启动服务器需要访问浏览器中根本不存在的低级功能
  • Play框架2.5.0 Websockets示例[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 播放框架 2 5 0 Websockets 示例 在 play 2 5 0 websockets 代码
  • Websocket-rails 不适用于 Nginx 和 Unicorn 的生产环境

    我有 Rails 3 2 应用程序和 gem websocket rails 0 7 在开发机器上 一切正常 在生产环境中 我使用 Nginx 1 6 作为代理服务器 使用 Unicorn 作为 http 服务器 Thin 用于独立模式 如
  • websocket握手问题

    我正在使用 python 实现一个简单的 websocket 服务器 我使用的握手来自 握手本身似乎有效 但是当我点击发送时 我收到一个 JavaScript 错误 未捕获的错误 INVALID STATE ERR DOM 异常 11 这是
  • ServerEndpoint 和 web.xml

    我有一些 Soap REST servlet 现在还有一个 WebSocket ServerEndpoint game public class WebSocketgame 我有下一个麻烦 如果 web xml 存在 WebSocket 不
  • WebSocket 和负载平衡是瓶颈吗?

    当有一堆充当 WebSocket 无人机的系统和这些无人机前面的负载均衡器时 当 WebSocket 请求进入 LB 时 它会选择一个 WebSocket 无人机 并建立 WebSocket 我在 ELB 上使用 AWS ELB tcp S
  • 通过 Websockets 进行 WebRTC 视频聊天

    我正在尝试使用 webRTC 和 WebSockets 进行信号发送来开发视频聊天应用程序 我的问题是 我不知道创建 RTCPeerConnection 并通过 webSocket 连接两个对等点 2 个浏览器 的过程是什么 至少在本地 我
  • 使用 WebSocket 是否会产生服务器成本?

    我已经离开了 PHP MySQL 的舒适区 因为语法 封装 过程的东西可能会让人沮丧 上周 我开始尝试并按照一些教程使用 Node js Socket IO 创建实时聊天应用程序 到目前为止 我从未使用过 WebSockets 做过任何事情
  • 关闭旧的 php websocket

    我在用PHP Websockets https github com ghedipunk PHP Websockets创建一个简单的聊天服务器 当我第一次运行在我的服务器上创建 websocket 的 php 脚本时 一切正常 如果脚本由于

随机推荐