Spring Boot中使用WebSocket [第三部分]

2023-10-31

使用消息队列实现分布式WebSocket

在上一篇文章(https://www.zifangsky.cn/1359.html)中我介绍了服务端如何给指定用户的客户端发送消息,并如何处理对方不在线的情况。在这篇文章中我们继续思考另外一个重要的问题,那就是:如果我们的项目是分布式环境,登录的用户被Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?

其实,要解决这个问题就需要实现分布式WebSocket,而分布式WebSocket一般可以通过以下两种方案来实现:

  • 方案一:将消息(<用户id,消息内容>)统一推送到一个消息队列(Redis、Kafka等)的的topic,然后每个应用节点都订阅这个topic,在接收到WebSocket消息后取出这个消息的“消息接收者的用户ID/用户名”,然后再比对自身是否存在相应用户的连接,如果存在则推送消息,否则丢弃接收到的这个消息(这个消息接收者所在的应用节点会处理)

  • 方案二:在用户建立WebSocket连接后,使用Redis缓存记录用户的WebSocket建立在哪个应用节点上,然后同样使用消息队列将消息推送到接收者所在的应用节点上面(实现上比方案一要复杂,但是网络流量会更低)

注:本篇文章的完整源码可以参考:https://github.com/zifangsky/WebSocketDemo

在下面的示例中,我将根据相对简单的方案一来是实现,具体实现方式如下:

(1)定义一个WebSocket Channel枚举类:

 

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

package cn.zifangsky.mqwebsocket.enums;

 

import org.apache.commons.lang3.StringUtils;

 

/**

* WebSocket Channel枚举类

*

* @author zifangsky

* @date 2018/10/16

* @since 1.0.0

*/

public enum WebSocketChannelEnum {

    //测试使用的简易点对点聊天

    CHAT("CHAT", "测试使用的简易点对点聊天", "/topic/reply");

 

    WebSocketChannelEnum(String code, String description, String subscribeUrl) {

        this.code = code;

        this.description = description;

        this.subscribeUrl = subscribeUrl;

    }

 

    /**

     * 唯一CODE

     */

    private String code;

    /**

     * 描述

     */

    private String description;

    /**

     * WebSocket客户端订阅的URL

     */

    private String subscribeUrl;

 

    public String getCode() {

        return code;

    }

 

    public String getDescription() {

        return description;

    }

 

    public String getSubscribeUrl() {

        return subscribeUrl;

    }

 

    /**

     * 通过CODE查找枚举类

     */

    public static WebSocketChannelEnum fromCode(String code){

        if(StringUtils.isNoneBlank(code)){

            for(WebSocketChannelEnum channelEnum : values()){

                if(channelEnum.code.equals(code)){

                    return channelEnum;

                }

            }

        }

 

        return null;

    }

 

}

 

(2)配置基于Redis的消息队列:

关于Redis实现的消息队列可以参考我之前的这篇文章:https://www.zifangsky.cn/1347.html

需要注意的是,在大中型正式项目中并不推荐使用Redis实现的消息队列,因为经过测试它并不是特别可靠,所以应该考虑使用KafkarabbitMQ等专业的消息队列中间件(PS:据说Redis 5.0全新的数据结构Streams极大增强了Redis的消息队列功能?)

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

package cn.zifangsky.mqwebsocket.config;

 

import cn.zifangsky.mqwebsocket.mq.MessageReceiver;

import com.fasterxml.jackson.annotation.JsonAutoDetect;

import com.fasterxml.jackson.annotation.PropertyAccessor;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisClusterConfiguration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.listener.PatternTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

import org.springframework.data.redis.serializer.StringRedisSerializer;

import redis.clients.jedis.JedisCluster;

import redis.clients.jedis.JedisPoolConfig;

 

import java.util.Arrays;

 

/**

* Redis相关配置

*

* @author zifangsky

* @date 2018/7/30

* @since 1.0.0

*/

@Configuration

@ConditionalOnClass({JedisCluster.class})

public class RedisConfig {

 

    @Value("${spring.redis.timeout}")

    private String timeOut;

 

    @Value("${spring.redis.cluster.nodes}")

    private String nodes;

 

    @Value("${spring.redis.cluster.max-redirects}")

    private int maxRedirects;

 

    @Value("${spring.redis.jedis.pool.max-active}")

    private int maxActive;

 

    @Value("${spring.redis.jedis.pool.max-wait}")

    private int maxWait;

 

    @Value("${spring.redis.jedis.pool.max-idle}")

    private int maxIdle;

 

    @Value("${spring.redis.jedis.pool.min-idle}")

    private int minIdle;

 

    @Value("${spring.redis.message.topic-name}")

    private String topicName;

 

    @Bean

    public JedisPoolConfig jedisPoolConfig(){

        JedisPoolConfig config = new JedisPoolConfig();

        config.setMaxTotal(maxActive);

        config.setMaxIdle(maxIdle);

        config.setMinIdle(minIdle);

        config.setMaxWaitMillis(maxWait);

 

        return config;

    }

 

    @Bean

    public RedisClusterConfiguration redisClusterConfiguration(){

        RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes));

        configuration.setMaxRedirects(maxRedirects);

 

        return configuration;

    }

 

    /**

     * JedisConnectionFactory

     */

    @Bean

    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,JedisPoolConfig jedisPoolConfig){

        return new JedisConnectionFactory(configuration,jedisPoolConfig);

    }

 

    /**

     * 使用Jackson序列化对象

     */

    @Bean

    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){

        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class);

 

        ObjectMapper objectMapper = new ObjectMapper();

        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        serializer.setObjectMapper(objectMapper);

 

        return serializer;

    }

 

    /**

     * RedisTemplate

     */

    @Bean

    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();

        redisTemplate.setConnectionFactory(factory);

 

        //字符串方式序列化KEY

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        redisTemplate.setKeySerializer(stringRedisSerializer);

        redisTemplate.setHashKeySerializer(stringRedisSerializer);

 

        //JSON方式序列化VALUE

        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

 

        redisTemplate.afterPropertiesSet();

 

        return redisTemplate;

    }

 

    /**

     * 消息监听器

     */

    @Bean

    MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){

        //消息接收者以及对应的默认处理方法

        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");

        //消息的反序列化方式

        messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

 

        return messageListenerAdapter;

    }

 

    /**

     * message listener container

     */

    @Bean

    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory

            , MessageListenerAdapter messageListenerAdapter){

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory(connectionFactory);

        //添加消息监听器

        container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));

 

        return container;

    }

 

}

