Spring Cloud Nacos源码讲解(五)- Nacos服务端健康检查

2023-10-31

Nacos服务端健康检查

长连接

概念:长连接,指在一个连接上可以连续发送多个数据包,在连接保持期间,如果没有数据包发送,需要双方发链路检测包

注册中心客户端2.0之后使用gRPC代替http,会与服务端建立长连接,但仍然保留了对旧http客户端的支持。

NamingClientProxy接口负责底层通讯,调用服务端接口。有三个实现类:

  • NamingClientProxyDelegate:代理类,对所有NacosNamingService中的方法进行代理,根据实际情况选择http或gRPC协议请求服务端。
  • NamingGrpcClientProxy:底层通讯基于gRPC长连接。
  • NamingHttpClientProxy:底层通讯基于http短连接。使用的都是老代码基本没改,原来1.0NamingProxy重命名过来的。

以客户端服务注册为例,NamingClientProxyDelegate代理了registerService方法。

// NacosNamingService.java
private NamingClientProxy clientProxy; // NamingClientProxyDelegate
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    clientProxy.registerService(serviceName, groupName, instance);
}

NamingClientProxyDelegate会根据instance实例是否是临时节点而选择不同的协议

​ 临时instance:gRPC

​ 持久instance:http

public class NamingClientProxyDelegate implements NamingClientProxy {
   private final NamingHttpClientProxy httpClientProxy;
   private final NamingGrpcClientProxy grpcClientProxy;
   @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
      getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }
  // 临时节点,走grpc长连接;持久节点,走http短连接
  private NamingClientProxy getExecuteClientProxy(Instance instance) {
      return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
  }
}

健康检查

​ 在之前的1.x版本中临时实例走Distro协议内存存储,客户端向注册中心发送心跳来维持自身healthy状态,持久实例走Raft协议持久化存储,服务端定时与客户端建立tcp连接做健康检查。

​ 但是2.0版本以后持久化实例没有什么变化,但是2.0临时实例不在使用心跳,而是通过长连接是否存活来判断实例是否健康。

ConnectionManager负责管理所有客户端的长连接。

每3s检测所有超过20s没发生过通讯的客户端,向客户端发起ClientDetectionRequest探测请求,如果客户端在1s内成功响应,则检测通过,否则执行unregister方法移除Connection。

