WebSocket服务端消息推送

2023-05-16

**前言:**移动互联网蓬勃发展的今天,大部分手机 APP和网站都提供了消息推送功能,如新闻客户端的热点新闻推荐,IM 工具的聊天消息提醒,电商产品促销信息,企业应用的通知和审批流程等等。推送对于提高产品活跃度、提高功能模块使用率、提升用户粘性、提升用户留存率起到了重要作用,作为 APP 和网站运营中一个关键的渠道,对消息推送的合理运用能有效促进目标的实现。

一、浅析web端的消息推送原理

股票曲线实时变化,在线IM聊天等等,Web系统里总是能见到消息推送的应用。消息推送用好了能增强用户体验,实现消息推送有N种解决方案。

1.1、什么是消息推送

消息推送(Push)指运营人员通过自己的产品或第三方工具对用户当前网页或移动设备进行的主动消息推送。用户可以在网页上或移动设备锁定屏幕和通知栏看到push消息通知。以此来实现用户的多层次需求,使得用户能够自己设定所需要的信息频道,得到即时消息,简单说就是一种定制信息的实现方式。我们平时浏览邮箱时突然弹出消息提示收到新邮件就属于web端消息推送,在手机锁屏上看到的微信消息等等都属于APP消息推送。

Web网站推送:

当我们在浏览网站观望犹豫时,突然看到了系统发来一条消息,一位神秘的神豪老板竟然爆出了麻痹戒指!!!我的天,于是我果断开始了游戏!这消息很及时!

APP移动推送:

上述两种经典场景,是生活中比较常见的场景,也引出了两大推送种类,Web端消息推送和移动端消息推送。本篇博客主要介绍Web推送,顺便提一句移动端App常见第三方推送SDK有极光推送、小米推送等等。

****1.2、****Web端实现消息推送的四种方式

主要介绍web端其中的四种实现方式:短轮询、Comet长轮询、Server-sent、WebSocket。

(1)短轮询

