4. 消息中心的设计与实现

2023-11-04

消息中心的设计与实现

一、引言

运用场景:

1、消息的主动提醒(客户端被动接收)
2、客户模块(及时通讯)
3、单一登录(一个账号只能在一个设备登录)

消息中心的实现方案:

1、客户端轮询

2、TCP长连接(常用)

Java的长连接的方案

Java - Blocking I/O - JDK1.0 (同步阻塞式IO)
Java - Non Blocking I/O - JDK1.4(同步非阻塞式IO)
第三方的组织 - Mina、Netty(NIO)
Java - Async I/O -JDK1.7(异步非阻塞式IO)

二、BIO的实现点对点发送消息

服务器

public class Server {

    public static void main(String[] args) throws IOException {

        //创建Socket服务对象
        ServerSocket serverSocket = new ServerSocket(8888);
        //服务端要接收客户端的连接 - 阻塞式的
        final Socket socket = serverSocket.accept();//一旦有客户端连接,该方法就会返回连接该客户端的Socket对象,如果没有客户端连接,就会一直阻塞
        System.out.println("有一个客户端连接!");

        Scanner scanner = new Scanner(System.in);

        //开了一个子线程监听客户端的发送消息
        new Thread(){
            @Override
            public void run() {
                while(true) {
                    //获得客户端的请求
                    try {
                        InputStream in = socket.getInputStream();
                        byte[] bytes = new byte[10 * 1024];
                        int len = 0;//read方法是一个阻塞式的方法,如果没有客户端的消息,线程会阻塞在该方法上
                        len = in.read(bytes);
                        System.out.println("获取到客户端的请求数据:" + new String(bytes, 0, len));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();


        while (true) {

            //返回客户端的响应
            System.out.println("请输入发送的内容:");
            String content = scanner.next();
            OutputStream out = socket.getOutputStream();
            out.write(content.getBytes());
        }

    }
}

客户端

public class Client {

    public static void main(String[] args) throws IOException {
        //创建客户端的socket对象,并且连接服务器
        final Socket socket = new Socket("127.0.0.1", 8888);

        Scanner scanner = new Scanner(System.in);

        //开启子线程接收服务器的响应
        new Thread(){
            @Override
            public void run() {
                while(true) {
                    //接收响应
                    InputStream in = null;
                    try {
                        in = socket.getInputStream();
                        byte[] bytes = new byte[10 * 1024];
                        int len = in.read(bytes);
                        System.out.println("接收到服务器的响应:" + new String(bytes, 0, len));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        while (true) {
            //发消息
            System.out.println("请输入发送的内容:");
            String content = scanner.next();
            OutputStream out = socket.getOutputStream();
            out.write(content.getBytes());
            out.flush();
        }
    }
}

群发服务器

public class Server {

    public static List<Socket> socketList = new ArrayList<Socket>();

    public static void main(String[] args) throws IOException {

        //创建Socket服务对象
        ServerSocket serverSocket = new ServerSocket(8888);
        //服务端要接收客户端的连接 - 阻塞式的
        //死循环反复监听客户端的连接
        while (true) {
            final Socket socket = serverSocket.accept();//一旦有客户端连接,该方法就会返回连接该客户端的Socket对象,如果没有客户端连接,就会一直阻塞
            //保存当前的连接对象
            socketList.add(socket);
            System.out.println("有一个客户端连接!");

            //开了一个子线程监听客户端的发送消息
            new Thread() {
                @Override
                public void run() {
                    //获得客户端的请求
                    try {
                        while (true) {
                            InputStream in = socket.getInputStream();
                            byte[] bytes = new byte[10 * 1024];
                            int len = 0;//read方法是一个阻塞式的方法,如果没有客户端的消息,线程会阻塞在该方法上
                            len = in.read(bytes);
                            String content = new String(bytes, 0, len);
                            System.out.println("获取到客户端的请求数据:" + content + ", 并且将数据群发给其他的客户端!");

                            //群发给其他的客户端
                            for (Socket sock : socketList) {
                                if (sock != socket) {
                                    //不发送给自己
                                    sock.getOutputStream().write(content.getBytes());
                                }
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
    }
}

群发客户端

public class Client {

    public static void main(String[] args) throws IOException {
        //创建客户端的socket对象,并且连接服务器
        final Socket socket = new Socket("127.0.0.1", 8888);

        Scanner scanner = new Scanner(System.in);

        //开启子线程接收服务器的响应
        new Thread(){
            @Override
            public void run() {
                try {
                    while(true) {
                        //接收响应
                        InputStream in = socket.getInputStream();
                        byte[] bytes = new byte[10 * 1024];
                        int len = in.read(bytes);
                        System.out.println("接收到服务器的响应:" + new String(bytes, 0, len));

                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        while (true) {
            //发消息
            System.out.println("请输入发送的内容:");
            String content = scanner.next();
            OutputStream out = socket.getOutputStream();
            out.write(content.getBytes());
            out.flush();
        }
    }
}

三、NIO的介绍与使用

BIO:
ServerSocket - 服务端
Socket - 连接对象
byte[] - 传递的数据类型

NIO:
ServerSocketChannel -
SocketChannel -
ByteBuffer - 本质还是byte数组
Selector - 多路复用器

多路复用器 (非阻塞)

在这里插入图片描述
在这里插入图片描述

ByteBuffer - NIO数据传递的对象,本质还是Byte数组

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qRPPExJe-1629036777488)(img/image-20200724113948220.png)]

使用NIO编写一个本地的文件拷贝

/**
 * 文件拷贝的demo - NIO
 */
public class Demo {

    public static void main(String[] args)  {

        try (
            FileChannel inChannel = new FileInputStream("C:\\Users\\Ken\\Pictures\\Saved Pictures\\奥格瑞玛.jpg").getChannel();
            FileChannel outChannel = new FileOutputStream("C:\\Users\\Ken\\Desktop\\a.jpg").getChannel();
        ) {
            //准备ByteBuffer
            ByteBuffer byteBuffer = ByteBuffer.allocate(10 * 1024);//位置为0  容量最大 界限指到容量
            //循环写入数据到byteBuffer
            while (inChannel.read(byteBuffer) != -1) {
                //byteBuffer  位置指向写入的数据的末端 容量最大 界限指到容量
                byteBuffer.flip();//将界限移动到位置的地方,位置重置为0 -- 为读取数据做准备
                //读取byteBuffer的数据,写入输出管道
                outChannel.write(byteBuffer);
                //重置byteBuffer
                byteBuffer.clear(); // 将界限移动到容量的位置,位置重置为0 -- 为写入数据做准备
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

使用NIO实现一个消息群发的功能

服务器

public class NioServer {

    public static void main(String[] args) throws IOException {

        List<SocketChannel> socketChannels = new ArrayList<>();

        //创建一个服务端的Channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //绑定端口
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //设置Channel的类型
        serverSocketChannel.configureBlocking(false);//设置当前Channel为非阻塞模式

        //创建一个多路复用器
        Selector selector = Selector.open();
        //将Channel注册到多路复用器上
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//注册到多路复用器,注册的动作为(等待客户端连接)

        //主线程轮询多路复用器
        while(true){

            //询问多路复用器是否有准备好的动作
            int select = selector.select();
            if(select > 0){
                //有客户端进行了某些操作
                //SelectionKey - 封装(channel, 动作)
                Set<SelectionKey> selectionKeys = selector.selectedKeys();//从多路复用器中,返回有动作的Channel集合
                //循环处理channel
                for (SelectionKey selectionKey : new HashSet<>(selectionKeys)) {
                    //从集合中移除该channel
                    selectionKeys.remove(selectionKey);

                    //判断当前发生了什么动作
                    if (selectionKey.isAcceptable()){
                        //说明有一个新的客户端连接了!
                        ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
                        //等待客户端连接
                        SocketChannel socketChannel = serverChannel.accept();
                        System.out.println("接收到客户端的连接!!!!");
                        //将客户端的Channel注册到多路复用器上
                        socketChannel.configureBlocking(false);//非阻塞模式
                        socketChannel.register(selector, SelectionKey.OP_READ);//注册多路复用器

                        //保存管理客户端的Channel集合
                        socketChannels.add(socketChannel);
                    } else if(selectionKey.isReadable()){
                        //说明有一个客户端发送了消息
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        //读取channel中的数据
                        ByteBuffer byteBuffer = ByteBuffer.allocate(10 * 1024);
                        socketChannel.read(byteBuffer);

                        //群发
                        for (SocketChannel otherChannel : socketChannels) {
                            if (otherChannel != socketChannel) {//排除本人
                                byteBuffer.flip();//bytebuffer读取准备
                                otherChannel.write(byteBuffer);
                            }
                        }
                        byteBuffer.flip();
                        byte[] bytes = byteBuffer.array();
                        System.out.println("接收到客户端的数据:" + new String(bytes) + ", 群发给其他的客户端");
                    }
                }
            }
        }
    }
}

客户端

public class NioClient {

    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        socketChannel.configureBlocking(true);//设置为阻塞模式

        //开启子线程处理服务器的读请求
        new Thread(() -> {
            try {
                while (true) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(10 * 1024);
                    socketChannel.read(byteBuffer);
                    //打印数据
                    byteBuffer.flip();
                    byte[] array = byteBuffer.array();
                    System.out.println("读取到服务器的消息:" + new String(array));
                }
            } catch (Exception e){
                System.out.println("服务器读取异常!");
            }
        }).start();

        //主线程本身 - 死循环 写入
        Scanner scanner = new Scanner(System.in);
        while(true){
            System.out.println("请输入需要群发的消息:");
            String content = scanner.next();

            //String -> byte[] -> ByteBuffer
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 10);
            byteBuffer.put(content.getBytes(), 0, content.getBytes().length);
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
        }
    }
}

四、Netty的基本使用

1、添加依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.49.Final</version>
</dependency>

Netty线程模型

* Netty的线程模型
* 1、单线程模型 - 1个线程处理所有客户端的连接和数据读取
* 2、多线程模型 - 1个线程处理客户端的连接、线程池处理客户端数据的读写
* 3、主从线程池模型 - 主线程池处理客户端的连接、从线程池处理客户端数据的读写

4.1 Netty的ChannelHandler

入站消息处理器:ChannelInboundHandlerAdapter(SimpleChannelInboundHandler)
出站消息处理器:ChannelOutboundHandlerAdapter

处理器链:

在这里插入图片描述

编解码器:

什么是编解码器?

编解码器就是netty用来对消息格式转换的工具类

本质上编解码器就是入站出站消息的处理器。解码器 -> 入站消息,编码器 -> 出站消息

常用的编码解码器

StringDecoder - String的解码器

StringEncoder - String的编码器

LineBasedFrameDecoder - 按照行进行拆包的解码器

TCP的拆包和粘包

什么是拆包?-在一个TCP请求中,一个完整的消息,可能被拆分成多个消息处理
什么是粘包?-在一个TCP请求中,多个消息可能被粘在一起,成为一个消息处理

4.2 Netty对Http协议的支持

什么是HTTP协议?

本质上就是一个按照固定规则编写的字符串(请求行、请求头、请求体,响应码,响应头,响应体)

Netty处理Http请求的编解码器

Http服务端使用:
HttpRequestDecoder - Http请求的解码器
HttpResponseEncoder - Http响应的编码器
HttpServerCodec - 上面两个编解码器的集合体

Http客户端使用:
HttpRequestEncoder - Http请求的编码器
HttpResponseDocoder - Http响应的解码器
HttpClientCodec - 上面两个编解码器的集合体

使用了HttpServerCodec 之后,消息就会被拆解成为:

HttpRequest(请求行、请求头)、HttpContent、LastHttpContent(请求体)

Netty编写Http文件服务器

package com.qf.httpfileserver;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * 基于Netty实现的一个HTTP文件服务器
 */
public class HttpFileServer {

    public static void main(String[] args) {
        EventLoopGroup master = new NioEventLoopGroup();
        EventLoopGroup slave = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap()
                .group(master, slave)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {

                        //自定义处理器
                        ChannelPipeline pipeline = ch.pipeline();

                        //添加分片文件处理的编码器
                        pipeline.addLast(new ChunkedWriteHandler());

                        //Http服务器使用
                        //一旦使用了Http的编解码器,后续的消息就会变成3个对象,HttpRequest,HttpContent(N个),LastHttpContent
                        pipeline.addLast(new HttpServerCodec());
                        //这是一个Http请求的聚合器,该聚合器会将所有的HttpRequest、HttpContent、LastHttpContent聚合成为一个FullHttpRequest对象
                        pipeline.addLast(new HttpObjectAggregator(10 * 1024 * 1024));
                        pipeline.addLast(new MyFileChannelHandler());
                    }
                });

        //绑定端口
        try {
            serverBootstrap.bind(8080).sync();
            System.out.println("端口绑定成功,服务已经启动!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package com.qf.httpfileserver;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;

import java.io.File;
import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.Arrays;

/**
 * Http请求处理器
 */
public class MyFileChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    //本地开发的路径url
    private String path = "C:\\worker\\Linux";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {

        //获得请求类型
        HttpMethod method = request.method();

        //只接受Get请求
        if (!method.name().equalsIgnoreCase("GET")) {
            //非GET请求
            setError(ctx, "请求类型异常!");
            return;
        }

        //获得请求的URL
        String uri = request.uri();
        //解码
        uri = URLDecoder.decode(uri, "UTF-8");
        System.out.println("请求的url:" + uri);
        //判断请求的uri是否存在
        File file = new File(path, uri);
        if (!file.exists()){
            //请求的路径不存在
            setError(ctx, "请求的路径不存在,请不要乱来!");
            return;
        }

        //判断请求的文件类型
        if(file.isFile()){
            //请求的是文件,进行下载
            fileHandler(ctx, file);
        } else {
            //请求的是文件夹,返回该文件夹下的文件列表
            dirHandler(ctx, file, uri);
        }
    }

    /**
     * 文件的处理方式
     * @param ctx
     * @param file
     */
    private void fileHandler(ChannelHandlerContext ctx, File file){
        //下载该文件 - 读取该文件

        //编写响应行 响应头
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        //响应体的内容
        if (file.getName().endsWith(".jpg") || file.getName().endsWith(".png")) {
            response.headers().add("Content-Type", "image/jpeg");
        } else {
            response.headers().add("Content-Type", "application/octet-stream");
        }
        //响应体的大小
        response.headers().add("Content-Length", file.length());
        //返回响应头和响应码给客户端
        ctx.writeAndFlush(response);

        //下载的文件(响应体)分块传递给客户端
        try {
            ChunkedFile cFile = new ChunkedFile(file, 1024 * 10);
            ChannelFuture channelFuture = ctx.writeAndFlush(cFile);
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()){
                        System.out.println("文件下载完成!");
                        ctx.close();
                    }
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 路径的处理方式
     */
    private void dirHandler(ChannelHandlerContext ctx, File file, String uri){
        StringBuilder sb = new StringBuilder();
        sb.append("<ul>");
        
        //获取该路径下的所有子文件
        File[] files = file.listFiles();
        Arrays.stream(files).forEach(f -> {
            sb.append("<li><a href='").append(uri.equals("/") ? "" : uri).append("/").append(f.getName()).append("'>")
                    .append("(").append(f.isFile() ? "文件" : "文件夹").append(")")
                    .append(f.getName()).append("</a></li>");
        });

        sb.append("</ul>");

        //响应给客户端
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().add("Content-Type", "text/html;charset=utf-8");
        response.content().writeBytes(sb.toString().getBytes(Charset.forName("UTF-8")));
        //返回响应
        ctx.writeAndFlush(response);
        //关闭连接
        ctx.close();
    }

    /**
     * 返回错误页面
     */
    private void setError(ChannelHandlerContext ctx, String errorMsg) {
        //响应给客户端
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
        response.headers().add("Content-Type", "text/html;charset=utf-8");
        response.content().writeBytes(errorMsg.getBytes(Charset.forName("UTF-8")));
        //返回响应
        ctx.writeAndFlush(response);
        //关闭连接
        ctx.close();
    }
}

4.3 Netty对WebSocket协议的支持

什么是WebSocket协议?

WebSocket是一种基于TCP协议的长连接协议,简单来说,就是客户端和服务器可以随意发送消息。

优势:可以直接在浏览器和服务器之间构建长连接(也支持程序和程序之间构建长连接)

WebSocket协议的构建构成

在这里插入图片描述

WebSocket的数据帧

数据帧:
文本帧
二进制帧
文本+二进制帧

状态帧:
ping帧
pong帧
close帧

Netty搭建一个WebSocket服务器

服务端:

package com.qf.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class WebSocketServer {

    public static void main(String[] args) {
        EventLoopGroup master = new NioEventLoopGroup();
        EventLoopGroup slave = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap()
                .group(master, slave)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {

                        //自定义处理器
                        ChannelPipeline pipeline = ch.pipeline();

                        //Http服务器使用
                        //一旦使用了Http的编解码器,后续的消息就会变成3个对象,HttpRequest,HttpContent(N个),LastHttpContent
                        pipeline.addLast(new HttpServerCodec());
                        //这是一个Http请求的聚合器,该聚合器会将所有的HttpRequest、HttpContent、LastHttpContent聚合成为一个FullHttpRequest对象
                        pipeline.addLast(new HttpObjectAggregator(10 * 1024 * 1024));
                        //WebSocket升级握手的编解码器
                        //该编解码器的作用
                        //1、进行websocket的握手升级
                        //2、自动处理客户端发送的所有状态帧
                        pipeline.addLast(new WebSocketServerProtocolHandler("/"));
                        //后续的消息,都是数据帧
                        pipeline.addLast(new MyWebSocketHandler());
                    }
                });

        //绑定端口
        try {
            serverBootstrap.bind(8080).sync();
            System.out.println("端口绑定成功,服务已经启动!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

客户端(浏览器):

var ws;

//连接服务器
function conn(){
	if(window.WebSocket){
		
		//浏览器支持WebSocket协议
		ws = new WebSocket("ws://127.0.0.1:8080/");
		//设置websocket的回调方法
		ws.onopen = function (){
			//websocket连接成功后触发
			console.log("服务器连接成功!");
			$("#chatlist").append("<font color='green'>服务器连接成功!</font><br/>");
		}
		
		ws.onerror = function(){
			//连接异常时触发
			console.log("服务器连接异常!!");
			$("#chatlist").append("<font color='red'>服务器连接异常!</font><br/>");
		}
		
		ws.onclose = function(){
			//连接关闭时触发
			console.log("服务器连接已经关闭!!");
			$("#chatlist").append("<font color='red'>服务器连接关闭!</font><br/>");
		}
		
		ws.onmessage = function(data){
			//服务器给客户端发消息时触发
			console.log("接收到服务器的消息:" + data.data);
			$("#chatlist").append("<font color='blueviolet'>服务器:" + data.data + "</font><br/>");
		}
		
	} else {
		alert("骚瑞,浏览器不支持WebSocket协议,请换个电脑!");
	}
}

/**
 * 发送消息给服务器
 */
function sendMsg(){
	var text = $("#sendInput").val();
	$("#sendInput").val("");
	
	//将内容写入div
	$("#chatlist").append("<font color='blue'>我:" + text + "</font><br/>");
	
	ws.send(text);
}

客户端的重连与心跳机制:

重连的实现:

//重连方法
function reconn(){
	
	console.log("重新连接服务器!!!");
	setTimeout(function(){
		
		//重连
		conn();
		
	}, 10000);
}

......
ws.onclose = function(){
	//连接关闭时触发
	console.log("服务器连接已经关闭!!");
				
	//开始重连
	reconn();
}

心跳的实现:

1、什么时候发送心跳?
2、如何循环发送心跳?
3、如果收不到心跳回复,如何关闭连接进行重连?
4、在线要发送心跳,离线就停止发送心跳

//定时关闭方法
var closeTimeout;
function closeConn(){
	
	closeTimeout = setTimeout(function(){
		
		//关闭服务器的连接
		ws.close();
		
	}, 10000);
}


//心跳的实现
var heartTimeout;
function heart(){
	
	//循环发送心跳
	heartTimeout = setTimeout(function(){
		
		console.log("发送心跳信息.....");
		ws.send("heart");
		
		heart();
	}, 5000);
	
}


...
//设置websocket的回调方法
ws.onopen = function (){
	//websocket连接成功后触发
	console.log("服务器连接成功!");
	
			
	//开始发送心跳
	heart();
	//定时关闭连接
	closeConn();
}

...
ws.onclose = function(){
	//连接关闭时触发
	console.log("服务器连接已经关闭!!");

			
	//停止心跳发送
	if(heartTimeout){
		clearTimeout(heartTimeout);
	}		
}

4.4 Netty的集群构建

Netty集群的搭建:

在这里插入图片描述

Netty服务器Channel管理的方案:
在这里插入图片描述

Netty集群发行消息的方案:

在这里插入图片描述

五、消息中心的设计方案与实现

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

4. 消息中心的设计与实现 的相关文章

  • Windows安装Neo4j

    图数据库概述 图数据库是基于图论实现的一种NoSQL数据库 其数据存储结构和数据查询方式都是以图论 它以图为研究对象图论中的图是由若干给定的点及连接两点的线所构成的图形 为基础的 图数据库主要用于存储更多的连接数据 Neo4j说明 Neo4
  • Weblogic XMLDecoder 反序列化漏洞原理与漏洞复现(基于vulhub,保姆级的详细教程)

    漏洞原理 本文介绍了Weblogic XMLDecoder反序列化相关的漏洞原理 并以CVE 2017 10271为例在vulhub上进行了复现 有关Weblogic XMLDecoder反序列化漏洞包括CVE 2017 3506 CVE
  • Activiti7与Spring、Spring Boot整合开发

    Activiti整合Spring 一 Activiti与Spring整合开发 1 1 Activiti与Spring整合的配置 1 在pom xml文件引入坐标 如下
  • Node利用connect中间件 及bodyParser处理文件上传

    1 html
  • 【Node.js】中间件

    目录 什么是中间件 定义中间件函数 最简单的中间件函数 全局生效的中间件 定义多个全局中间件 局部生效的中间件 中间件的分类 应用级别的中间件 路由级别的中间件
  • nodejs egg框架统一错误信息返回封装

    use strict app middleware error handler js module exports gt return async function errorHandler ctx next try await next
  • Nacos启动出现Error creating bean with name ‘memoryMonitor‘ 、‘externalDumpService‘

    目录 问题 解决方法 这里是CS大白话专场 让枯燥的学习变得有趣 没有对象不要怕 我们new一个出来 每天对ta说不尽情话 好记性不如烂键盘 自己总结不如收藏别人 问题 用KubeSphere创建Nacos时出现Error creating
  • Redis底层数据结构.md

    1 Redis 概述 Redis 数据库里面的每个键值对 key value 都是由对象 object 组成的 数据库键总是一个字符串对象 string object 数据库的值则可以是字符串对象 列表对象 list 哈希对象 hash 集
  • ElasticSearch安装在Windows上详细教程

    ElasticSearchWindows安装教程 Download Elasticsearch Elastic 解压ElasticSearch 打开elasticsearch 6 4 2 bin调用黑窗口 输入elasticsearch b
  • Redis基础知识(三):缓存穿透、缓存击穿、缓存雪崩

    文章目录 一 缓存穿透 出现过程 解决方法 二 缓存击穿 出现过程 解决方法 三 缓存雪崩 出现过程 解决方法 我们在项目中大量使用Redis承接海量数据的冲击 但是使用过程中也会遇到一些特殊的情况 这个就是缓存击穿 缓存穿透 缓存雪崩 一
  • 你遇到过的测试难题(6)记一次xxl-job的故障失败没有重试机制

    你遇到过的测试难题 6 记一次xxl job的故障失败没有重试机制 你遇到过的测试难题 6 记一次xxl job的故障失败没有重试机制 业务背景 线上故障表现 故障结论 测试过程 总结 你遇到过的测试难题 6 记一次xxl job的故障失败
  • thinkphp6 入门(6)--中间件是什么 怎么用

    一 什么是中间件 当客户端发送请求至服务器时 HTTP请求会经过多个中间件 最后返回响应给客户端 中间件可以 在请求到达目标控制器或动作之前对请求进行操作 可以在响应离开目标控制器或动作之前对响应进行操作 二 中间件的作用 我们可以在不修改
  • docker安装rocketmq4.6.1(精简版)

    一 创建文件 mkdir p usr local rocketmq server logs usr local rocketmq server store usr local rocketmq broker logs usr local r
  • 分布式 dataX 详细 (落地) 设计

    1 背景 分布式 DataX 基于 datax 打造的语义分分布式 ETL 平台 Datax 提供 reader framework writer 框架 方便开发两种异构数据源数据同步 但开源的 datax 缺少分布式特性 本文介绍基于 e
  • ESB开发WebService接口

    1 概述 在进行系统间集成时经常利用WebService 但是从建立WebService和调用的重复性和维护性的工作量都相当大 首先简单介绍一下 ESB全称为Enterprise Service Bus 即企业服务总线 它是传统中间件技术与
  • 带你使用Golang快速构建出命令行应用程序

    在日常开发中 大家对命令行工具 CLI 想必特别熟悉了 如果说你不知道命令工具 那你可能是个假开发 每天都会使用大量的命令行工具 例如最常用的Git Go Docker等 不管是做技术开发还是业务开发 都会有开发命令行程序的场景 例如如果是
  • 服务攻防-中间件安全&CVE复现&IIS&Apache&Tomcat&Nginx漏洞复现

    目录 一 导图 二 ISS漏洞 中间件介绍 gt 1 短文件 2 文件解析 3 HTTP SYS 4 cve 2017 7269 三 Nignx漏洞 中间件介绍 gt 1 后缀解析漏洞 2 cve 2013 4547 3 cve 2021
  • 中国中间件第一人---袁红岗

    最早开发Windows上的企业应用软件 打造独立知识产权的EJB服务器Apusus 很多JAVA程序员对袁红岗极其佩服 源于他做了很多人不敢想更不敢做的事情 这就是他打造了国产的EJB服务器 很快 金蝶将在国内推出自主产权EJB服务器的3
  • JAVA WEB 中间件为SERVLET(四)

    写一个用户登录部署到tomcat 本地 先找到一个模板 HTML代码复制到本地的项目index jsp中 这个登录模板包含一个JSP 一个JS 三个CSS等文件 这个是index jsp代码
  • 用一个简单的例子教你如何 自定义ASP.NET Core 中间件(二)

    上一章已经说过了 中间件是一种装配到应用管道以处理请求和响应的软件 每个组件 选择是否将请求传递到管道中的下一个组件 可在管道中的下一个组件前后执行工作 请求委托用于生成请求管道 请求委托处理每个 HTTP 请求 一句话总结 中间件是比筛选

随机推荐

  • 在kali环境下安装dvwa

    1 下载dvwa 登陆进去kali 输入 git clone https github com ethicalhack3r DVWA 2 安装DVWA 把下载好得DVWA cp到 var www html 下 给dvwa一个权限 查看一下所
  • 技术革命与金融资本:泡沫与黄金时代的动力学 (美)卡萝塔·佩雷丝着

    这本书总结了科技革命对社会的影响 分析科技革命后什么时间什么条件下对社会造成影响 很不错的一本书 必须推荐啊 2013 9 29
  • harbor 离线同步

    场景 客户环境为离线环境 只能把公司私服的镜像下载后 然后通过客户端中转传到客户的私服上 客户端在客户内网中 然后拨号到公司的内网 bin bash harbor的url URL https 10 27 6 59 31009 源harbor
  • 文件锁

    借助 fcntl函数来实现文件锁机制 操作文件的进程没有获得文件锁时 可以打开文件 但无法执行read write操作 注意 文件锁只能用于进程间同步 fcntl函数 int fcntl int fd int cmd int fcntl i
  • 2022年江西省“网络空间安全”赛项模块B--流量分析(中职组)

    2022年中职组山西省 网络空间安全 赛项 B 6 流量分析任务书及解析 不懂私信博主 一 竞赛时间 420分钟 共计7小时 吃饭一小时 二 竞赛阶段 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 第 阶段 单兵模式系统渗透测试 任务一 系
  • 【华为OD机试真题 Python】最差产品奖

    前言 本专栏将持续更新互联网大厂机试真题 并进行详细的分析与解答 包含完整的代码实现 希望可以帮助到正在努力的你 关于大厂机试流程 面经 面试指导等 如有任何疑问 欢迎联系我 wechat steven moda email nansun0
  • STM32外设天造地设的一对:ADC和DMA

    STM32外设天造地设的一对 ADC和DMA 引言 这篇文章主要介绍ADC和DMA配置的注意事项 适合懂得如何配置最基本的ADC和DMA 但是对它们两个的模式不是太理解的朋友们看 本文将重点介绍ADC和DMA模式的注意事项 DMA是CPU的
  • Spring Boot 报错org.springframework.jdbc.datasource.embedded.EmbeddedData

    记录一次Spring Boot错误解决方案 如果Spring Boot 在整合druid的时候 如果启动报错 并报以下错误的时候 org springframework beans factory UnsatisfiedDependency
  • 常用决策树模型ID3、C4.5、CART算法

    决策树概述 决策树 decision tree 是一种基本的分类与回归方法 下面提到的ID3 C4 5 CART主要讨论分类的决策树 在分类问题中 表示基于特征对实例进行分类的过程 可以认为是if then的集合 也可以认为是定义在特征空间
  • 计算机图形学期刊和会议

    目录 中国计算机学会推荐国际学术期刊 计算机图形学与多媒体 一 A类 二 B类 三 C类 中国计算机学会推荐国际学术会议 计算机图形学与多媒体 一 A类 二 B类 三 C类 中国计算机学会推荐国际学术期刊 计算机图形学与多媒体 一 A类 序
  • 光纤收发器怎么连?光纤收发器连接图解!

    光纤收发器可以实现光信号和电信号的转换 通过光纤进行信号的转换 最后和一些设备连接 让你的传输通信畅通无阻 当我们远距离传输时 通常会使用光纤来传输 因为光纤的传输距离很远 一般来说单模光纤的传输距离在10千米以上 而多模光纤的传输距离最高
  • C++报错提示某类名不是类或命名空间名称

    在学习C 的友元时 遇到一个问题 两个类互相调用时报错 如图 include
  • 三菱数控CNC系统G代码M代码大全

    G00 快速定位 G01 直线补间切削 G02 圆弧补间切削CW 顺时针 G03 圆弧补间切削CCW 逆时针 G02 3 指数函数补间 正转 G03 3 指数函数补间 逆转 G04 暂停 G05 高速高精度制御 1 G05 1 高速高精度制
  • C++ //STL 简介

    STL简介 STL Standard Template Library 即标准模板库 是一个具有工业强度的 高效的C 程序库 STL的一个重要特点是数据结构和算法的分离 STL另一个重要特性是它不是面向对象的 STL六大组件 容器 Cont
  • 非线性解方程组c语言_08 -- 非线性有限元分析方法

    到目前为止 讨论的都是线性问题 这里采用了两个基本假设 1 材料的应力 应变关系是线性的 D 2 结构的应变 位移关系是线性的 B 非线性有限元分析方法大同小异 以材料非线性为例 1 当材料的应力 应变关系是非线性时 刚度矩阵不是常数 与位
  • 代码管理工具SVN

    svn 什么是svn SVN是Subversion的简称 是一个开放源代码的版本控制系统 相较于RCS CVS 它采用了分支管理系统 它的设计目标就是取代CVS 互联网上很多版本控制服务已从CVS迁移到Subversion 说得简单一点SV
  • 力扣-->#剑指Offer 563 . 二叉树倾斜

    这道题要理解有一定的困难 首先看到这样的题目肯定想到的就是遍历 其次 需要有一个值来记录倾斜度 即 left right 再者 需要一个函数来帮忙辅助计算倾斜度 即findSum class Solution int findSum Tre
  • RIP、OSPF等路由协议严格意义上讲属哪一层?

    1 RIP基于UDP BGP基于TCP OSPF和EIGRP基于IP 这些在TCP IP协议栈中定义的路由协议用于发现和维护前往目的地的最短路径 你可以认为它们不属于网络层协议 注意 是用 based on 而不是实现了 BGP用TCP 所
  • 微信公众号h5页面实现授权,前端部分

    授权步骤 微信开发工具上配置公众号的apiId 微信环境内调起微信的授权功能 代码实现 此段代码放在app vue的监听函数中 每当路由发生变化 都会判断此页面是否已授权 没授权的话则进行授权 watch route handler fun
  • 4. 消息中心的设计与实现

    消息中心的设计与实现 一 引言 运用场景 1 消息的主动提醒 客户端被动接收 2 客户模块 及时通讯 3 单一登录 一个账号只能在一个设备登录 消息中心的实现方案 1 客户端轮询 2 TCP长连接 常用 Java的长连接的方案 Java B