如果客户端持续与服务端通讯,服务端是不需要主动探活的

Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
@PostConstruct
public void start() {

    // 启动不健康连接排除功能.
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {

                int totalCount = connections.size();
                Loggers.REMOTE_DIGEST.info("Connection check task start");
                MetricsMonitor.getLongConnectionMonitor().set(totalCount);
                //统计过时(20s)连接
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                int currentSdkClientCount = currentSdkClientCount();
                boolean isLoaderClient = loadClient >= 0;
                int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
                int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);

                Loggers.REMOTE_DIGEST
                    .info("Total count ={}, sdkCount={},clusterCount={}, currentLimit={}, toExpelCount={}",
                          totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount),
                          currentMaxClient + (isLoaderClient ? "(loaderCount)" : ""), expelCount);

                List<String> expelClient = new LinkedList<>();

                Map<String, AtomicInteger> expelForIp = new HashMap<>(16);

                //1. calculate expel count  of ip.
                for (Map.Entry<String, Connection> entry : entries) {

                    Connection client = entry.getValue();
                    String appName = client.getMetaInfo().getAppName();
                    String clientIp = client.getMetaInfo().getClientIp();
                    if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
                        //get limit for current ip.
                        int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
                        if (countLimitOfIp < 0) {
                            int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
                            countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
                        }
                        if (countLimitOfIp < 0) {
                            countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
                        }

                        if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
                            AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
                            if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
                                expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
                            }
                        }
                    }
                }

                Loggers.REMOTE_DIGEST
                    .info("Check over limit for ip limit rule, over limit ip count={}", expelForIp.size());

                if (expelForIp.size() > 0) {
                    Loggers.REMOTE_DIGEST.info("Over limit ip expel info, {}", expelForIp);
                }

                Set<String> outDatedConnections = new HashSet<>();
                long now = System.currentTimeMillis();
                //2.get expel connection for ip limit.
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    String clientIp = client.getMetaInfo().getClientIp();
                    AtomicInteger integer = expelForIp.get(clientIp);
                    if (integer != null && integer.intValue() > 0) {
                        integer.decrementAndGet();
                        expelClient.add(client.getMetaInfo().getConnectionId());
                        expelCount--;
                    } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                        outDatedConnections.add(client.getMetaInfo().getConnectionId());
                    }

                }

                //3. if total count is still over limit.
                if (expelCount > 0) {
                    for (Map.Entry<String, Connection> entry : entries) {
                        Connection client = entry.getValue();
                        if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo()
                            .isSdkSource() && expelCount > 0) {
                            expelClient.add(client.getMetaInfo().getConnectionId());
                            expelCount--;
                            outDatedConnections.remove(client.getMetaInfo().getConnectionId());
                        }
                    }
                }

                String serverIp = null;
                String serverPort = null;
                if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
                    String[] split = redirectAddress.split(Constants.COLON);
                    serverIp = split[0];
                    serverPort = split[1];
                }

                for (String expelledClientId : expelClient) {
                    try {
                        Connection connection = getConnection(expelledClientId);
                        if (connection != null) {
                            ConnectResetRequest connectResetRequest = new ConnectResetRequest();
                            connectResetRequest.setServerIp(serverIp);
                            connectResetRequest.setServerPort(serverPort);
                            connection.asyncRequest(connectResetRequest, null);
                            Loggers.REMOTE_DIGEST
                                .info("Send connection reset request , connection id = {},recommendServerIp={}, recommendServerPort={}",
                                      expelledClientId, connectResetRequest.getServerIp(),
                                      connectResetRequest.getServerPort());
                        }

                    } catch (ConnectionAlreadyClosedException e) {
                        unregister(expelledClientId);
                    } catch (Exception e) {
                        Loggers.REMOTE_DIGEST.error("Error occurs when expel connection, expelledClientId:{}", expelledClientId, e);
                    }
                }

                //4.client active detection.
                Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
                //异步请求所有需要检测的连接
                if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                    Set<String> successConnections = new HashSet<>();
                    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                    for (String outDateConnectionId : outDatedConnections) {
                        try {
                            Connection connection = getConnection(outDateConnectionId);
                            if (connection != null) {
                                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                    @Override
                                    public Executor getExecutor() {
                                        return null;
                                    }

                                    @Override
                                    public long getTimeout() {
                                        return 1000L;
                                    }

                                    @Override
                                    public void onResponse(Response response) {
                                        latch.countDown();
                                        if (response != null && response.isSuccess()) {
                                            connection.freshActiveTime();
                                            successConnections.add(outDateConnectionId);
                                        }
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        latch.countDown();
                                    }
                                });

                                Loggers.REMOTE_DIGEST
                                    .info("[{}]send connection active request ", outDateConnectionId);
                            } else {
                                latch.countDown();
                            }

                        } catch (ConnectionAlreadyClosedException e) {
                            latch.countDown();
                        } catch (Exception e) {
                            Loggers.REMOTE_DIGEST
                                .error("[{}]Error occurs when check client active detection ,error={}",
                                       outDateConnectionId, e);
                            latch.countDown();
                        }
                    }

                    latch.await(3000L, TimeUnit.MILLISECONDS);
                    Loggers.REMOTE_DIGEST
                        .info("Out dated connection check successCount={}", successConnections.size());
					// 对于没有成功响应的客户端,执行unregister移出
                    for (String outDateConnectionId : outDatedConnections) {
                        if (!successConnections.contains(outDateConnectionId)) {
                            Loggers.REMOTE_DIGEST
                                .info("[{}]Unregister Out dated connection....", outDateConnectionId);
                            unregister(outDateConnectionId);
                        }
                    }
                }

                //reset loader client

                if (isLoaderClient) {
                    loadClient = -1;
                    redirectAddress = null;
                }

                Loggers.REMOTE_DIGEST.info("Connection check task end");

            } catch (Throwable e) {
                Loggers.REMOTE.error("Error occurs during connection check... ", e);
            }
        }
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);

}

