etcd学习和实战:4、Java使用etcd实现服务发现和管理

2023-10-27

etcd学习和实战:4、Java使用etcd实现服务发现和管理


1. 前言

Java一般使用zookeeper来实现分布式系统下服务管理,zookeeper也具备key-value的存取功能,这里我们不讨论zookeeper和etcd的优劣,只提一下对于Java实现类似功能可能也有zookeeper这样的方案。

同样分为服务注册和发现两大部分,思路和go实现时相同,所以直接上代码并进行测试即可。

2. 代码

使用了jetcd,目前还是beta版本,但是目前似乎只有这个支持etcd v3版本。

参考自:
https://github.com/etcd-io/jetcd/tree/master/jetcd-examples
https://xinchen.blog.csdn.net/article/details/115434576
https://www.jianshu.com/p/bd7eed1f250c

2.1 服务注册

  • 设置endpoints(端点)并创建etcd客户端
  • 设置租约注册服务并绑定
  • 持续监听租约

Register.java:

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Response;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.options.PutOption;
import io.grpc.stub.CallStreamObserver;

import static com.google.common.base.Charsets.UTF_8;

public class Register {
    private Client client;
    private String endpoints;
    private Object lock = new Object();


    public Register(String endpoints) {
        super();
        this.endpoints = endpoints;
    }

    /**
     * 新建key-value客户端实例
     * @return
     */
    private KV getKVClient(){

        if (null==client) {
            synchronized (lock) {
                if (null==client) {

                    client = Client.builder().endpoints(endpoints.split(",")).build();
                }
            }
        }

        return client.getKVClient();
    }

    public void close() {
        client.close();
        client = null;
    }

    public Response.Header put(String key, String value) throws Exception {
        return getKVClient().put(bytesOf(key), bytesOf(value)).get().getHeader();
    }

    /**
     * 将字符串转为客户端所需的ByteSequence实例
     * @param val
     * @return
     */
    public static ByteSequence bytesOf(String val) {
        return ByteSequence.from(val, UTF_8);
    }

    private Client getClient() {
        if (null==client) {
            synchronized (lock) {
                if (null==client) {
                    client = Client.builder().endpoints(endpoints.split(",")).build();
                }
            }
        }

        return client;
    }

    public void putWithLease(String key, String value) throws Exception {
        Lease leaseClient = getClient().getLeaseClient();

        leaseClient.grant(60).thenAccept(result -> {
            // 租约ID
            long leaseId = result.getID();

            // 准备好put操作的client
            KV kvClient = getClient().getKVClient();

            // put操作时的可选项,在这里指定租约ID
            PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();

            // put操作
            kvClient.put(bytesOf(key), bytesOf(value), putOption)
                    .thenAccept(putResponse -> {
                        // put操作完成后,再设置无限续租的操作
                        leaseClient.keepAlive(leaseId, new CallStreamObserver<LeaseKeepAliveResponse>() {
                            @Override
                            public boolean isReady() {
                                return false;
                            }

                            @Override
                            public void setOnReadyHandler(Runnable onReadyHandler) {

                            }

                            @Override
                            public void disableAutoInboundFlowControl() {

                            }

                            @Override
                            public void request(int count) {
                            }

                            @Override
                            public void setMessageCompression(boolean enable) {

                            }

                            /**
                             * 每次续租操作完成后,该方法都会被调用
                             * @param value
                             */
                            @Override
                            public void onNext(LeaseKeepAliveResponse value) {
                                System.out.println("续租完成");
                            }

                            @Override
                            public void onError(Throwable t) {
                                System.out.println(t);
                            }

                            @Override
                            public void onCompleted() {
                            }
                        });
                    });
            });
    }
}

RegisterTest.java:


public class RegisterTest {
    public static void main(String[] args) {
        Register register = new Register("http://localhost:2379");
        String key = "/web/node0";
        String value = "localhost:7999";
//        try {
//            register.put(key, value);
//        } catch (Exception e) {
//            e.printStackTrace();
//        }
        try {
            register.putWithLease(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.2 服务发现

  • 设置endpoints创建etcd客户端
  • 初始化配置并监听服务前缀(实际上也可以直接监听key,但不够灵活,监听前缀值更好一些)
  • 根据监听到的对key的操作类型进行进一步处理

Discovery.java:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Response;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.Watch.Watcher;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;

import static java.nio.charset.StandardCharsets.UTF_8;

public class Discovery {
    private Client client;
    private String endpoints;
    private final Object lock = new Object();
    private HashMap<String, String> serverList = new HashMap<String, String>();

    /**
     * 发现服务类信息初始化
     * @param endpoints:监听端点,包含ip和端口,如:"http://localhost:2379“,多个端点则使用逗号分割, 比如:”http://localhost:2379,http://192.168.2.1:2330“
     */
    public Discovery(String endpoints) {
        this.endpoints = endpoints;
        newServiceDiscovery();
    }

    public Client newServiceDiscovery() {
        if (null == client) {
            synchronized (lock) {
                if (null == client) {
                    client = Client.builder().endpoints(endpoints.split(",")).build();
                }
            }
        }

        return client;
    }

    public void watchService(String prefixAddress) {
        //请求当前前缀
        CompletableFuture<GetResponse> getResponseCompletableFuture =
                client.getKVClient().get(ByteSequence.from(prefixAddress,
                        UTF_8),
                        GetOption.newBuilder().withPrefix(ByteSequence.from(prefixAddress, UTF_8)).build());

        try {
            //获取当前前缀下的服务并存储
            List<KeyValue> kvs = getResponseCompletableFuture.get().getKvs();
            for (KeyValue kv : kvs) {
                setServerList(kv.getKey().toString(UTF_8), kv.getValue().toString(UTF_8));
            }

            //创建线程监听前缀
            new Thread(new Runnable() {

                @Override
                public void run() {
                    watcher(prefixAddress);
                }
            }).start();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    private void watcher(String prefixAddress) {
        Watcher watcher;

        System.out.println("watching prefix:" + prefixAddress);
        WatchOption watchOpts = WatchOption.newBuilder().withPrefix(ByteSequence.from(prefixAddress,
                UTF_8)).build();

        //实例化一个监听对象,当监听的key发生变化时会被调用
        Watch.Listener listener = Watch.listener(watchResponse -> {
            watchResponse.getEvents().forEach(watchEvent -> {
                WatchEvent.EventType eventType = watchEvent.getEventType();
                KeyValue keyValue = watchEvent.getKeyValue();
                System.out.println("type="+eventType+",key="+keyValue.getKey().toString(UTF_8)+",value="+keyValue.getValue().toString(UTF_8));

                switch (eventType) {
                    case PUT:  //修改或者新增
                        setServerList(keyValue.getKey().toString(UTF_8), keyValue.getValue().toString(UTF_8));
                        break;
                    case DELETE: //删除
                        delServerList(keyValue.getKey().toString(UTF_8), keyValue.getValue().toString(UTF_8));
                        break;
                }
            });
        });

        client.getWatchClient().watch(ByteSequence.from(prefixAddress, UTF_8), watchOpts,
                listener);
    }

    private void setServerList(String key, String value) {
        synchronized (lock) {
            serverList.put(key, value);
            System.out.println("put key:" + key + ",value:" + value);
        }
    }

    private void delServerList(String key, String value) {
        synchronized (lock) {
            serverList.remove(key);
            System.out.println("del key:" + key);
        }
    }

    public void close() {
        client.close();
        client = null;
    }
}

DiscoveryTest.java:

public class DiscoveryTest {
    public static void main(String[] args) {
        String endpoints = "http://localhost:2379";
        Discovery ser = new Discovery(endpoints);
        ser.watchService("/web/");
        ser.watchService("/grpc/");
        while (true) {

        }
    }
}

2.3 运行结果

//先运行服务发现
$ java -jar discovery.jar 
put key:/web/node0,value:localhost:7999
watching prefix:/web/
watching prefix:/grpc/
type=PUT,key=/web/node0,value=localhost:7999
put key:/web/node0,value:localhost:7999
...

//再运行服务注册
$ java -jar register.jar 
续租完成
续租完成
续租完成
续租完成
...

2.4 问题

六月 07, 2021 4:53:24 下午 io.grpc.internal.ManagedChannelImpl$NameResolverListener handleErrorInSyncContext
警告: [Channel<1>: (etcd)] Failed to resolve name. status=Status{code=NOT_FOUND, description=null, cause=null}
...

这个是endpoints错误导致的,需要在前面添加http://,即"http://localhost:2379"而不是"localhost:2379"。

3. 最后

实际生产环境中目前etcd+grpc更适合Go,java目前仅有jetcd支持gRPC,更多的还是v2版本,使用的gRPC的版本也比较底,还处于beat版本。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

etcd学习和实战:4、Java使用etcd实现服务发现和管理 的相关文章

  • Objective-C 相当于 Java 枚举或“静态最终”对象

    我试图找到一个与 Java 枚举类型或 public static final 对象等效的 Objective C 例如 public enum MyEnum private String str private int val FOO f
  • 可序列化对象的 ArrayList 的加密保存和解密加载

    我在 SD 卡中保存并加载一个文件 其中包含ArrayList使用这两种方法的可序列化对象 保存方法 public static void saveUserList ArrayList
  • 如何打印JTable中选定的行

    我尝试使用主 JTable 的选定行和相同的头和单元格渲染来创建临时 JTable 但是当我尝试打印它时 我只得到一个带有线边框的空矩形 我在如何打印 JTable 的特定行 列 https stackoverflow com questi
  • 配置 Eclipse 将 App Engine 类预先捆绑到单个 JAR 中以加快预热速度

    在与另一家同样使用 App Engine 的公司的同事进行讨论后 他告诉我 他通过以下步骤成功地将应用程序预热时间从约 15 秒缩短到约 5 秒 配置 Eclipse 将编译过程中生成的类捆绑到单个 JAR 文件中 配置 Eclipse 以
  • Java 8 可选

    我想检查特定对象大小是否大于 0 如果它大于 0 那么我想创建一个可选对象 如果不是 那么我想返回一个可选的空对象 这是java代码的长版本 if fooA size gt 0 return Optional of new Foo else
  • Java 比 Xmx 参数消耗更多内存

    我有一个非常简单的 Web 服务器类 基于 Java SEHttpServer class 当我使用此命令启动编译的类来限制内存使用时 java Xmx5m Xss5m Xrs Xint Xbatch Test 现在如果我使用检查内存top
  • Maven项目中的HDF5

    我正在尝试将 hdf hdf5lib H5 导入到 NetBeans 中的 Maven 项目中 它有这个作为导入行 import hdf hdf5lib H5 正如这里所建议的 https support hdfgroup org prod
  • 将更改(永久)保存在数组列表中?

    那可能吗 例如 用户将新的项目 元素添加到数组列表 缓冲读取器进程 中 并且肯定会发生更改 我的问题是 即使用户多次更改数组列表 它也可能会永久存在 即使他们关闭程序并再次打开它 它也会一直存在 注意 不使用 txt 很抱歉问这样的问题 但
  • 内部/匿名类的最佳实践[关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 匿名类和静态内部类的最佳实践 设计和性能方面 是什么 就我个人而言 我认为静态内部类提供了更好的封装 并且应该提供更好的性能 因为它们无法访问类
  • 图像在 3D 空间中绕 Y 轴旋转

    我有一个 BufferedImage 我想用 theta 角而不是仿射变换绕 Java 中的 Y 轴旋转图像 图片 旋转将如下图所示 矩形将是图像 我可以通过旋转图像的每个像素并绘制图像来做到这一点 因为我必须旋转很多图像 所以我认为这不是
  • TableModel setCellEditable 并自动将值设置回 false

    我目前正在尝试在 JTable 中实现 JPopupMenu 它允许解锁单元格以进行编辑 Override public void actionPerformed ActionEvent e if e getActionCommand Un
  • 在 alpine / Jprofile 10 中运行 jpenable 时出现 UnsatisfiedLinkError

    当运行 jpenable 以允许在运行 JDK 8 的 alpine 3 3 容器中对 Jprofiler10 进行分析时 我收到 UnsatisfiedLinkError 异常 有任何想法吗 ERROR The agent could n
  • 如何使用Gson将JSONArray转换为List?

    在我的 Android 项目中 我试图将收到的 JSONArray 转换为列表 在 的帮助下这个答案 https stackoverflow com questions 8371274 how to parse json array in
  • Hibernate更新查询问题

    对于此更新查询 update TestDB dbo MyEmp set empname where empid 我在 DAO 课上写的 MyEmployee myEmployee new MyEmployee MyEmployee myEm
  • 相对重力

    我最近开始使用jMonkey引擎 这非常好 但我在尝试实现相对重力时陷入了困境 我想让行星彼此围绕轨道运行 不一定是完美的圆形轨道 取决于速度 所以每个对象都应该影响其他对象 我现在拥有的 关闭全球重力 bulletAppState get
  • String.intern() 线程安全吗

    我想在Java中使用 String intern 来节省内存 对具有相同内容的字符串使用内部池 我从不同的线程调用这个方法 这是个问题吗 对你的问题的简短回答是肯定的 它是线程安全的 但是 您可能需要重新考虑使用此工具来减少内存消耗 原因是
  • 如何在列表视图中选择时启用视频序列自动播放?

    大家好 有人可以与我分享一下我如何编写我的 viewvideo java 类 以便它允许自动播放视频功能 自动排序在列表视图中播放所选视频的任务 从当前位置到最新录制的视频 按顺序直到最新的视频播放完毕 这类似于 YouTube 自动播放功
  • 找出对象列表中是否包含具有指定字段值的内容?

    我有一个从数据库收到的 DTO 列表 它们有一个 ID 我想确保我的列表包含具有指定 ID 的对象 显然 在这种情况下创建具有预期字段的对象不会有帮助 因为 contains 调用 Object equals 并且它们不会相等 我想出了这样
  • 与手动搜索列表相比,Collections.binarySearch 的性能如何?

    我想知道该使用哪一个 我有一份学生名单 我想用他的名字搜索一个学生 到目前为止 我是通过迭代列表手动完成的 如下所示 for int i 0 i lt list size i Student student list get i if st
  • 如何将元素添加到通用集合

    我想知道如何将专用对象添加到通用集合中 我正在使用以下代码 Collection

随机推荐

  • Quartus ii调试工具之SignalProbe

    下图是quartusii handbook 给出的6个片上调试工具 列出了各自的用途 其中SignalProbe是Quartus ii提供的一个通过外部设备探测FPGA内部信号的一个工具 即把FPGA内部需要探测的信号连接到没有用到的IO管
  • 论文阅读——A Comprehensive Study on Deep Learning-Based 3D Hand Pose Estimation Methods综述阅读2

    3D手势姿态估计综述 本文通过对大量有代表性的论文研究 提出一种基于输入数据模式的新分类法 即RGB 深度或多模态信息 最后 我们展示了在最流行的RGB和基于深度的数据集上的结果 并讨论了这一快速增长领域的潜在研究方向 1 Introduc
  • shiro多realm的spring-boot案例剖析

    shiro多realm整合的spring boot案例剖析 概述 shiro认证的流程主要是通过securityManager调用login Subject subject AuthenticationToken token 方法 实际上委
  • 计算机网络原理 谢希仁(第8版)第四章习题答案

    4 01 网络层向上提供的服务有哪两种 试比较其优缺点 面向连接的和无连接 面向连接优点 通过虚电路发送分组 分组只用填写虚电路编号 分组开销较小 分组按序达到终点 面向连接缺点 一个节点出故障 所有通过该节点的虚电路均不能工作 可靠通信交
  • 什么是JavaBean、bean? 什么是POJO、PO、DTO、VO、BO ? 什么是EJB、EntityBean?

    前言 在Java开发中经常遇到这些概念问题 有的可能理解混淆 有的可能理解不到位 特此花了很多时间理顺了这些概念 不过有些概念实际开发中并没有使用到 可能理解还不够准确 只能靠后续不断纠正了 1 什么是POJO POJO Plain Old
  • RPC 技术及其框架 Sekiro 在爬虫逆向中的应用,加密数据一把梭

    文章目录 什么是 RPC JSRPC Sekiro 优缺点 什么是 RPC RPC 英文 RangPaCong 中文让爬虫 旨在为爬虫开路 秒杀一切 让爬虫畅通无阻 开个玩笑 实际上 RPC 为远程过程调用 全称 Remote Proced
  • LeetCode——036

    Valid Sudoku My Submissions QuestionEditorial Solution Total Accepted 71051 Total Submissions 233215 Difficulty Easy Det
  • AI 大行其道,你准备好了吗?—谨送给徘徊于转行 AI 的程序员

    前言 近年来 随着 Google 的 AlphaGo 打败韩国围棋棋手李世乭之后 机器学习尤其是深度学习的热潮席卷了整个 IT 界 所有的互联网公司 尤其是 Google 微软 百度 腾讯等巨头 无不在布局人工智能技术和市场 百度 腾讯 阿
  • 学习Javascript闭包(Closure)[非常棒的文章]

    作者 阮一峰 日期 2009年8月30日 闭包 closure 是Javascript语言的一个难点 也是它的特色 很多高级应用都要依靠闭包实现 下面就是我的学习笔记 对于Javascript初学者应该是很有用的 一 变量的作用域 要理解闭
  • 关于论青少年尽早学少儿编程之说

    关于论青少年尽早学少儿编程之说 正如一本书中所描述的一句话 尽早学习编程 是孩子为未来做好准备必不可少的一步 看完这句话之后 给我们的直观印象可能就是 不教孩子学习编程在某种程度上等于不教他们读书写字 这种说法明显是片面的 编程 读书写字
  • 若依系统注册功能

    加油 三步实现注册 前端 后端 分配角色 总结 前端 login vue中打开注册开关 后端 打开数据库sys config表 开启注册功能 分配角色 在SysUserMapper中添加方法 实现方法 在SysUserServiceImpl
  • dialog中二维码显示问题

    由于dialog加载过程会耗费一定时间 因此在dialog中直接调用会导致在一次打开的dialog无法加载二维码 在dialog标签中加入 opened ShowQRCode 属性 opened是dialog动画打开完毕之后的回调 当页面加
  • 计算机网络层提供的面向连接服务还是无连接服务讨论与思考

    概要 在计算机网络领域 网络层应该向运输层提供怎样的服务 面向连接 还是 无连接 曾引起了长期的争论 争论焦点的实质就是 在计算机通信中 可靠交付应当由谁来负责 是网络还是端系统 介绍 有些人认为应当借助于电信网的成功经验 让网络负责可靠交
  • 计算机主机名与用户名区别

    一 主机名概念 主机名就是计算机的名字 计算机名 网上邻居就是根据主机名来识别的 这个名字可以随时更改 从我的电脑属性的计算机名就可更改 用户登陆时候用的是操作系统的个人用户帐号 这个也可以更改 从控制面板的用户界面里改就可以了 这个用户名
  • 1. Inna and Pink Pony

    1 Inna and Pink Pony 首先找出四个边界点 但要注意当横纵坐标等于边界横纵坐标时 需考虑是否会出界 满足以上条件时 考虑横纵坐标移动次数其和为偶数时便可以完成移动 因为正负抵消原则 话不多说 直接上Python代码 n m
  • 解决 CommandNotFoundError: Your shell has not been properly configured to use ‘conda activate’问题

    针对使用conda进入虚拟环境时遇到的问题 CommandNotFoundError Your shell has not been properly configured to use conda activate 解决方法 win r
  • 解决Android中使用RecyclerView滑动时底部item显示不全的问题

    感觉这个bug是不是因人而异啊 找了很多文章都没能解决我的问题 包括在RecyclerView上在嵌套上一层RelativeLayout 添加属性android descendantFocusability blocksDescendant
  • 解决“L6200E Symbol xx defined (by xx.o and xx.o)”重复定义问题

    今天来分享一个关于自己之前遇到的一个问题 就是关于重复定义会造成的一个错误 错误提示为 OBJ LCD axf Error L6200E Symbol ascii 1206 multiply defined by lcd user o an
  • C语言每日一题:7.寻找数组中心下标。

    思路一 暴力求解 1 定义一个ps作为中间下标去记录下标值 2 循环下标ps从头到位 定义四个变量分别是left sum left right sum right 3 初始化left ps 1和right ps 1 当ps0 gt 就让su
  • etcd学习和实战:4、Java使用etcd实现服务发现和管理

    etcd学习和实战 4 Java使用etcd实现服务发现和管理 文章目录 etcd学习和实战 4 Java使用etcd实现服务发现和管理 1 前言 2 代码 2 1 服务注册 2 2 服务发现 2 3 运行结果 2 4 问题 3 最后 1