客户端请求websocket接口,连接通道=》我这边业务成功客户端发消息=》客户端自动刷新。
接口:ws://localhost:8080/websocket/xx
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author Administrator
*/
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
import cn.hutool.core.util.StrUtil;
import xx.Constant;
import xx.utils.JwtHelper;
import xx.utils.RedisUtil;
import xx.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@Slf4j
@ServerEndpoint("/websocket/{key}")
public class WebSocketService {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
private static CopyOnWriteArraySet<WebSocketService> webSockets = new CopyOnWriteArraySet<>();
//用来存放每个客户端对应的websocket对象
private static Map<String,Session> sessionPool = new HashMap<>();
private String userIdStr;
private static WebSocketService webSocketService;
@PostConstruct
public void init(){
webSocketService = this;
}
/**
* 连接成功后调用的方法
* @param session
* @param key
*/
@OnOpen
public void onOpen(Session session,@PathParam("key") String key){
//key作为前端传给后端的token
this.session = session;
//从token中获取到的userId当作区分websocket客户端的key
String userIdStr = JwtHelper.verifyTokenAndGetUserId(key);
this.userIdStr = userIdStr;
sessionPool.put(userIdStr,session);
if (sessionPool.get(userIdStr) == null){
webSockets.add(this);
}
webSockets.add(this);
log.info("websocket连接成功");
log.info("websocket有新的连接,连接总数为"+webSockets.size());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(){
sessionPool.remove(userIdStr);
webSockets.remove(this);
log.info("websocket连接关闭");
}
/**
* 收到客户端消息后调用的方法,根据业务要求进行处理,这里就简单地将收到的消息直接群发推送出去
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message){
String key = Constant.BID_SUCCESS+userIdStr;
if (StrUtil.isNotBlank(message)){
if(message.equals("heartbeat")){
sendTextMessage(key,"已连接");
}else{
RedisUtil redisUtil = SpringUtil.getBean(RedisUtil.class);
//根据业务条件判断发送消息
String result = String.valueOf(redisUtil.get(key));
if (StrUtil.isNotBlank(result) && result.equals(Constant.SUCCESS)){
sendAllMessage("ok");
//发送之后删除key
redisUtil.delete(key);
}
}
}
log.info("WebSocket收到客户端消息:"+message);
}
/**
* 实现服务器主动推送消息
* @param key
* @param message
*/
private void sendTextMessage(String key, String message) {
Session session = sessionPool.get(key);
if (session != null){
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void sendAllMessage(String message){
log.info("websocket消息全部人员消息:"+message);
for (WebSocketService apiWebSocketService:webSockets){
try{
if (apiWebSocketService.session.isOpen()){
apiWebSocketService.session.getAsyncRemote().sendText(message);
}
}catch (Exception e){
e.printStackTrace();
log.error("全部人员发送消息失败:",e.getMessage());
}
}
}
/**
* 发生错误时的回调函数
* @param session
* @param error
*/
@OnError
public void onError(Session session,Throwable error){
log.error("发生错误");
error.printStackTrace();
}
}
经测试,成功
如果是线上服务器连接,则需要在nginx里配置websocket相关内容,再重启nginx,代码如下
server {
listen 443 ssl;
server_name api.kadecard.com;
ssl_certificate cert.pem;
ssl_certificate_key cert.key;
ssl_session_cache shared:SSL:1m;
ssl_session_timeout 5m;
# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_prefer_server_ciphers on;
location / {
proxy_pass http://xx:7878/;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
#websocket 这里的websocket是后端websocket接口的接口名称
location /websocket {
rewrite ^/ws$ / break;
rewrite ^/ws(.*)$ $1 break;
proxy_pass http://127.0.0.1:7878;
proxy_http_version 1.1; #websoket必须要使用的协议,http 1.1
proxy_set_header Upgrade $http_upgrade; #要使用websocket协议时,响应http升级请求
proxy_set_header Connection "upgrade";
proxy_set_header X-real-ip $remote_addr;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_read_timeout 600s;
}
}
本地连接的时候用的是ws://,因为是http链接,但是如果是服务器连接,且支持https连接方式,则就需要用wss://的连接方式。