前言
在前面的博客内容中我们介绍了如何使用websocket实现一个网页版的在线客服聊天室,众所周知,由于websocket是一个长连接,要和服务端保持会话连接,所以其本身并不适用于微服务环境,在微服务环境中,有可能A、B俩个客户端连接到不同的服务A、B中,这样就没法保证A、B俩个客户端完成聊天的功能,因为会话不在同一台服务器上,A、B无法感知到对方发送的消息,为了解决websocket单机的这个痛点,我们引入消息中间键RocketMQ的广播机制,实现消息的转发,从而实现微服务版的websocke聊天室功能。其架构如下:
本节内容使用的主要技术包含springboot、redis、rocketmq、vue等,关于中间键的搭建本节内容不在展开,请关注作者的往期博客内容。
正文
- 引入websocket、redis和rocketmq的pom依赖
①核心pom依赖
<!-- rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<!-- websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- redis-->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
<version>2.4.3</version>
</dependency>
PS:可以按需引入自己需要的依赖,作者这里只列出核心的pom依赖
①配置文件
server:
port: 8888
spring:
#数据源配置
datasource:
dynamic:
primary: master #设置默认的数据源或者数据源组,默认值即为master
strict: false #设置严格模式,默认false不启动. 启动后在未匹配到指定数据源时候会抛出异常,不启动则使用默认数据源.
datasource:
master:
url: jdbc:mysql://192.168.56.10:3306/atp
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
profiles:
active: dev
servlet:
multipart:
max-file-size: 52428800
max-request-size: 52428800
#redis配置
redis:
#redisson配置
redisson:
file: classpath:redisson.yaml
#默认数据分区
database: 0
#redis集群节点配置
cluster:
nodes:
- 192.168.56.10:6379
- 192.168.56.10:6380
- 192.168.56.10.6381
max-redirects: 3
#超时时间
timeout: 10000
#哨兵节点配置
sentinel:
master: mymaster
nodes:
- "192.168.56.10:26379"
- "192.168.56.10:26380"
- "192.168.56.10:26381"
#redis密码
password: root
#redis 客户端工具
lettuce:
pool:
# 连接池最大连接数(使用负值表示没有限制) 默认为8
max-active: 8
# 连接池中的最小空闲连接 默认为 0
min-idle: 1
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认为-1
max-wait: 1000
# 连接池中的最大空闲连接 默认为8
max-idle: 8
session:
store-type: redis
redis:
flush-mode: on_save
namespace: spring:session:atp
thymeleaf:
cache: false
#mybatisplus配置
mybatis-plus:
mapper-locations: classpath*:/mapper/*/*Mapper.xml
type-aliases-package: com.yundi.atp.platform.module.*.entity
configuration:
map-underscore-to-camel-case: true
global-config:
db-config:
id-type: assign_id
#rocketmq配置
rocketmq:
#注册地址
name-server: 192.168.56.10:9876;192.168.56.10:9877
producer:
#生产者组名称
group: atp-producer
#命名空间
namespace: atp
#异步消息发送失败重试次数,默认是2
retry-times-when-send-async-failed: 2
#发送消息超时时间,默认2000ms
send-message-timeout: 2000
#消息的最大长度:默认1024 * 1024 * 4(默认4M)
max-message-size: 40000000
#压缩消息阈值,超过4k就压缩
compress-message-body-threshold: 4096
#是否发送失败,重试另外的broker
retry-next-server: false
#是否启用消息追踪
enable-msg-trace: false
#默认追踪的主题
customized-trace-topic: RMQ_SYS_TRACE_TOPIC
#消息发送失败重试的次数
retry-times-when-send-failed: 2
package com.yundi.atp.platform.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
/**
* 注入ServerEndpointExporter,
* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
package com.yundi.atp.platform.websocket;
import com.alibaba.fastjson.JSON;
import com.yundi.atp.platform.common.Constant;
import com.yundi.atp.platform.enums.MessageType;
import com.yundi.atp.platform.module.test.entity.ChatMsg;
import com.yundi.atp.platform.module.test.service.ChatMsgService;
import com.yundi.atp.platform.rocketmq.RocketConstant;
import com.yundi.atp.platform.rocketmq.RocketProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@Slf4j
@Component
@ServerEndpoint(Constant.WEBSOCKET_MQ_URL + "{userName}")
public class WebSocketMqServer {
/**
* 会话session
*/
private Session session;
/**
* socket连接
*/
private static CopyOnWriteArraySet<WebSocketMqServer> webSockets = new CopyOnWriteArraySet<>();
/**
* 会话连接池
*/
private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
/**
* 消息持久化
*/
private static ChatMsgService chatMsgService;
/**
* redis
*/
private static RedisTemplate redisTemplate;
/**
* RocketMQ消息工具类
*/
private static RocketProducer rocketProducer;
@Autowired
public void setWebSocketServer(ChatMsgService chatMsgService,
RedisTemplate redisTemplate,
RocketProducer rocketProducer) {
WebSocketMqServer.chatMsgService = chatMsgService;
WebSocketMqServer.redisTemplate = redisTemplate;
WebSocketMqServer.rocketProducer = rocketProducer;
}
@OnOpen
public void onOpen(Session session, @PathParam(value = "userName") String userName) {
//1.将用户添加到在线用户列表中
if (!Constant.SUPER_ADMIN.equals(userName)) {
redisTemplate.opsForSet().add("online", userName);
}
//2.保存会话连接
this.session = session;
webSockets.add(this);
sessionPool.put(userName, session);
Set online = redisTemplate.opsForSet().members("online");
log.info("【websocket消息】有新的连接,总在线人数为:" + online.size());
//3.创建消息
WebSocketMqMsg webSocketMqMsg = new WebSocketMqMsg();
//消息类型
webSocketMqMsg.setKey(MessageType.MESSAGE_OPEN.getCode());
//在线人数
webSocketMqMsg.setOnlineList(online);
//全部人数
webSocketMqMsg.setUserList(chatMsgService.getUserList());
//4.消息异步发送到RocketMQ
rocketProducer.sendAsyncMsg(RocketConstant.ROCKET_TOPIC, RocketConstant.ROCKET_TAG_CHAT, UUID.randomUUID().toString(), JSON.toJSONString(webSocketMqMsg));
}
@OnClose
public void onClose(@PathParam(value = "userName") String userName) {
//1.更新在线用户列表
redisTemplate.opsForSet().remove("online", userName);
//2.清除会话连接
webSockets.remove(this);
sessionPool.remove(userName);
Set online = redisTemplate.opsForSet().members("online");
log.info("【websocket消息】连接断开,总在线人数为:" + online.size());
//3.创建消息
WebSocketMqMsg webSocketMqMsg = new WebSocketMqMsg();
webSocketMqMsg.setKey(MessageType.MESSAGE_CLOSE.getCode());
webSocketMqMsg.setOnlineList(online);
webSocketMqMsg.setUserList(chatMsgService.getUserList());
//4.消息异步发送到RocketMQ
rocketProducer.sendAsyncMsg(RocketConstant.ROCKET_TOPIC, RocketConstant.ROCKET_TAG_CHAT, UUID.randomUUID().toString(), JSON.toJSONString(webSocketMqMsg));
}
@OnMessage
public void onMessage(String message) {
//1.持久化消息内容
ChatMsg chatMsg = JSON.parseObject(message, ChatMsg.class);
chatMsgService.save(chatMsg);
//2.创建消息
WebSocketMqMsg webSocketMqMsg = new WebSocketMqMsg();
webSocketMqMsg.setKey(MessageType.MESSAGE_SEND.getCode());
webSocketMqMsg.setData(chatMsg);
//3.消息异步发送到RocketMQ
rocketProducer.sendAsyncMsg(RocketConstant.ROCKET_TOPIC, RocketConstant.ROCKET_TAG_CHAT, UUID.randomUUID().toString(), JSON.toJSONString(webSocketMqMsg));
}
/**
* 广播消息
*/
public void sendAllMessage(String message) {
for (WebSocketMqServer webSocket : webSockets) {
log.info("【websocket消息】广播消息:" + message);
try {
Session session = webSocket.session;
if (session != null && session.isOpen()) {
webSocket.session.getAsyncRemote().sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 单点消息
*
* @param userName
* @param message
*/
public void sendOneMessage(String userName, String message) {
log.info("【websocket消息】单点消息:" + message);
Session session = sessionPool.get(userName);
if (session != null && session.isOpen()) {
try {
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
ps:这里我们将会话的消息先推送给消息中间键RocketMQ,然后将消息通过广播的形式分发给每一台服务器去消费,如何能消费成功 ,就将消息推送给对应的客户端
package com.yundi.atp.platform.common;
public class Constant {
/**
* zookeeper分布式锁根路径
*/
public final static String LOCK_ROOT_PATH = "/zookeeper/lock/";
/**
* websocket协议
*/
public final static String WEBSOCKET_PROTOCOL = "ws://";
/**
* 单机版聊天室
*/
public final static String WEBSOCKET_SINGLE_URL = "/websocket/chat/";
/**
* 微服务版聊天室
*/
public final static String WEBSOCKET_MQ_URL = "/websocket/mq/chat/";
/**
* 超级管理员
*/
public final static String SUPER_ADMIN = "super_admin";
}
- 自定义消息类型:根据不同消息内容处理不同的消息业务逻辑
package com.yundi.atp.platform.enums;
public enum MessageType {
MESSAGE_OPEN(1, "开启连接"),
MESSAGE_CLOSE(2, "断开连接"),
MESSAGE_SEND(3, "发送消息"),
MESSAGE_RE_OPEN(4, "异地登录下线通知");
private Integer code;
private String msg;
MessageType(Integer code, String msg) {
this.code = code;
this.msg = msg;
}
public Integer getCode() {
return code;
}
public String getMsg() {
return msg;
}
}
package com.yundi.atp.platform.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RocketProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送同步消息:消息响应后发送下一条消息
*
* @param topic 消息主题
* @param tag 消息tag
* @param key 业务号
* @param data 消息内容
*/
public void sendSyncMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
log.info("【RocketMQ】发送同步消息:{}", sendResult);
}
/**
* 发送异步消息:异步回调通知消息发送的状况
*
* @param topic 消息主题
* @param tag 消息tag
* @param key 业务号
* @param data 消息内容
*/
public void sendAsyncMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【RocketMQ】发送异步消息:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());
}
});
}
/**
* 发送单向消息:消息发送后无响应,可靠性差,效率高
*
* @param topic 消息主题
* @param tag 消息tag
* @param key 业务号
* @param data 消息内容
*/
public void sendOneWayMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
rocketMQTemplate.sendOneWay(destination, message);
}
/**
* 同步延迟消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
* @param timeout 发送消息的过期时间
* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
SendResult sendResult = rocketMQTemplate.syncSend(destination, message, timeout, delayLevel);
log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);
}
/**
* 异步延迟消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
* @param timeout 发送消息的过期时间
* @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());
}
}, timeout, delayLevel);
}
/**
* 同步顺序消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
*/
public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, key);
log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);
}
/**
* 异步顺序消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
*/
public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
//主题
String destination = topic + ":" + tag;
rocketMQTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());
}
});
}
/**
* 分布式事务消息
*
* @param topic 主题
* @param tag 标签
* @param key 业务号
* @param data 消息体
*/
public void sendTransactionMsg(String topic, String tag, String key, String data) {
//消息
Message message = MessageBuilder.withPayload(data)
.setHeader(RocketMQHeaders.KEYS, key)
.setHeader(RocketMQHeaders.TRANSACTION_ID, key)
.build();
//主题
String destination = topic + ":" + tag;
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, null);
if (transactionSendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) &&
transactionSendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
log.info("分布式事物消息发送成功");
}
log.info("分布式事物消息发送结果:{}", transactionSendResult);
}
}
- websocket服务的连接地址获取及历史消息获取
package com.yundi.atp.platform.module.test.controller;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yundi.atp.platform.common.Result;
import com.yundi.atp.platform.module.test.entity.ChatMsg;
import com.yundi.atp.platform.module.test.service.ChatMsgService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@Api(tags = "聊天室接口-mq版")
@RestController
@RequestMapping("/test/mq/chatMsg")
public class ChatMsgMqController {
@Autowired
private ChatMsgService chatMsgService;
@ApiOperation(value = "获取聊天室地址")
@GetMapping(value = "/getWebSocketAddress/{username}")
public Result getWebSocketAddress(HttpServletRequest request, @PathVariable(value = "username") String username) throws UnknownHostException {
String address = "ws://" + InetAddress.getLocalHost().getHostAddress() + ":" + request.getServerPort() + request.getContextPath() + "/websocket/mq/chat/" + username;
return Result.success(address);
}
@ApiOperation(value = "获取历史聊天记录")
@GetMapping(value = "/getHistoryChat/{username}")
public Result getWebSocketAddress(@PathVariable(value = "username") String username) {
List<ChatMsg> list = chatMsgService.list(new QueryWrapper<ChatMsg>()
.and(wrapper -> wrapper.eq("sender", username).or().eq("receiver", username))
.orderByDesc("create_time"));
List<ChatMsg> collect = list.stream().sorted(Comparator.comparing(ChatMsg::getCreateTime)).collect(Collectors.toList());
return Result.success(collect);
}
@ApiOperation(value = "获取用户列表")
@GetMapping(value = "/getUserList")
public Result getUserList() {
List<String> userList = chatMsgService.getUserList();
return Result.success(userList);
}
}
package com.yundi.atp.platform.websocket;
import com.alibaba.fastjson.JSON;
import com.yundi.atp.platform.common.Constant;
import com.yundi.atp.platform.enums.MessageType;
import com.yundi.atp.platform.module.test.entity.ChatMsg;
import com.yundi.atp.platform.rocketmq.RocketConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = RocketConstant.ROCKET_CONSUMER_CHAT_GROUP,
topic = RocketConstant.ROCKET_TOPIC,
selectorExpression = RocketConstant.ROCKET_TAG_CHAT,
namespace = RocketConstant.ROCKET_NAMESPACE,
messageModel = MessageModel.BROADCASTING)
public class WebSocketMqConsumer implements RocketMQListener<String> {
@Autowired
WebSocketMqServer webSocketMqServer;
@Override
public void onMessage(String message) {
log.info("聊天室消息:{}", message);
//1.解析消息
WebSocketMqMsg webSocketMqMsg = JSON.parseObject(message, WebSocketMqMsg.class);
//2.根据消息类型解析消息
// 建立连接消息
if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_OPEN.getCode())) {
webSocketMqServer.sendOneMessage(Constant.SUPER_ADMIN, message);
}
// 关闭连接消息
if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_CLOSE.getCode())) {
webSocketMqServer.sendOneMessage(Constant.SUPER_ADMIN, message);
}
// 发送消息
if (webSocketMqMsg.getKey().equals(MessageType.MESSAGE_SEND.getCode())) {
ChatMsg data = webSocketMqMsg.getData();
webSocketMqServer.sendOneMessage(data.getSender(), message);
webSocketMqServer.sendOneMessage(data.getReceiver(), message);
}
}
}
package com.yundi.atp.platform.rocketmq;
public class RocketConstant {
/**
* 消费者组
*/
public final static String ROCKET_CONSUMER_GROUP = "atp-consumer";
/**
* 聊天室消费者组
*/
public final static String ROCKET_CONSUMER_CHAT_GROUP = "atp-chat-consumer";
/**
* 主题
*/
public final static String ROCKET_TOPIC = "atp";
/**
* tag
*/
public final static String ROCKET_TAG = "app";
/**
* 聊天室tag
*/
public final static String ROCKET_TAG_CHAT = "chat";
/**
* 名称空间
*/
public final static String ROCKET_NAMESPACE = "atp";
}
<template>
<div class="container">
<el-card class="box-card">
<div slot="header">
<el-row type="flex">
<el-col :span="1" style="margin: 15px 10px;">
<img alt="ATP客服" src="@/assets/logo.png" style="width:40px;height:40px;"/>
</el-col>
<el-col :span="3" style="line-height: 74px;margin-left: 10px;">
<span style="display: inline-block;color: white;">ATP客服</span>
</el-col>
<el-col :span="20" v-if="username==='super_admin'">
<h5 style="color: #83ccd2;padding: 0;text-align: right;margin: 50px 20px 0 0;">当前在线人数:{{ online }}</h5>
</el-col>
<el-col :span="20" v-else>
<h5 style="color: #83ccd2;padding: 0 0 2px 0;text-align: right;margin: 50px 20px 0 0;font-size: 18px;">
{{ username }}</h5>
</el-col>
</el-row>
</div>
<div class="content" ref="content">
<el-row type="flex">
<el-col :span="6" style="background: #eee;min-height: 600px;" v-if="username==='super_admin'">
<el-tabs v-model="activeName" @tab-click="handleClick" style="width: 190px;margin: 0 2px;">
<el-tab-pane label="在线用户" name="online">
<div v-for="item in friend" :key="item" @click="switchUser(item)" :class="item===active?'mark':''">
<el-badge :is-dot=msgNotify.includes(item) class="item" type="success">
<li style="list-style-type:none;padding: 5px 8px;cursor: pointer;"
class="active">
{{ item }}
</li>
</el-badge>
<el-divider></el-divider>
</div>
</el-tab-pane>
<el-tab-pane label="全部用户" name="all">
<div v-for="item in userList" :key="item" @click="switchUser(item)" :class="item===active?'mark':''">
<el-badge :is-dot=msgNotify.includes(item) class="item" type="success">
<li style="list-style-type:none;padding: 5px 8px;cursor: pointer;"
:class="friend.includes(item)?'active':''">
{{ item }}
</li>
</el-badge>
<el-divider></el-divider>
</div>
</el-tab-pane>
</el-tabs>
</el-col>
<el-col :span="18" v-if="username==='super_admin'">
<div v-for="item in chatMsgList">
<el-row type="flex" style="margin-bottom: 20px;" v-if="username===item.sender">
<el-col :span="2">
<img alt="ATP客服" src="@/assets/logo.png" style="width:30px;height:30px;margin: 10px 0px 0px 20px;"/>
</el-col>
<el-col :span="22">
<el-row type="flex" style="margin-top: 10px;margin-left: 5px;opacity: 0.2;">
<el-col :span="7"><span style="padding-left: 20px;">{{ item.sender }}</span></el-col>
<el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
</el-row>
<el-row>
<el-col :span="14" style="margin-left: 8px;margin-top: 5px;">
<el-card style="padding: 8px 5px;">
{{ item.msg }}
</el-card>
</el-col>
</el-row>
</el-col>
</el-row>
<el-row type="flex" style="margin-bottom: 20px;" v-else justify="end">
<el-col :span="22">
<el-row type="flex" style="margin-top: 10px;margin-right: 5px;opacity: 0.2;" justify="end">
<el-col :span="6"><span>{{ item.sender }}</span></el-col>
<el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
</el-row>
<el-row type="flex" justify="end" style="margin-right: 8px;margin-top: 5px;">
<el-col :span="14" style="margin-right: 8px;">
<el-card style="padding: 8px 5px;">
{{ item.msg }}
</el-card>
</el-col>
</el-row>
</el-col>
<el-col :span="2">
<el-avatar shape="square" size="medium" style="float: right;margin: 10px 20px 0px 0px;">客户</el-avatar>
</el-col>
</el-row>
</div>
</el-col>
<el-col :span="24" v-else>
<div v-for="item in chatMsgList">
<el-row type="flex" style="margin-bottom: 20px;" v-if="username===item.sender">
<el-col :span="2">
<el-avatar shape="square" size="medium" style="float: right;margin: 10px 20px 0px 0px;">客户</el-avatar>
</el-col>
<el-col :span="22">
<el-row type="flex" style="margin-top: 10px;opacity: 0.2;margin-left: 20px;">
<el-col :span="7"><span style="padding-left: 5px;">{{ item.sender }}</span></el-col>
<el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
</el-row>
<el-row>
<el-col :span="14">
<el-card style="padding: 8px 5px;">
{{ item.msg }}
</el-card>
</el-col>
</el-row>
</el-col>
</el-row>
<el-row type="flex" style="margin-bottom: 20px;" v-else justify="end">
<el-col :span="22">
<el-row type="flex" style="margin-top: 10px;margin-right: 5px;opacity: 0.2;" justify="end">
<el-col :span="6"><span>{{ item.sender }}</span></el-col>
<el-col :span="7"><span>{{ item.createTime | dataFormat('yyyy-MM-dd HH:mm') }}</span></el-col>
</el-row>
<el-row type="flex" justify="end" style="margin-top: 5px;">
<el-col :span="14">
<el-card style="padding: 8px 5px;">
{{ item.msg }}
</el-card>
</el-col>
</el-row>
</el-col>
<el-col :span="2">
<img alt="ATP客服" src="@/assets/logo.png" style="width:30px;height:30px;margin: 10px 0px 0px 20px;"/>
</el-col>
</el-row>
</div>
</el-col>
</el-row>
</div>
<div class="operate" v-if="username==='super_admin'">
<el-input
type="textarea"
:autosize="{ minRows: 3, maxRows: 3}"
placeholder="您好!这里是ATP客服部,我是客服小美,很高兴为您服务!"
v-model="msg">
</el-input>
<el-button type="success" size="mini" style="float: right;margin-top: 5px;" @click="sendMsg"
:disabled="!(msg && active)">
发送
</el-button>
</div>
<div class="operate" v-else>
<el-input
type="textarea"
:autosize="{ minRows: 3, maxRows: 3}"
placeholder="您好!这里是ATP客服部,我是客服小美,很高兴为您服务!"
v-model="msg">
</el-input>
<el-button type="success" size="mini" style="float: right;margin-top: 5px;" @click="sendMsg" :disabled="!msg">
发送
</el-button>
</div>
</el-card>
</div>
</template>
<script>
export default {
name: "ClientMqChat",
data() {
return {
msg: '',
chatMsgList: [],
username: sessionStorage.getItem("username"),
friend: [],
online: 0,
active: '',
receiver: 'super_admin',
userList: [],
activeName: 'online',
msgNotify:[],
}
},
created() {
this.getWebSocketAddress();
},
methods: {
//tab切换
handleClick(tab, event) {
const _this = this;
if (tab.name === 'online') {
if (!_this.active) {
if (_this.online > 0) {
_this.active = _this.friend[0];
_this.activeName = 'online';
_this.receiver = _this.active;
_this.getHistoryChat(_this.receiver);
} else {
if (_this.userList.length > 0) {
_this.active = _this.userList[0];
_this.activeName = 'all';
_this.receiver = _this.active;
_this.getHistoryChat(_this.receiver);
}
}
}
}
if (tab.name === 'all') {
if (!_this.active) {
if (_this.online > 0) {
_this.active = _this.friend[0];
_this.activeName = 'online';
_this.receiver = _this.active;
_this.getHistoryChat(_this.receiver);
} else {
if (_this.userList.length > 0) {
_this.active = _this.userList[0];
_this.activeName = 'all';
_this.receiver = _this.active;
_this.getHistoryChat(_this.receiver);
}
}
}
}
},
//切换用户
switchUser(data) {
if (this.active === data) {
return;
}
this.active = data;
this.receiver = data;
//获取历史聊天记录
this.getHistoryChat(this.receiver);
this.msgNotify = this.msgNotify.filter(item => item != this.active);
},
//获取历史聊天记录
getHistoryChat(data) {
this.$http.get('/test/mq/chatMsg/getHistoryChat/' + data).then(res => {
if (res.data.code === 1) {
this.chatMsgList = res.data.data;
this.flushScroll();
} else {
this.$message.warning(res.data.msg);
}
}).catch(error => {
this.$message.error(error);
});
},
//获取websocket地址
getWebSocketAddress() {
this.$http.get('/test/mq/chatMsg/getWebSocketAddress/' + this.username).then(res => {
if (res.data.code === 1) {
if ('WebSocket' in window) {
this.websocket = new WebSocket(res.data.data);
this.initWebSocket();
if (this.username != 'super_admin') {
this.getHistoryChat(this.username);
}
} else {
this.$message.warning('当前浏览器不支持websocket创建!');
}
} else {
this.$message.warning(res.data.msg);
}
}).catch(error => {
this.$message.error(error);
});
},
//初始化websocket
initWebSocket() {
const _this = this;
_this.websocket.onerror = function (event) {
_this.$message.error('服务端连接错误!');
}
_this.websocket.onopen = function (event) {
_this.$message.success("连接成功!");
}
_this.websocket.onmessage = function (event) {
let res = JSON.parse(event.data);
if (res.key === 1) {
_this.userList = res.userList;
_this.friend = res.onlineList;
_this.online = _this.friend.length;
if (!_this.active) {
if (_this.online > 0) {
_this.active = _this.friend[0];
_this.activeName = 'online';
_this.receiver = _this.active;
_this.getHistoryChat(_this.receiver);
} else {
if (_this.userList.length > 0) {
_this.active = _this.userList[0];
_this.activeName = 'all';
_this.receiver = _this.active;
_this.getHistoryChat(_this.receiver);
}
}
}
}
if (res.key === 2) {
_this.userList = res.userList;
_this.friend = res.onlineList;
_this.online = _this.friend.length;
if (!_this.active) {
if (_this.online > 0) {
_this.active = _this.friend[0];
_this.activeName = 'online';
_this.receiver = _this.active;
_this.getHistoryChat(_this.receiver);
} else {
if (_this.userList.length > 0) {
_this.active = _this.userList[0];
_this.activeName = 'all';
_this.receiver = _this.active;
_this.getHistoryChat(_this.receiver);
}
}
}
}
if (res.key === 3) {
if (_this.username === res.data.sender) {
_this.chatMsgList.push(res.data);
_this.flushScroll();
} else {
if (res.data.sender === 'super_admin') {
_this.chatMsgList.push(res.data);
_this.flushScroll();
} else {
if (res.data.sender === _this.active) {
_this.chatMsgList.push(res.data);
_this.flushScroll();
} else {
//发送其它用户处理
_this.msgNotify.push(res.data.sender);
}
}
}
}
}
_this.websocket.onclose = function (event) {
_this.$message.warning('服务端已关闭!');
}
},
//发送消息
sendMsg() {
if (this.msg.trim().length === 0) {
this.$message.warning('不能发送空消息!');
return;
}
let chatMsg = {};
chatMsg.msg = this.msg;
chatMsg.sender = this.username;
chatMsg.createTime = new Date();
chatMsg.receiver = this.receiver;
this.websocket.send(JSON.stringify(chatMsg));
this.msg = '';
this.flushScroll();
},
//刷新滚动条
flushScroll() {
let content = this.$refs.content;
setTimeout(() => {
content.scrollTop = content.scrollHeight;
}, 100);
},
}
}
</script>
<style scoped lang="scss">
.container {
padding-top: 50px;
.box-card {
margin: auto;
width: 800px;
height: 800px;
max-height: 900px;
::v-deep .el-card__header {
background: #867ba9 !important;
border-bottom: none;
padding: 0;
}
::v-deep .el-card__body {
padding: 0px !important;
position: relative;
.content {
height: 600px;
background: #ddd;
overflow-y: auto;
.el-divider--horizontal {
margin: 0;
}
.active {
color: #0080ff;
}
.mark {
background: #deb068;
}
.item {
margin-top: 10px;
margin-right: 10px;
}
}
.operate {
padding: 5px 15px;
}
}
}
}
</style>
- 启动前后端项目,分别使用客服账号和客户账号登录聊天室
结语
至此,关于实现微服务的websocket聊天室到这里就结束了,下期见。。。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)