需要注意的是,这里使用的配置如下所示:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

spring:

  ...

  #redis

  redis:

      cluster:

        nodes: namenode22:6379,datanode23:6379,datanode24:6379

        max-redirects: 6

      timeout: 300000

      jedis:

        pool:

          max-active: 8

          max-wait: 100000

          max-idle: 8

          min-idle: 0

      #自定义的监听的TOPIC路径

      message:

        topic-name: topic-test

 

(3)定义一个Redis消息的处理者:

 

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

package cn.zifangsky.mqwebsocket.mq;

 

import cn.zifangsky.mqwebsocket.enums.WebSocketChannelEnum;

import cn.zifangsky.mqwebsocket.model.websocket.RedisWebsocketMsg;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.messaging.simp.user.SimpUser;

import org.springframework.messaging.simp.user.SimpUserRegistry;

import org.springframework.stereotype.Component;

 

import java.text.MessageFormat;

 

/**

* Redis中的WebSocket消息的处理者

*

* @author zifangsky

* @date 2018/10/16

* @since 1.0.0

*/

@Component

public class MessageReceiver {

    private final Logger logger = LoggerFactory.getLogger(getClass());

 

    @Autowired

    private SimpMessagingTemplate messagingTemplate;

 

    @Autowired

    private SimpUserRegistry userRegistry;

 

    /**

     * 处理WebSocket消息

     */

    public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {

        logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));

        //1. 取出用户名并判断是否连接到当前应用节点的WebSocket

        SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());

 

        if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){

            //2. 获取WebSocket客户端的订阅地址

            WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());

 

            if(channelEnum != null){

                //3. 给WebSocket客户端发送消息

                messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());

            }

        }

 

    }

}

 

(4)在Controller中发送WebSocket消息:

 

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

package cn.zifangsky.mqwebsocket.controller;

 

import cn.zifangsky.mqwebsocket.common.Constants;

import cn.zifangsky.mqwebsocket.common.SpringContextUtils;

import cn.zifangsky.mqwebsocket.enums.ExpireEnum;

import cn.zifangsky.mqwebsocket.enums.WebSocketChannelEnum;

import cn.zifangsky.mqwebsocket.model.User;

import cn.zifangsky.mqwebsocket.model.websocket.HelloMessage;

import cn.zifangsky.mqwebsocket.model.websocket.RedisWebsocketMsg;

import cn.zifangsky.mqwebsocket.service.RedisService;

import cn.zifangsky.mqwebsocket.utils.JsonUtils;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.messaging.simp.user.SimpUser;