指在特定的的时间间隔(如每10秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客户端的浏览器。浏览器做处理后进行显示。无论后端此时是否有新的消息产生,都会进行响应。字面上看,这种方式是最简单的。这种方式的优点是,后端编写非常简单,逻辑不复杂。但是缺点是请求中大部分中是无用的,浪费了带宽和服务器资源。总结来说,简单粗暴,适用于小型(偷懒)应用。

(2)Comet长轮询

长轮询是客户端向服务器发送Ajax请求,服务器接到请求后hold住连接,直到有新消息才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求;长连接是在页面中的iframe发送请求到服务端,服务端hold住请求并不断将需要返回前端的数据封装成调用javascript函数的形式响应到前端,前端不断收到响应并处理。Comet的实现原理和短轮询相比,很明显少了很多无用请求,减少了带宽压力,实现起来比短轮询复杂一丢丢。想比用短轮询的同学有梦想时,就可以用Comet来实现自己的推送。

长轮询的优点很明显,在无消息的情况下不会频繁的请求,耗费资小并且实现了服务端主动向前端推送的功能,但是服务器hold连接会消耗资源,返回数据顺序无保证,难于管理维护。WebQQ(好像挂了)就是这样实现的。

(3)Server-sent

服务器推指的是HTML5规范中提供的服务端事件EventSource,浏览器在实现了该规范的前提下创建一个EventSource连接后,便可收到服务端的发送的消息,实现一个单向通信。客户端进行监听,并对响应的信息处理显示。该种方式已经实现了服务端主动推送至前端的功能。优点是在单项传输数据的场景中完全满足需求,开发人员扩展起来基本不需要改后端代码,直接用现有框架和技术就可以集成。

(4)WebSocket

WebSocket是HTML5下一种新的协议,是基于TCP的应用层协议,只需要一次连接,便可以实现全双工通信,客户端和服务端可以相互主动发送消息。客户端进行监听,并对响应的消息处理显示。

这个技术相信基本都听说过,就算没写过代码,也大概知道干嘛的。通过名字就能知道,这是一个Socket连接,一个能在浏览器上用的Socket连接。WebSocket是HTML5标准中的一个内容,浏览器通过javascript脚本手动创建一个TCP连接与服务端进行通讯。优点是双向通信,都可以主动发送消息,既可以满足“问”+“答”的响应机制,也可以实现主动推送的功能。缺点就是编码相对来说会多点,服务端处理更复杂(我觉得当一条有情怀的咸鱼就应该用这个!)。

1.3、实现个性化的推送

上面说了很多实现方案,针对自己系统的应用场景选择合适的推送方案才是合理的,因此最后简单说一下实现个性化推送的两种方式。第一种很简单,直接使用第三方实现的推送,无需复杂的开发运维,直接可以使用。第二种就是自己封装,可以选择如今较为火热的WebSocket来实现系统的消息推送。

①直接用第三方的消息推送服务(并发量多了会收费)

在这里推荐一个第三方推送平台,GoEasy。

推荐理由是GoEasy的理念符合我们的选择(可参考http://t.cn/Ex6jg3q):

(1)更简单的方式将消息从服务器端推送至客户端
(2)更简单的方式将消息从各种客户端推送至客户端

GoEasy具体的使用方式这里不再赘述,详见官网。对于后端后端开发者,可直接使用Rest方式调用推送,对于前端或web开发者,可以从web客户端用javascript脚本进行调用推送。

②封装自己的推送服务

如果是一个老系统进行扩展,那么更推荐使用Server-sent,服务端改动量不会很大。如果是新系统,更推荐websocket,实现的功能功能更全面。

我们如果需要使用websocket技术实现自己的推送服务,需要注意哪些点,或者说需要踩哪些坑呢,本文列出几点供大家参考:

长连接的心跳激活处理;

服务端调优实现高并发量client同时在线(单机服务器可以实现百万并发长连接);

群发消息;

服务端维持多用户的状态;

从WebSocket中获取HttpSession进行用户相关操作;

等等等….


二、WebSocket简介

2.1、websocket的由来

我们经常用的是HTTP协议,而HTTP协议是一种无状态的协议,要实现有状态的会话必须借助一些外部机制如session/cookie或者Token,这或多或少会带来一些不便,尤其是服务端和客户端需要实时交换数据的时候(监控,聊天),这个问题更加明显。为了适应这种环境,websocket就产生了,目的是即时通讯,替代轮询。

2.2、websocket概述

WebSocket 是HTML5一种新的协议。它实现了浏览器与服务器全双工通信(full-duplex)。一开始的握手需要借助HTTP请求完成。WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

websocket的特点或作用

  • 允许服务端主动向客户端推送数据

  • 在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

websocket使用的优点

  • 更强的实时性

  • 保持连接状态,创建一次连接后,之后通信时可以省略部分状态信息。较少的控制开销,在连接创建后,服务器和客户端之间交换数据时,用于协议控制的数据包头部相对较小。在不包含扩展的情况下,对于服务器到客户端的内容,此头部大小只有2至10字节(和数据包长度有关);对于客户端到服务器的内容,此头部还需要加上额外的4字节的掩码。相对于HTTP请求每次都要携带完整的头部,此项开销显著减少了

websocket使用的缺点

由于websocket使用的持久连接,与服务器一直保持连接,对服务器压力很大

2.3、websocket请求头和响应头

浏览器发送websocket请求头类似如下:

下面是对请求头部解释(比http协议多了Upgrade和Connection,是告诉服务器包协议设计ws):

  • Accept-Encoding:浏览器可以接受的数据的压缩类型。

  • Accept-Language:浏览器可以接受的语言类型。

  • Cache-Control:no-cache不使用强缓存。

  • Connection:Upgrade 通知服务器通信协议提升。

  • Host:主机名。

  • Origin:用于验证浏览器域名是否在服务器许可范围内。

  • Pragma:no-cache HTTP/1.0定义的不使用本地缓存。

  • Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits

  • Sec-WebSocket-Key:lb69kw8CsB4CrSk9tKa3 g==
    握手协议密钥,base64编码的16字节的随机字符串。

  • Sec-WebSocket-Version:13 版本号。

  • Upgrade:websocket 使用websocket协议进行传输数据,而不使用HTTP/1.1。

  • User-Agent:用户代理字符串。

服务器接收到客户端请求后做出响应并返回如下:

下面是服务器返回的头部解释:

  • Connection:Upgrade 通信协议提升。

  • Date:通信时间

  • Upgrade: websocket 传输协议升级为websocket。

  • Sec-WebSocket-Extensions:permessage-deflate

  • Sec-WebSocket-Accept:q9g5u1WfIWaAjNgMmjlTQTqkS/k=
    将Sec-WebSocket-Key的值进行一定的运算和该值进行比较来判断是否是目标服务器响应了WebSocket请求。

  • Upgrade: 使用websocket协议进行数据传输

2.4、WebSocket和Socket的区别

短答案:就像Java和JavaScript,并没有什么太大的关系,但又不能说完全没关系。可以这么说:

  • 命名方面,Socket是一个深入人心的概念,WebSocket借用了这一概念;

  • 使用方面,完全两个东西。

Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口(不是协议,为了方便使用TCP或UDP而抽象出来的一层,是位于应用层和传输控制层之间的一组接口)。

WebSocket是应用层协议。

2.5、向指定用户发送WebSocket消息并处理对方不在线的情况

给指定用户发送消息:

  • 如果接收者在线,则直接发送消息;

  • 否则将消息存储到redis,等用户上线后主动拉取未读消息。

2.6、WebSocket心跳机制

在使用WebSocket的过程中,有时候会遇到网络异常断开的情况,但是在网络断开的时候服务器端并没有触发onclose的事件。这样会有:服务器会继续向客户端发送多余的连接,并且这些数据还会丢失。所以就需要一种机制来检测客户端和服务端是否处于正常的连接状态,因此就有了WebSocket的心跳机制了。还有心跳,说明还活着,没有心跳说明已经挂掉了。

心跳机制

心跳机制是每隔一段时间会向服务器发送一个数据包,告诉服务器自己还活着,同时客户端会确认服务器端是否还活着,如果还活着的话,就会回传一个数据包给客户端来确定服务器端也还活着,否则的话,有可能是网络断开连接了,需要重连。

2.7、Netty可以实现WebSocket

Netty是由jboss提供的一款开源框架,常用于搭建RPC中的TCP服务器和WebSocket服务器,甚至是类似Tomcat的web服务器,反正就是各种网络服务器,在处理高并发的项目中,功能丰富且性能良好,基于Java中NIO的二次封装,具有比原生NIO更好更稳健的体验。


三、基于Netty实现WebSocket消息推送

因为产品需求,要实现服务端推送消息至客户端,并且支持客户端对用户点对点消息发送的社交功能。服务端给客户端推送消息,可以选择原生的WebSocket,或者更加高级的Netty框架实现。

在此我极力推荐netty,因为一款好的框架一般都是在原生的基础上进行包装成更加实用方便,很多我们需要自己考虑的问题都基本可以不用去考虑,不过此文不会去讲netty有多么的高深莫测,因为这些概念性的东西随处可见,而是通过实战来达到推送消息的目的。

这个小节,我们主要讲解下如何整合Netty和WebSocket。我们需要使用netty对接websocket连接,实现双向通信,这一步需要有服务端的netty程序,用来处理客户端的websocket连接操作,例如建立连接,断开连接,收发数据等。

WebSocket消息推送实现思路:

前端使用WebSocket与服务端创建连接的时候,将用户ID传给服务端,服务端将用户ID与channel关联起来存储,同时将channel放入到channel组中。

如果需要给所有用户发送消息,直接执行channel组的writeAndFlush()方法;

如果需要给指定用户发送消息,根据用户ID查询到对应的channel,然后执行writeAndFlush()方法;

前端获取到服务端推送的消息之后,将消息内容展示到文本域中

下面是具体的代码实现,基本上每一步操作都配有注释说明,配合注释看应该还是比较容易理解的。

3.1、引入Netty的依赖

netty-all包含了netty的所有封装,hutool-all封装了常用的一些依赖,如Json相关

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.33.Final</version>
</dependency>

<dependency>
	<groupId>cn.hutool</groupId>
	<artifactId>hutool-all</artifactId>
	<version>5.2.3</version>
</dependency>

3.2、修改配置文件application.yml

server:
  port: 8899

#netty的配置信息(端口号,webSocket路径)
webSocket:
  netty:
    port: 58080
    path: /webSocket
    readerIdleTime: 30 #读空闲超时时间设置(Netty心跳检测配置)
    writerIdleTime: 30 #写空闲超时时间设置(Netty心跳检测配置)
    allIdleTime: 30 #读写空闲超时时间设置(Netty心跳检测配置)

3.3、创建NettyConfig

在NettyConfig中定义一个单例的channel组,管理所有的channel,再定义一个map,管理用户与channel的对应关系**。**

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;

/**
 * NettyConfig类
 *
 * @author hs
 * @date 2021-09-18
 */
public class NettyConfig {

    /**
     * 定义一个channel组,管理所有的channel
     * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放用户与Chanel的对应信息,用于给指定用户发送消息
     */
    private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

    private NettyConfig() {}

    /**
     * 获取channel组
     * @return
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * 获取用户channel map
     * @return
     */
    public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
        return userChannelMap;
    }
}

3.4、创建Netty的初始化类NettyServer(重点)

定义两个EventLoopGroup,bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之间的读写操作,需要说明的是,需要开启一个新的线程来执行netty server,要不然会阻塞主线程,到时候就无法调用项目的其他controller接口了。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

/**
 * Netty初始化服务
 *
 * @author hs
 */
@Component
public class NettyServer{

    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port}")
    private int port;

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path}")
    private String webSocketPath;

    /**
     * 在Netty心跳检测中配置 - 读空闲超时时间设置
     */
    @Value("${webSocket.netty.readerIdleTime}")
    private long readerIdleTime;

    /**
     * 在Netty心跳检测中配置 - 写空闲超时时间设置
     */
    @Value("${webSocket.netty.writerIdleTime}")
    private long writerIdleTime;

    /**
     * 在Netty心跳检测中配置 - 读写空闲超时时间设置
     */
    @Value("${webSocket.netty.allIdleTime}")
    private long allIdleTime;

    @Autowired
    private WebSocketHandler webSocketHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    /**
     * 启动
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
        bootstrap.group(bossGroup,workGroup);
        // 设置NIO类型的channel
        bootstrap.channel(NioServerSocketChannel.class);
        // 设置监听端口
        bootstrap.localAddress(new InetSocketAddress(port));
        // 连接到达时会创建一个通道
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 心跳检测(一般情况第一个设置,如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型)
                ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
                // 流水线管理通道中的处理程序(Handler),用来处理业务
                // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // 以块的方式来写的处理器
                ch.pipeline().addLast(new ChunkedWriteHandler());
                /*
                    说明:
                    1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合
                    2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求
                 */
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                /*
                    说明:
                    1、对应webSocket,它的数据是以帧(frame)的形式传递
                    2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
                    3、核心功能是将http协议升级为ws协议,保持长连接
                */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                // 自定义的handler,处理业务逻辑
                ch.pipeline().addLast(webSocketHandler);
            }
        });
        // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
        // 对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
    }

    /**
     * 释放资源
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if(bossGroup != null){
            bossGroup.shutdownGracefully().sync();
        }
        if(workGroup != null){
            workGroup.shutdownGracefully().sync();
        }
    }

    /**
     * 初始化(新线程开启)
     */
    @PostConstruct()
    public void init() {
        //需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

**注意:**启动方法需要开启一个新线程执行netty server服务,服务中配置了IdleStateHandler心跳检测,此类要在创建一个通道的第一个设置,如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型)