//注销(移出)连接方法
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}

移除connection后,继承ClientConnectionEventListener的ConnectionBasedClientManager会移除Client,发布ClientDisconnectEvent事件

@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

ClientDisconnectEvent会触发几个事件:

1)Distro协议:同步移除的client数据

2)清除两个索引缓存:ClientServiceIndexesManager中Service与发布Client的关系;ServiceStorage中Service与Instance的关系

3)服务订阅:ClientDisconnectEvent会间接触发ServiceChangedEvent事件,将服务变更通知客户端。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Cloud Nacos源码讲解(五)- Nacos服务端健康检查 的相关文章

  • 01_Numpy的图片处理(读取,变换,保存)

    Numpy的图片处理 读取 变换 保存 使用Numpy的ndarray可以读取图片文件 并且可以对图片进行各种各样的处理 例如 图片像素值的读取 替换 随机剪裁 拼接等等都可以使用ndarray 对于已经习惯使用Numpy的人们来说 已经可
  • Springboot集成华为云OBS

    Springboot实现华为云对象存储OBS文件上传下载 文章目录 Springboot实现华为云对象存储OBS文件上传下载 前言 一 OBS是什么 二 使用步骤 1 引入依赖 2 HuaweiyunOss工具类 3 上传下载测试 总结 前
  • lua快速入门

    Lua语言简介 1993 年在巴西里约热内卢天主教大学 Pontifical Catholic University of Rio de Janeiro in Brazil 诞生了一门编程语言 发明者是该校的三位研究人员 他们给这门语言取了