import org.springframework.messaging.simp.user.SimpUserRegistry;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

 

import javax.annotation.Resource;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpSession;

import java.text.MessageFormat;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

 

/**

* 测试{@link org.springframework.messaging.simp.SimpMessagingTemplate}类的基本用法

* @author zifangsky

* @date 2018/10/10

* @since 1.0.0

*/

@Controller

@RequestMapping(("/wsTemplate"))

public class RedisMessageController {

    private final Logger logger = LoggerFactory.getLogger(getClass());

 

    @Value("${spring.redis.message.topic-name}")

    private String topicName;

 

    @Autowired

    private SimpMessagingTemplate messagingTemplate;

 

    @Autowired

    private SimpUserRegistry userRegistry;

 

    @Resource(name = "redisServiceImpl")

    private RedisService redisService;

 

    /**

     * 给指定用户发送WebSocket消息

     */

    @PostMapping("/sendToUser")

    @ResponseBody

    public String chat(HttpServletRequest request) {

        //消息接收者

        String receiver = request.getParameter("receiver");

        //消息内容

        String msg = request.getParameter("msg");

        HttpSession session = SpringContextUtils.getSession();

        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

 

        HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));

        this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));

 

        return "ok";

    }

 

    /**

     * 给指定用户发送消息,并处理接收者不在线的情况

     * @param sender 消息发送者

     * @param receiver 消息接收者

     * @param destination 目的地

     * @param payload 消息正文

     */

    private void sendToUser(String sender, String receiver, String destination, String payload){

        SimpUser simpUser = userRegistry.getUser(receiver);

 

        //如果接收者存在,则发送消息

        if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){

            messagingTemplate.convertAndSendToUser(receiver, destination, payload);

        }

        //如果接收者在线,则说明接收者连接了集群的其他节点,需要通知接收者连接的那个节点发送消息

        else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)){

            RedisWebsocketMsg<String> redisWebsocketMsg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);

 

            redisService.convertAndSend(topicName, redisWebsocketMsg);

        }

        //否则将消息存储到redis,等用户上线后主动拉取未读消息

        else{

            //存储消息的Redis列表名

            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;

            logger.info(MessageFormat.format("消息接收者{0}还未建立WebSocket连接,{1}发送的消息【{2}】将被存储到Redis的【{3}】列表中", receiver, sender, payload, listKey));

 

            //存储消息到Redis中

            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);

        }

 

    }

 

 

    /**

     * 拉取指定监听路径的未读的WebSocket消息

     * @param destination 指定监听路径

     * @return java.util.Map<java.lang.String,java.lang.Object>

     */

    @PostMapping("/pullUnreadMessage")

    @ResponseBody

    public Map<String, Object> pullUnreadMessage(String destination){

        Map<String, Object> result = new HashMap<>();

        try {

            HttpSession session = SpringContextUtils.getSession();

            //当前登录用户

            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

 

            //存储消息的Redis列表名

            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;

            //从Redis中拉取所有未读消息

            List<Object> messageList = redisService.rangeList(listKey, 0, -1);

 

            result.put("code", "200");

            if(messageList !=null && messageList.size() > 0){

                //删除Redis中的这个未读消息列表

                redisService.delete(listKey);

                //将数据添加到返回集,供前台页面展示

                result.put("result", messageList);

            }

        }catch (Exception e){

            result.put("code", "500");

            result.put("msg", e.getMessage());

        }

 

        return result;

    }

 

}

 

(5)其他拦截器处理WebSocket连接相关问题:

i)AuthHandshakeInterceptor:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

package cn.zifangsky.mqwebsocket.interceptor.websocket;

 

import cn.zifangsky.mqwebsocket.common.Constants;

import cn.zifangsky.mqwebsocket.common.SpringContextUtils;

import cn.zifangsky.mqwebsocket.model.User;

import cn.zifangsky.mqwebsocket.service.RedisService;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.http.server.ServerHttpRequest;

import org.springframework.http.server.ServerHttpResponse;

import org.springframework.stereotype.Component;

import org.springframework.web.socket.WebSocketHandler;

import org.springframework.web.socket.server.HandshakeInterceptor;

 

import javax.annotation.Resource;

import javax.servlet.http.HttpSession;

import java.text.MessageFormat;

import java.util.Map;

 

/**

* 自定义{@link org.springframework.web.socket.server.HandshakeInterceptor},实现“需要登录才允许连接WebSocket”

*

* @author zifangsky

* @date 2018/10/11

* @since 1.0.0

*/

@Component

public class AuthHandshakeInterceptor implements HandshakeInterceptor {