3.5、具体实现业务的WebSocketHandler(重点)

创建Netty配置的操作执行类WebSocketHandler,userEventTriggered为心跳检测超时所调用的方法,超时后ctx.channel().close()执行完毕会主动调用handlerRemoved删除通道及用户信息。

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;


/**
 * 操作执行类
 *
 * TextWebSocketFrame类型,表示一个文本帧
 * @author hs
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    /**
     * 一旦连接,第一个被执行
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        // 当用户ID已存入通道内,则不进行写入,只有第一次建立连接时才会存入,其他情况发送uid则为心跳需求
        if(!NettyConfig.getUserChannelMap().containsKey(uid)){
            log.info("服务器收到消息:{}",msg.text());
            NettyConfig.getUserChannelMap().put(uid,ctx.channel());
            // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(uid);
            // 回复消息
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器连接成功!"));
        }else{
            // 前端定时请求,保持心跳连接,避免服务端误删通道
            ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
        }
    }

    /**
     * 移除通道及关联用户
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    /**
     * 异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}",cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 心跳检测相关方法 - 会主动调用handlerRemoved
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if(event.state() == IdleState.ALL_IDLE){
                //清除超时会话
                ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
                writeAndFlush.addListener(new ChannelFutureListener() {

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        ctx.channel().close();
                    }
                });
            }
        }else{
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * 删除用户与channel的对应关系
     * @param ctx
     */
    private void removeUserId(ChannelHandlerContext ctx){
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getUserChannelMap().remove(userId);
        log.info("删除用户与channel的对应关系,uid:{}",userId);
    }
}