随机推荐

  • pytorch dataset自定义_目标检测:SSD模型——pytorch数据载入及增广

    进行模型训练的第一步是载入数据 使用pytorch框架载入数据需要两个步骤 构建Dataset数据集和创建Dataloader数据迭代器 pytorch要载入数据训练SSD 可以直接调用 torchvision datasets VOCDe
  • 【arxiv】Few-Shot Text Generation with Pattern-Exploiting Training

    原文链接 https arxiv org pdf 2012 11926 pdf Abstract 为预先训练好的语言模型提供简单的任务描述或自然语言提示 可以在文本分类任务产生令人印象深刻的few shot结果 在本文中 我们表明了这个潜在
  • storm ui 启动失败,zooper重启解决方案

    给大家写了一个zk集群 如果你linux命令的基础部分 搭建zk集群是很简单的事情 zk集群搭建好了我们怎么连接zk 客户端 进行操作那 接下来就给大家演示如何使用zk shell 的使用 补充 登陆zooper客户端 删除操作 删除sto
  • 关于python报证书验证失败如何解决

    1 使用python访问https的时候返回错误 urllib error URLError
  • 利用Python实现四则运算

    利用Python实现四则运算 输入两个变量 根据类型判断他是否进行运算 若为a和b同时满足int或float类型时 则利用choice选择你所要进行的那种运算 若类型错误 输出其类型 a eval input 请输入a b eval inp
  • tensorflow 移植到android平台

    我的书 淘宝购买链接 当当购买链接 京东购买链接 本文基于 https github com MindorksOpenSource AndroidTensorFlowMachineLearningExample 下载和安装jdk ndk和s
  • shell脚本根据端口杀死进程(带完整解析)

    各位可以将下述内容当为学习Shell脚本 如果只是想要更方便地根据端口杀死进程 可以直接使用该方法 port是端口 kill 9 lsof ti port 在项目开发的时候 我们经常需要根据相对应的端口来杀死进程 而这样的操作最少需要两步
  • 直流无刷减速电机PID控制

    最近做了直流无刷减速电机的控制的项目 针对项目中遇到的问题做下总结 PID Control PID 代码 速度环 位置环 串级 STM32F407VET6 STM32CubeMX 更新记录 V1 0 0 2022 8 5 完善了RTOS程序
  • 怎么使用大疆无人机建模?

    倾斜摄影测量技术是国际测绘遥感领域近年发展起来的一项高新技术 以大范围 高精度 高清晰的方式全面感知复杂场景 通过高效的数据采集设备及专业的数据处理流程生成的数据成果直观反映地物的外观 位置 高度等属性 为真实效果和测绘级精度提供保证 同时
  • 说说学习python pycharm中踩的坑

    说说学习python pycharm中踩的坑 我真的很讨厌很痛苦这种啥都不懂 只能在黑暗中摸索的感觉 1 python 3 9 及以上版本是不支持win7的 2 要安装 python 2 7 和 python 3 8 这样才能在pychar
  • 反射、xml解析

    反射 反射就读取class文件 获取该文件中的属性 方法等 作用 用来获取指定路径下的class文件中所具备的的所有属性和方法 返回Class对象的方式之一 getClass 每一个引用数据类型都有一个getClass的方法 返回的是该类的
  • EXCEL的快速分列

    1 打开Excel并选择分列 选择智能分列 点击 2 选择手动设置分列 3 注意符号一定是英文要和你分列的数据内符号一致 4 点击下一步完成 效果如下
  • 数据链路层三个基本问题(封装成帧 、透明传输和差错检测 )

    文章目录 使用点对点信道的数据链路层 1 1 数据链路和帧 1 2 三个基本问题 1 封装成帧 2 透明传输 3 差错检测 循环冗余检验CRC 帧检验序列 FCS 接收端对收到的每一帧进行 CRC 检验 数据链路层使用的信道主要有以下两种类
  • linux suse设置中文系统

    Linux字符编码默认为UTF 8 如出现乱码可设置为GBK 1 手动更改profile文件的命令 vi etc profile 2 在文件的末尾添加以下两行命令 export LC ALL zh CN GBK export LANG zh
  • Happiness【2019EC Final G题】【模拟】

    题目链接 题意很长 先翻译一下 由N个参赛队伍 给出其余N 1只参赛队伍 另外一支队伍是我们 本次ICPC一共有10道题 我们知道其余N支队伍每道题的通过时间和错误次数 如果是 则为没有在300分钟内解决该问题 最后给出我们队伍 做出每道题
  • 窨井液位计(下水道液位计)的分类

    窨井液位计又称下水道液位计 是应用在市政管网监测集水井 雨水井 污水井 观察井等测量水位变化的仪表 根据原理不同可分为 压力式 雷达式和超声波式3种 通过传感器测量液位数值 利用无线远传的方式上传到数据平台 实现对井下水位实时监测的目的 压
  • yapi的安装

    Yapi的安装 Yapi是一款不错的接口管理软件 我主要用它来进行接口Mock Yapi安装所需环境 Node js 7 6 Mongodb 2 6 git 各环境安装地址 git https git scm com downloads N
  • F.softmax()的用法

    F softmax 的用法 gt gt gt import torch gt gt gt import torch nn functional as F gt gt gt logits torch rand 2 2 gt gt gt pre
  • C++函数返回引用

    注 C 有三种传递方式 值传递 指针传递 引用传递 返回 值 和返回 引用 是不同的 函数返回值时会产生一个临时变量作为函数返回值的副本 而返回引用时不会产生值的副本 既然是引用 那引用谁呢 这个问题必须清楚 否则将无法理解返回引用到底是个
  • Spring Cloud Nacos源码讲解(五)- Nacos服务端健康检查

    Nacos服务端健康检查 长连接 概念 长连接 指在一个连接上可以连续发送多个数据包 在连接保持期间 如果没有数据包发送 需要双方发链路检测包 注册中心客户端2 0之后使用gRPC代替http 会与服务端建立长连接 但仍然保留了对旧http