    private final Logger logger = LoggerFactory.getLogger(getClass());

 

    @Resource(name = "redisServiceImpl")

    private RedisService redisService;

 

    @Override

    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {

        HttpSession session = SpringContextUtils.getSession();

        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

 

        if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, loginUser.getUsername())){

            logger.error("同一个用户不准建立多个连接WebSocket");

            return false;

        }else if(loginUser == null || StringUtils.isBlank(loginUser.getUsername())){

            logger.error("未登录系统,禁止连接WebSocket");

            return false;

        }else{

            logger.debug(MessageFormat.format("用户{0}请求建立WebSocket连接", loginUser.getUsername()));

            return true;

        }

    }

 

    @Override

    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

        

    }

 

}

ii)MyHandshakeHandler:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

package cn.zifangsky.mqwebsocket.interceptor.websocket;

 

import cn.zifangsky.mqwebsocket.common.Constants;

import cn.zifangsky.mqwebsocket.common.SpringContextUtils;

import cn.zifangsky.mqwebsocket.model.User;

import cn.zifangsky.mqwebsocket.service.RedisService;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.http.server.ServerHttpRequest;

import org.springframework.stereotype.Component;

import org.springframework.web.socket.WebSocketHandler;

import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

 

import javax.annotation.Resource;

import javax.servlet.http.HttpSession;

import java.security.Principal;

import java.text.MessageFormat;

import java.util.Map;

 

/**

* 自定义{@link org.springframework.web.socket.server.support.DefaultHandshakeHandler},实现“生成自定义的{@link java.security.Principal}”

*

* @author zifangsky

* @date 2018/10/11

* @since 1.0.0

*/

@Component

public class MyHandshakeHandler extends DefaultHandshakeHandler{

    private final Logger logger = LoggerFactory.getLogger(getClass());

 

    @Resource(name = "redisServiceImpl")

    private RedisService redisService;

 

    @Override

    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {

        HttpSession session = SpringContextUtils.getSession();

        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

 

        if(loginUser != null){

            logger.debug(MessageFormat.format("WebSocket连接开始创建Principal,用户:{0}", loginUser.getUsername()));

            //1. 将用户名存到Redis中

            redisService.addToSet(Constants.REDIS_WEBSOCKET_USER_SET, loginUser.getUsername());

 

            //2. 返回自定义的Principal

            return new MyPrincipal(loginUser.getUsername());

        }else{

            logger.error("未登录系统,禁止连接WebSocket");

            return null;

        }

    }

 

}

iii)MyChannelInterceptor:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

package cn.zifangsky.mqwebsocket.interceptor.websocket;

 

import cn.zifangsky.mqwebsocket.common.Constants;

import cn.zifangsky.mqwebsocket.service.RedisService;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.simp.stomp.StompCommand;

import org.springframework.messaging.simp.stomp.StompHeaderAccessor;

import org.springframework.messaging.support.ChannelInterceptor;

import org.springframework.stereotype.Component;

 

import javax.annotation.Resource;

import java.security.Principal;

import java.text.MessageFormat;

 

/**

* 自定义{@link org.springframework.messaging.support.ChannelInterceptor},实现断开连接的处理

*

* @author zifangsky

* @date 2018/10/10

* @since 1.0.0

*/

@Component

public class MyChannelInterceptor implements ChannelInterceptor{

    private final Logger logger = LoggerFactory.getLogger(getClass());

 

    @Resource(name = "redisServiceImpl")

    private RedisService redisService;

 

    @Override

    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {

        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        StompCommand command = accessor.getCommand();

 

        //用户已经断开连接

        if(StompCommand.DISCONNECT.equals(command)){

            String user = "";

            Principal principal = accessor.getUser();

            if(principal != null && StringUtils.isNoneBlank(principal.getName())){

                user = principal.getName();

 

                //从Redis中移除用户

                redisService.removeFromSet(Constants.REDIS_WEBSOCKET_USER_SET, user);

            }else{

                user = accessor.getSessionId();

            }

 

            logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user));

        }

    }

 

}

 

(6)WebSocket相关配置:

 

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

package cn.zifangsky.mqwebsocket.config;

 

import cn.zifangsky.mqwebsocket.interceptor.websocket.MyHandshakeHandler;

import cn.zifangsky.mqwebsocket.interceptor.websocket.AuthHandshakeInterceptor;

import cn.zifangsky.mqwebsocket.interceptor.websocket.MyChannelInterceptor;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

import org.springframework.messaging.simp.config.ChannelRegistration;

import org.springframework.messaging.simp.config.MessageBrokerRegistry;

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;