3.6、具体消息推送的接口

/**
 * 推送消息接口
 *
 * @author hs
 */
public interface PushService {

    /**
     * 推送给指定用户
     * @param userId 用户ID
     * @param msg 消息信息
     */
    void pushMsgToOne(String userId,String msg);

    /**
     * 推送给所有用户
     * @param msg 消息信息
     */
    void pushMsgToAll(String msg);

    /**
     * 获取当前连接数
     * @return 连接数
     */
    int getConnectCount();
}

3.7、消息推送接口实现类

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 推送消息接口实现类
 *
 * @author hs
 */
@Service
public class PushServiceImpl implements PushService {

    /**
     * 推送给指定用户
     * @param userId 用户ID
     * @param msg 消息信息
     */
    @Override
    public void pushMsgToOne(String userId, String msg){
        ConcurrentHashMap<String, Channel> userChannelMap = NettyConfig.getUserChannelMap();
        Channel channel = userChannelMap.get(userId);
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

    /**
     * 推送给所有用户
     * @param msg 消息信息
     */
    @Override
    public void pushMsgToAll(String msg){
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }

    /**
     * 获取当前连接数
     * @return 连接数
     */
    @Override
    public int getConnectCount() {
        return NettyConfig.getChannelGroup().size();
    }
}

3.8、提供消息推送服务的Controller

主要为了测试

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * 请求Controller(用于postman测试)
 *
 * @author hs
 */
@RestController
@RequestMapping("/push")
public class PushController {

    @Autowired
    private PushService pushService;

    /**
     * 推送给所有用户
     * @param msg 消息信息
     */
    @PostMapping("/pushAll")
    public void pushToAll(@RequestParam("msg") String msg){
        pushService.pushMsgToAll(msg);
    }

    /**
     * 推送给指定用户
     * @param userId 用户ID
     * @param msg 消息信息
     */
    @PostMapping("/pushOne")
    public void pushMsgToOne(@RequestParam("userId") String userId,@RequestParam("msg") String msg){
        pushService.pushMsgToOne(userId,msg);
    }