import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

 

/**

* WebSocket相关配置

*

* @author zifangsky

* @date 2018/9/30

* @since 1.0.0

*/

@Configuration

@EnableWebSocketMessageBroker

public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{

    @Autowired

    private AuthHandshakeInterceptor authHandshakeInterceptor;

 

    @Autowired

    private MyHandshakeHandler myHandshakeHandler;

 

    @Autowired

    private MyChannelInterceptor myChannelInterceptor;

 

    @Override

    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/chat-websocket")

                .addInterceptors(authHandshakeInterceptor)

                .setHandshakeHandler(myHandshakeHandler)

                .withSockJS();

    }

 

    @Override

    public void configureMessageBroker(MessageBrokerRegistry registry) {

        //客户端需要把消息发送到/message/xxx地址

        registry.setApplicationDestinationPrefixes("/message");

        //服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息

        registry.enableSimpleBroker("/topic");

        //给指定用户发送消息的路径前缀,默认值是/user/

        registry.setUserDestinationPrefix("/user/");

    }

 

    @Override

    public void configureClientInboundChannel(ChannelRegistration registration) {

        registration.interceptors(myChannelInterceptor);

    }

 

}

 

(7)示例页面:

 

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

<html xmlns:th="http://www.thymeleaf.org">

<head>

    <meta content="text/html;charset=UTF-8"/>

    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>

    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>

    <meta name="viewport" content="width=device-width, initial-scale=1"/>

    <title>Chat With STOMP Message</title>

    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>

    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>

    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>

    <script th:src="@{/layui/layui.js}"></script>

    <script th:src="@{/layui/lay/modules/layer.js}"></script>

    <link th:href="@{/layui/css/layui.css}" rel="stylesheet">

    <link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">

    <link th:href="@{/css/style.css}" rel="stylesheet">

    <style type="text/css">

        #connect-container {

            margin: 0 auto;

            width: 400px;

        }

 

        #connect-container div {

            padding: 5px;

            margin: 0 7px 10px 0;

        }

 

        .message input {

            padding: 5px;

            margin: 0 7px 10px 0;

        }

 

        .layui-btn {

            display: inline-block;

        }

    </style>

    <script type="text/javascript">

        var stompClient = null;

 

        $(function () {

            var target = $("#target");

            if (window.location.protocol === 'http:') {

                target.val('http://' + window.location.host + target.val());

            } else {

                target.val('https://' + window.location.host + target.val());

            }

        });

 

        function setConnected(connected) {

            var connect = $("#connect");

            var disconnect = $("#disconnect");

            var echo = $("#echo");

 

            if (connected) {

                connect.addClass("layui-btn-disabled");

                disconnect.removeClass("layui-btn-disabled");

                echo.removeClass("layui-btn-disabled");

            } else {

                connect.removeClass("layui-btn-disabled");

                disconnect.addClass("layui-btn-disabled");

                echo.addClass("layui-btn-disabled");

            }

 

            connect.attr("disabled", connected);

            disconnect.attr("disabled", !connected);

            echo.attr("disabled", !connected);

        }

 

        //连接

        function connect() {

            var target = $("#target").val();

 

            var ws = new SockJS(target);

            stompClient = Stomp.over(ws);

 

            stompClient.connect({}, function () {

                setConnected(true);

                log('Info: STOMP connection opened.');

 

                //连接成功后,主动拉取未读消息

                pullUnreadMessage("/topic/reply");

 

                //订阅服务端的/topic/reply地址

                stompClient.subscribe("/user/topic/reply", function (response) {

                    log(JSON.parse(response.body).content);

                })

            },function () {

                //断开处理

                setConnected(false);

                log('Info: STOMP connection closed.');

            });

        }

 

        //断开连接

        function disconnect() {

            if (stompClient != null) {

                stompClient.disconnect();

                stompClient = null;

            }

            setConnected(false);

            log('Info: STOMP connection closed.');

        }

 

        //向指定用户发送消息

        function sendMessage() {

            if (stompClient != null) {

                var receiver = $("#receiver").val();

                var msg = $("#message").val();

                log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg':msg}));

 

                $.ajax({

                    url: "/wsTemplate/sendToUser",

                    type: "POST",

                    dataType: "json",

                    async: true,

                    data: {

                        "receiver": receiver,

                        "msg": msg

                    },

                    success: function (data) {

 

                    }

                });

            } else {

                layer.msg('STOMP connection not established, please connect.', {

                    offset: 'auto'

                    ,icon: 2

                });

            }

        }

 

        //从服务器拉取未读消息

        function pullUnreadMessage(destination) {

            $.ajax({

                url: "/wsTemplate/pullUnreadMessage",

                type: "POST",

                dataType: "json",

                async: true,

                data: {

                    "destination": destination

                },

                success: function (data) {

                    if (data.result != null) {

                        $.each(data.result, function (i, item) {

                            log(JSON.parse(item).content);

                        })

                    } else if (data.code !=null && data.code == "500") {

                        layer.msg(data.msg, {

                            offset: 'auto'

                            ,icon: 2

                        });

                    }

                }

            });

        }

 

        //日志输出

        function log(message) {

            console.debug(message);

        }

    </script>