    /**
     * 获取当前连接数
     */
    @GetMapping("/getConnectCount")
    public int getConnectCout(){
        return pushService.getConnectCount();
    }
}

3.9、Web前端通过websocket与服务端连接

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="/static/jquery-2.2.4.min.js" charset="utf-8"></script>
</head>
<body>
<script>
    var socket;
    var userId = "123456";
    // 判断当前浏览器是否支持webSocket
    if(window.WebSocket){
        socket = new WebSocket("ws://127.0.0.1:58080/webSocket")
        // 相当于channel的read事件,ev 收到服务器回送的消息
        socket.onmessage = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "
" + ev.data;
        }
        // 相当于连接开启
        socket.onopen = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = "连接开启了..."
            socket.send(
                JSON.stringify({
                    // 连接成功将,用户ID传给服务端
                    uid: userId
                })
            );
        }

        //接受到服务端关闭连接时的回调方法
        socket.onclose = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "
" + "连接关闭了...";
        }

   // 监听窗口事件,当窗口关闭时,主动断开websocket连接,防止连接没断开就关闭窗口,server端报错
         window.onbeforeunload = function(){
            socket.close();
         }

    }
    else
    {
        alert("当前浏览器不支持webSocket")
    }

    // 如果前端需要保持连接,则需要定时往服务器针对自己发送请求,返回的参数和发送参数一致则证明时间段内有交互,服务端则不进行连接断开操作
    var int = self.setInterval("clock()",10000);
    function clock() {
        socket.send(
            JSON.stringify({
                // 连接成功将,用户ID传给服务端
                uid: userId
            })
        );
    }

</script>
<form onsubmit="return false">
    <textarea id="responseText" style="height: 150px; width: 300px;"></textarea>
    <br>
    <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>

3.10、WebSocket断开的原因

原因有很多,最好在WebSocket断开时,将错误打印出来。

ws.onclose = function (ev) {
  console.log('websocket 断开: ' + ev.code + ' ' + ev.reason + ' ' + ev.wasClean)
  console.log(ev)
}

错误状态码:

WebSocket断开时,会触发CloseEvent, CloseEvent会在连接关闭时发送给使用 WebSockets 的客户端. 它在 WebSocket 对象的 onclose 事件监听器中使用。CloseEvent的code字段表示了WebSocket断开的原因。可以从该字段中分析断开的原因。

CloseEvent有三个字段需要注意, 通过分析这三个字段,一般就可以找到断开原因

  • CloseEvent.code: code是错误码,是整数类型

  • CloseEvent.reason: reason是断开原因,是字符串

  • CloseEvent.wasClean: wasClean表示是否正常断开,是布尔值。一般异常断开时,该值为false

状态码

名称

描述

0–999

保留段, 未使用.

1000

CLOSE_NORMAL

正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.

1001

CLOSE_GOING_AWAY

终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.

1002

CLOSE_PROTOCOL_ERROR

由于协议错误而中断连接.

1003

CLOSE_UNSUPPORTED

由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据).

1004

保留. 其意义可能会在未来定义.

1005

CLOSE_NO_STATUS

保留. 表示没有收到预期的状态码.

1006

CLOSE_ABNORMAL

保留. 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).

1007

Unsupported Data

由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).

1008

Policy Violation

由于收到不符合约定的数据而断开连接. 这是一个通用状态码, 用于不适合使用 1003 和 1009 状态码的场景.

1009

CLOSE_TOO_LARGE

由于收到过大的数据帧而断开连接.

1010

Missing Extension

客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.

1011

Internal Error

客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.

1012

Service Restart

服务器由于重启而断开连接.

1013

Try Again Later

服务器由于临时原因断开连接, 如服务器过载因此断开一部分客户端连接.

1014

由 WebSocket标准保留以便未来使用.

1015

TLS Handshake

保留. 表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).

1016–1999

由 WebSocket标准保留以便未来使用.

2000–2999

由 WebSocket拓展保留使用.

3000–3999

可以由库或框架使用. 不应由应用使用. 可以在 IANA 注册, 先到先得.

4000–4999

可以由应用使用.


四、WebSocket和Http之长连接和短连接区别

4.1、HTTP1.0、HTTP1.1 和 HTTP2.0 的区别

HTTP是一个应用层协议,无状态的,端口号为80。主要的版本有1.0/1.1/2.0.

(1) HTTP/1.0

一次请求-响应,建立一个连接,用完关闭;

(2) HTTP/1.1

HTTP 1.1支持长连接(PersistentConnection)和请求的流水线(Pipelining)处理

串行化单线程处理,可以同时在同一个tcp链接上发送多个请求,但是只有响应是有顺序的,只 有上一个请求完成后,下一个才能响应。一旦有任务处理超时等,后续任务只能被阻塞(线头阻塞);

(3)HTTP/2

**HTTP2支持多路复用,**所以通过同一个连接实现多个http请求传输变成了可能。请求并行执行,某任务耗时严重,不会影响到任务正常执行。

4.2、什么是websocket?

Websocket是html5提出的一个协议规范,是为解决客户端与服务端实时通信。本质上是一个基于tcp,先通过HTTP/HTTPS协议发起一条特殊的http请求进行握手后创建一个用于交换数据的TCP连接。

WebSocket优势: 浏览器和服务器只需要要做一个握手的动作,在建立连接之后,双方可以在任意时刻相互推送信息。同时,服务器与客户端之间交换的头信息很小。

4.3、什么是Http长连接和短连接

在HTTP/1.0中默认使用短连接。也就是说,客户端和服务器每进行一次HTTP操作,就建立一次连接,任务结束就中断TCP连接。当客户端浏览器访问的某个HTML或其他类型的Web页中包含有其他的Web资源(如JavaScript文件、图像文件、CSS文件等),每遇到这样一个Web资源,浏览器就会重新建立一个HTTP会话。

而从HTTP/1.1起,默认使用长连接,用以保持连接特性。使用长连接的HTTP协议,会在响应头加入这行代码:

Connection:keep-alive

在使用长连接的情况下,当一个网页打开完成后,客户端和服务器之间用于传输HTTP数据的TCP连接不会关闭,客户端再次访问这个服务器时,会继续使用这一条已经建立的连接。Keep-Alive不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Apache)中设定这个时间。实现长连接需要客户端和服务端都支持长连接。

HTTP协议的长连接和短连接,实质上是TCP协议的长连接和短连接。

4.4、http和websocket的长连接区别

HTTP1.1通过使用Connection:keep-alive进行长连接,HTTP 1.1默认进行持久连接。在一次 TCP 连接中可以完成多个 HTTP 请求,但是对每个请求仍然要单独发 header,Keep-Alive不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Apache)中设定这个时间。这种长连接是一种“伪链接”

websocket的长连接,是一个真的全双工。长连接第一次tcp链路建立之后,后续数据可以双方都进行发送,不需要发送请求头。

keep-alive双方并没有建立正真的连接会话,服务端可以在任何一次请求完成后关闭。WebSocket 它本身就规定了是正真的、双工的长连接,两边都必须要维持住连接的状态。

4.5、HTTP2.0和HTTP1.X相比的新特性

  • 新的二进制格式(Binary Format),HTTP1.x的解析是基于文本。基于文本协议的格式解析存在天然缺陷,文本的表现形式有多样性,要做到健壮性考虑的场景必然很多,二进制则不同,只认0和1的组合。基于这种考虑HTTP2.0的协议解析决定采用二进制格式,实现方便且健壮。

  • 多路复用(MultiPlexing),即连接共享,即每一个request都是是用作连接共享机制的。一个request对应一个id,这样一个连接上可以有多个request,每个连接的request可以随机的混杂在一起,接收方可以根据request的 id将request再归属到各自不同的服务端请求里面。

  • header压缩,如上文中所言,对前面提到过HTTP1.x的header带有大量信息,而且每次都要重复发送,HTTP2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小。

  • 服务端推送(server push),同SPDY一样,HTTP2.0也具有server push功能。


参考链接:

WebSocket使用

SpringBoot+WebSocket+Netty实现消息推送

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