</head>

<body>

    <noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being

        enabled. Please enable

        Javascript and reload this page!</h2></noscript>

    <div>

        <div id="connect-container" class="layui-elem-field">

            <legend>Chat With STOMP Message</legend>

            <div>

                <input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>

            </div>

            <div>

                <button id="connect" class="layui-btn layui-btn-normal" οnclick="connect();">Connect</button>

                <button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"

                        οnclick="disconnect();">Disconnect

                </button>

 

            </div>

            <div class="message">

                <input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="接收者姓名" value=""/>

                <input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="消息内容" value=""/>

            </div>

            <div>

                <button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled"

                        οnclick="sendMessage();">Send Message

                </button>

            </div>

        </div>

    </div>

</body>

</html>

测试效果省略,具体效果可以自行在两台不同服务器上面运行示例源码查看。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Boot中使用WebSocket [第三部分] 的相关文章

  • 反思 Groovy 脚本中声明的函数

    有没有一种方法可以获取 Groovy 脚本中声明的函数的反射数据 该脚本已通过GroovyShell目的 具体来说 我想枚举脚本中的函数并访问附加到它们的注释 Put this到 Groovy 脚本的最后一行 它将作为脚本的返回值 a la
  • put方法中的Angularjs文件上传不起作用

    我有一个简单的待办事项应用程序 我试图在其中上传照片和单个待办事项 现在我已经创建了这个工厂函数来负责待办事项的创建 todosFactory insertTodo function todo return http post baseUr
  • 将 onclick 事件应用于页面加载时不存在的元素

    我将列表样式设置为看起来像选择框 并且当用户单击列表中的元素时我想触发一个函数 但是该元素是通过加载的AJAX因此 当页面加载并且我无法绑定时不存在onclick事件到它onDomReady 如果我把它作为一个普通的选择列表 我可以只标记一
  • 将 window.location 传递给 Flask url_for

    我正在使用 python 在我的页面上 当匿名用户转到登录页面时 我想将一个变量传递到后端 以便它指示用户来自哪里 发送 URL 因此 当用户单击此锚链接时 a href Sign in a 我想发送用户当前所在页面的当前 URL
  • Rails:找不到 JavaScript 运行时。有关可用运行时的列表,请参阅 https://github.com/sstephenson/execjs。 (ExecJS::运行时不可用)

    自从几周前 Dreamhost 升级了服务器以来 我的网站就被破坏了 我一直在努力解决它并取得了一些进展 但我仍然坚持希望是最后的问题 我在 Ruby 1 8 7 上使用 Rails 3 1 1 并收到来自 PhusionPassenger
  • 为什么 Array.prototype.filter() 在 Magnolia JavaScript 模型中抛出错误?

    我正在尝试过滤 FreeMarker 列表Magnolia JavaScript 模型 https documentation magnolia cms com display DOCS61 How to work with JavaScr
  • jQuery 悬停时滚动到 div 并返回到第一个元素

    我基本上有一个具有设定尺寸的 div 和overflow hidden 该 div 包含 7 个子 div 但一次只显示一个 我希望当它们各自的链接悬停时能够平滑地垂直滚动 但是 第一部分 div 没有链接 并且是没有悬停链接时的默认部分
  • 如何在 JFreeChart TimeSeries 图表上显示降雨指数和温度?

    目前 我的 TimeSeries 图表每 2 秒显示一个位置的温度 现在 如果我想每2秒显示一次降雨指数和温度 我该如何实现呢 这是我的代码 import testWeatherService TestWeatherTimeLapseSer
  • 检查 protobuf 消息 - 如何按名称获取字段值?

    我似乎无法找到一种方法来验证 protobuf 消息中字段的值 而无需显式调用其 getter 我看到周围的例子使用Descriptors FieldDescriptor实例到达消息映射内部 但它们要么基于迭代器 要么由字段号驱动 一旦我有
  • 使用 AWS Java SDK 为现有 S3 对象设置 Expires 标头

    我正在更新 Amazon S3 存储桶中的现有对象以设置一些元数据 我想设置 HTTPExpires每个对象的标头以更好地处理 HTTP 1 0 客户端 我们正在使用AWS Java SDK http aws amazon com sdkf
  • 应用程序关闭时的倒计时问题

    我制作了一个 CountDownTimer 代码 我希望 CountDownTimer 在完成时重新启动 即使应用程序已关闭 但它仅在应用程序正在运行或重新启动应用程序时重新启动 因此 如果我在倒计时为 00 10 分钟 秒 时关闭应用程序
  • 将 JSON 参数从 java 发布到 sinatra 服务

    我有一个 Android 应用程序发布到我的 sinatra 服务 早些时候 我无法读取 sinatra 服务上的参数 但是 在我将内容类型设置为 x www form urlencoded 之后 我能够看到参数 但不完全是我想要的 我在
  • 当单元格内的 JComboBox 中有 ItemEvent 时,如何获取 CellRow

    我有一个 JTable 其中有一列包含 JComboBox 我有一个附加到 JComboBox 的 ItemListener 它会根据任何更改进行操作 但是 ItemListener 没有获取更改的 ComboBox 所在行的方法 当组合框
  • JavaScript 提升解释

    下面的片段有什么区别 var a 0 function b a 10 return function a b console log a gt 10 and var a 0 function b a 10 return function a
  • Aurelia - 仅 HTML 自定义元素的内联定义

    我的 Aurelia 视图模型中有一个递归对象 如下所示 Class BottomlessPit Name string MorePits BottomlessPit null 因此 我想在 Aurelia 视图中使用递归模板 它只会在一个
  • 将2-3-4树转换为红黑树

    我正在尝试将 2 3 4 树转换为 java 中的红黑树 但我无法弄清楚它 我将这两个基本类编写如下 以使问题简单明了 但不知道从这里到哪里去 public class TwoThreeFour
  • AngularJS 在指令运行之前通过 AJAX 检索数据

    我正在使用 AngularUIuiMap http angular ui github com directives map实例化谷歌地图的指令 uiMap 指令非常适合处理硬编码数据 mapOptions and myMarkers 但是
  • KeyPressed 和 KeyTyped 混淆[重复]

    这个问题在这里已经有答案了 我搜索过之间的区别KeyPressedand KeyTyped事件 但我仍然不清楚 我发现的一件事是 Keypressed 比 KeyTyped 首先被触发 请澄清一下这些事件何时被准确触发 哪个适合用于哪个目的
  • DOM 解析器 Chrome 扩展内存泄漏

    问题 我开发了一个扩展程序 可以拦截 Web 请求 获取 Web 请求来源的 HTML 并对其进行处理 我使用 DOMParser 来解析 HTML 并且意识到 DOMParser 正在导致大量内存泄漏问题 最终导致 chrome 扩展崩溃
  • Jackson 将单个项目反序列化到列表中

    我正在尝试使用一项服务 该服务为我提供了一个带有数组字段的实体 id 23233 items name item 1 name item 2 但是 当数组包含单个项目时 将返回该项目本身 而不是包含一个元素的数组 id 43567 item