WebSocket服务端消息推送 的相关文章

  • 如何使用Lua脚本语言打开Web套接字?

    作为初学者 我想在基于 Linux 的服务器上使用 Lua 打开一个 Web 套接字 该服务器应允许 Android 客户端连接到它 你能给我一些用Lua打开网络套接字的示例代码吗 您两周前已经问过同样的问题并得到了回答 LUA 脚本 We
  • Websocket 在客户端返回 500,在服务器返回 101

    我们尝试使用 nginx ingress 控制器在 Kubernetes 集群上实现 WebSocket 入口 yaml apiVersion extensions v1beta1 kind Ingress metadata annotat
  • Python 的 SignalR 替代方案

    Python 世界中 SignalR 的替代方案是什么 准确地说 我在Windows 8上使用tornado和python 2 7 6 我发现sockjs龙卷风 https github com MrJoes sockjs tornado
  • NodeJS Websocket如何在服务器重新启动时重新连接

    在 Node js 中我使用网络套接字 ws https github com websockets ws用于 WebSocket 连接 以下是客户端的代码 假设我们正在连接的服务器套接字宕机了一分钟 close 事件将会触发 但是每当服务
  • Websocket、Angular 2 和 JSON Web 令牌身份验证

    我的 Angular 2 应用程序 用打字稿编码 有一个简单的身份验证方案 用户登录 服务器返回 JSON Web 令牌 JWT abc123 在每次 API 调用时 应用程序都会将 JWT 发送到Authorization header
  • Mosquitto 1.4.2 Websocket 支持

    我正在尝试利用 Mosquittos 最近的更新来支持代理中的 websocket 我正在运行 Mosquitto v1 4 2 并将以下几行添加到 mosquitto 配置文件 mosquitto conf 中 listener 1000
  • 通过nodejs服务器+socket.io从mp3文件同步流式传输音乐

    我的服务器上有一个 mp3 文件 我希望所有访问该网址的客户都能同步收听该音乐 That is 假设该文件播放 6 分钟 我在上午 10 00 开始播放这首歌 上午 10 03 发出的请求应从歌曲的第 3 分钟开始收听 我所有的客户都应该同
  • Express-Session、Connect-Redis 和 einaros/ws

    我似乎在让 Express express session connect redis 和 websockets ws 很好地协同工作时遇到了一些麻烦 这很可能与我对这些模块和编码的总体理解还有限有关 这里的大部分代码取自存储库中的相应示例
  • Websockets:npm 中的 Rachet 和 autobahn 兼容吗?

    我正在尝试Ratchet PHP 库 http socketo me 特别是 我一直在尝试整合他们的推式整合 http socketo me docs push演示到 React 应用程序中 他们的演示参考看似一次性的 autobahn J
  • GO Websocket 向所有客户端发送消息

    这段代码一切正常 为了更好的阅读而缩短了它 When Client1向服务器发送请求 服务器立即响应他 但是 其他客户端看不到响应消息 所以我想更进一步 当客户端向服务器发送请求时 服务器会响应所有客户端 以便所有客户端都可以看到该消息 我
  • Heroku 上带有 Django Channels 的 Websocket

    我正在尝试将我的应用程序部署到heroku 该应用程序有一个简单的聊天系统 使用 Websockets 和 django 通道 当我使用 python manage py runserver 测试我的应用程序时 应用程序的行为正如预期的那样
  • 与 Socket.io 保持连接

    我正在尝试使用 asterisk websocket 连接socket io 客户端 https github com socketio socket io client socket io connect url transports w
  • NodeJS 如何在没有 WebSocket 的情况下处理持久连接?

    我对 NodeJS 真的很陌生 如果我对某些东西听起来很天真 我很抱歉 并且我一直在深入研究示例的源代码聊天应用 http github com ry node chat 但是 我无法理解一件事 我知道 WebSockets 有助于处理持久
  • webpack-dev-server 中的代理 websockets 连接

    是否可以在 webpack 开发服务器中代理 websocket 连接 我知道如何将常规 HTTP 请求代理到另一个后端 但它不适用于 websockets 大概是因为代理配置中的目标以 http 开头 webpack dev server
  • Dart 将客户端 Socket 升级为 WebSocket

    Since WebSocket https api dartlang org stable 2 1 0 dart io WebSocket class html在 Dart 中不允许直接设置安全上下文 https api dartlang
  • HTTP/2 世界中的 WebSocket 替代方案是什么?

    新的 HTTP 2 协议具有一些有前途的功能 他们中有一些 多路复用 单个 TCP 连接可用于发出多个 HTTP 2 请求并接收多个响应 到单个源 HTTP 2 服务器推送 将服务器响应发送到客户端而不接收请求 即由服务器发起 双向连接 H
  • Flask-SocketIO 未使用 Gevent/Gevent-websocket

    我正在使用 Flask 和 Flask SocketIO 构建用于 websocket 通信的 Web 界面 数据 API 我想开始转向使用 Gevent Gevent websocket Gunicorn 以及最终使用 Nginx 进行负
  • 标签库支持命名空间:http://xmlns.jcp.org/jsf/core,但没有为名称定义标签:websocket

    我正在将 jsf 2 2 升级到 jsf 2 3 使用 Wildfly 11 0 0 Beta 作为服务器 我按照本网站上的说明进行操作 http arjan tijms omnifaces org p jsf 23 html 1396 h
  • iOS 中通过 USB 进行反向端口转发

    我在桌面上有一个 Web 套接字服务器 在 iPhone 设备上有一个客户端 我想使用 USB 而不是任何网络与他们通信 我已经使用 adb reverse 在 android 上实现了它 但无法找到适用于 iOS 的任何解决方案 我尝试使
  • 使用单个“proxyServer”将 Websocket 代理到多个目标

    我正在开发一个nodeJS websocket代理服务器 用例是当 websocket 请求到来时 我将检查其凭据 添加新标头 然后根据其组 来自用户 ID 将 websocket 连接重定向到其目标 webscoket 服务器 我发现大多

随机推荐

  • Zabbix从零到邮箱告警,可用于生产环境

    在监控方面 xff0c Zabbix xff0c 夜莺 xff0c Prometheus xff0c open falcon xff1b 其中 xff0c Zabbix和Prometheus可以称得上监控界的老大哥 xff0c 而Prome
  • 谷歌浏览器打开总是闪退,已解决

    我的谷歌浏览器之前可以打开使用 xff0c 之后突然打开就闪退 xff0c 删了又下载 xff0c 还是闪退 xff0c 用下面的方法 xff0c 已有效解决 1 找到桌面上的谷歌浏览器 xff0c 右键 xff0c 打开文件位置 2 把它
  • 如何使用JavaScript从函数内部获取函数名?

    给定一个函数 xff0c 如何使用JavaScript从函数内部获取函数的名称 xff1f 下面本篇文章就来给大家介绍一下使用JavaScript从函数内部获取函数名的方法 xff0c 希望对大家有所帮助 可以先通过arguments ca
  • Ubuntu安装MySQL的三种方式跟卸载MySQL

    注意 xff1a 我所有操作一开始就执行了 xff1a sudo su 进入特权模式 xff0c 后续命令就不需要在用sudo提权了 一 卸载MySQL 看个人习惯 xff0c 我基本用的是第一种 xff0c 简单粗暴 删除mysql的数据
  • 栏目目录

    栏目目录 1 学web从何开始 xff1f 2 js说古道今 3 闭包的虐心故事 4 关于原型 xff0c 我想说这些 5 JQuery简要 6 用AngularJS开发前端框架 xff08 基础 xff09 7 用AngularJs开发前
  • 百度云离线下载含有违规内容检测方法分析

    最近国家开始一轮净网行动 xff0c 清除网上的淫秽色情信息 各大互联网厂家纷纷开始行动 xff0c 比如当年很好用的百度云离线下载就一度关闭 后来再次开启后 xff0c 就出现了这句经典词 xff0c 因含有违规内容被屏蔽无法下载 其实被
  • 用python打印购物小票和证书

    目录 1 打印购物小票 2 打印证书 在使用python打印东西时一定要注意到缩进 xff0c 在python语言中对缩进很重视 input用来接收数据 input后边小括号可以写接受数据的条件 在定义常量或者变量名时 xff0c 最好找有
  • prime算法

    prime算法 令无向图 G 61 V E G 61 V E
  • 深度学习二

    BT神经元为按照误 差逆向传播算法训练的多层前馈神经网络 BT神经网络分为输入层 隐藏层 输出层 输入层一般有数据种类多个神经元 xff0c 接受数据 隐藏层的神经元为根号下隐藏层 输出层加b个 xff0c 输入的每个数据加权和返回数之和为
  • Python基础

    一 python代码编译 python 是解释型语 在执 的时候 需要解释器 边解释 翻译 边执 从上到下执 下 代码出现的错误 不会影响上 代码的执 二 python中的三种波浪线 红色波浪线 xff1a 是代码中的错误 需要解决 否则会
  • Word处理控件Aspose.Words功能演示:使用 C# 在 Word 文档中创建和修改 VBA 宏

    Aspose Words 是一种高级Word文档处理API xff0c 用于执行各种文档管理和操作任务 API支持生成 xff0c 修改 xff0c 转换 xff0c 呈现和打印文档 xff0c 而无需在跨平台应用程序中直接使用Micros
  • 建硬盘分区,pvcreate 报:“Can topen /dev/sdb1 exclusivel...?“;磁盘分区报:设备或资源;RHEL本地存储项目二mkfs.vfat报错

    建立硬盘分区 xff0c pvcreate 时报错 xff1a 34 Can topen dev sdb1 exclusively Mounted filesystem 34 dmsetup remove all 清空所有陈旧条目后成功 x
  • 搜索文件内容的几种方式

    搜索文件的几种方式 xff1a 一 提取文件 xff0c 插入数据库text xff0c 使用like 查询 使用poi或PageOffice提取文件内容文字 缺点 xff1a 只适合数据量不大的情况 二 提取文件 xff0c 插入数据库t
  • Centos 7虚拟机ifconfig ens或ip addr时,ens33不显示inet地址

    systemctl stop NetworkManager systemctl disable NetworkManager
  • nodeinternalmodulescjsloader936 throw err; 求解决

    D ethereumDkfuwq gt node app js node internal modules cjs loader 936 throw err Error Cannot find module safe buffer Requ
  • Tomcat 下载安装教程

    文章目录 参考资料1 下载2 安装3 卸载4 启动5 关闭6 配置7 部署8 IDEA使用Tomcat 8 1 集成本地Tomcat8 2 Tomcat Maven插件 参考资料 视频 使用Tomcat的前提是你已经熟练Java xff0c
  • JS说古道今

    JS说古道今 本文概要 讲述js的来源及重要的语法特性 xff0c 包括数据类型 DOM 作用域等 xff08 由于专业性比较强就不写诗扯淡了 61 61 xff0c 我尽量写的有趣点吧 JS JSP xff1f JavaScript xf
  • 删除集合当中的空元素(Collections.singleton(null)与stream())

    Arrays asList创建的数据为定长集合 xff0c 集合长度在操作时是不可以改变的 xff0c 不能对集合进行增删操作 Collections singleton null 相关 span class token comment 反
  • RabbitMQ配置更改TCP默认端口5672

    前言 公司新项目需集成RabbitMQ xff0c 但服务器环境已经安装了ActiveMQ 今天同事不说还不知道 xff0c 导致安装后俩MQ打架 端口冲突 而发生的一系列问题 没办法 xff0c 后来居上的就很被动 xff0c 于是就得改
  • WebSocket服务端消息推送

    前言 xff1a 移动互联网蓬勃发展的今天 xff0c 大部分手机 APP和网站都提供了消息推送功能 xff0c 如新闻客户端的热点新闻推荐 xff0c IM 工具的聊天消息提醒 xff0c 电商产品促销信息 xff0c 企业应用的通知和审