随机推荐

  • 使用Termux在安卓手机上运行tomcat服务器

    使用Termux在安卓手机上安装运行tomcat服务器 简单背景 探索尝试 尝试一 使用limbo虚拟机 失败 想念二 使用Linux Deploy安装 直接放弃 尝试三 使用Aid Learning 成功但搁置 尝试四 使用Termux直
  • Django之数据库并发处理

    1 数据库并发处理问题 在多个用户同时发起对同一个数据提交修改操作时 先查询 再修改 会出现资源竞争的问题 导致最终修改的数据结果出现异常 比如限量商品在热销时 当多个用户同时请求购买商品时 最终修改的数据就会出现异常 下面我们来写点代码还
  • getline()的使用详解

    一 getline int main string line while getline cin line cout lt
  • 图像去噪的OPenCV添加噪声和去噪

    添加噪声 添加高斯噪声 IplImage AddGuassianNoise IplImage src 添加高斯噪声 IplImage dst cvCreateImage cvGetSize src src gt depth src gt n
  • sass的日常使用

    sass跟css的简单对比 css 由于CSS的语法不够强大 没有变量和合理的样式复用机制 使得逻辑上相关的属性值必须以字面的形式反复出现 导致难以维护 而动态演示语言为CSS赋予了动态语言的特性 极大的提高了样式语言的可维护性 sass
  • github.com访问慢解决

    修改hosts HOSTS文件路径 C Windows System32 drivers etc hosts 1 打开Dns查询 站长工具 http tool chinaz com dns 2 搜索http github com 3 把TT
  • 使用Linux内核里的spi屏驱动-fbtft

    Linux内核里已经提供spi接口小屏的设备驱动 在内核的配置选项 make menuconfig ARCH arm CROSS COMPILE arm linux gnueabihf Device Drivers gt Graphics
  • 多线程练习之:生产电脑

    生产电脑 题目 设计一个生产电脑和搬运电脑类 要求生产出一台电脑就搬走一台电脑 如果没有新的电脑生产出来 则搬运工要等待新电脑产出 如果生产出的电脑没有搬走 则要等待电脑搬走之后再生产 并统计出生产的电脑数量 public class Co
  • python乘法出现小数位很多_js小数运算出现多位小数如何解决

    小数相乘出现很多位小数的问题 这个问题自己以前也遇到过 现在特意来总结一下 Number类型 Number类型是ECMAScript中最常用和最令人关注的类型了 这种类型使用IEEE754格式来表示整数和浮点数值 浮点数值在某些语言中也被成
  • 字节跳动前端面经

    面试经历 我入职字节大概一个多月 目前准大四 实习生 现在写面经不仅仅是牛客网 YYSD 真tm灵 的还愿 还是就是想给大家推荐推荐我们部门 技术中台 可能大家对技术中台不太了解 但是大家肯定听说过掘金吧 我们组主要负责ByteTech 字
  • 相机的信噪比

    在图像传感器的成像过程中 真实的信号是无法探测到的理想值 在成像过程中理想值被引入了一系列的不确定性 最终形成读出信号也即图像 此过程中的不确定性被统一称为噪声 而信号与噪声的比值被定义为信噪比 Signal to NoiseRatio S
  • hibernate: Duplicate class/entity; Could not parse mapping document from resource

    近日在学习Hibernate时 总是遇到以下异常 org hibernate InvalidMappingException Could not parse mapping document from resource kpy db Cus
  • ModelScope-Agent: Building Your Customizable Agent System with Open-source Large Language Models

    本文是LLM系列文章 针对 ModelScope Agent Building Your Customizable Agent System with Open source Large Language Models 的翻译 ModelS
  • 1072. 开学寄语(20)

    下图是上海某校的新学期开学寄语 天将降大任于斯人也 必先删其微博 卸其QQ 封其电脑 夺其手机 收其ipad 断其wifi 使其百无聊赖 然后 净面 理发 整衣 然后思过 读书 锻炼 明智 开悟 精进 而后必成大器也 本题要求你写个程序帮助
  • Webpack 基础配置介绍(二)

    今天继续分享webpack的有关内容 我还是接着从上篇文章的项目来给大家分享后续内容 如果还有小伙伴没有阅读之前的文章 请关注博主进行阅读 今日分享 1 webpack的规范配置 2 webpack config js基础配置 3 单页开发
  • JSP+ssm计算机毕业设计考研资源共享平台设计与实现399xv【源码、数据库、LW、部署】

    项目运行 项目含有源码 文档 程序 数据库 配套开发软件 软件安装教程 环境配置 Jdk1 8 Tomcat7 0 Mysql HBuilderX Webstorm也行 Eclispe IntelliJ IDEA Eclispe MyEcl
  • 环境变量路径中有空格该怎么办?

    本机环境变量的设置 java home C Program Files Java jdk1 5 0 09 bat中的命令格式 java home bin java 或者 C Program Files Java jdk1 5 0 09 bi
  • Python、Matplot的subplot实现一行3列的子图绘制,并添加背景色

    Python Matplot的subplot实现一行3列的子图绘制 并添加背景色 1 可能遇到的问题 2 示例 1 绘制2 2 俩行俩列 的子图 并设置背景色 2 绘制1 3 一行三列 的子图 并设置横轴纵轴值 3 绘制1 3 一行三列 的
  • 记一次kafka Consumer线程停止消费过程分析

    前言 kafka消息队列在项目开发中经常被使用 尤其是在大数据领域经常见到它的身影 spring集成了kafka方便我们使用 只要引入spring kafka即可 问题描述 有一天我们后台版本发布 虽然改动很大 但是大家还是自信满满 因为经
  • Spring Boot中使用WebSocket [第三部分]

    使用消息队列实现分布式WebSocket 在上一篇文章 https www zifangsky cn 1359 html 中我介绍了服务端如何给指定用户的客户端发送消息 并如何处理对方不在线的情况 在这篇文章中我们继续思考另外一个重要的问题