etcd学习和实战:3、go使用etcd实现服务发现和管理
1. 前言
根据我们上一节学习的gRPC服务和etcd结合的内容,我们已经了解到v3版本下gRPC服务和etcd结合的方法,主要就是用etcd客户端提供的gRPC解析器来创建要管理的服务端点的前缀用于监听是否存在新注册的服务端点,之后通过租约方式确认服务端点是否正常运行,长时间未续约则可能删除服务节点(真像租房啊,租客找房东租房并根据租约定期交房租,到期后不续约就删掉你这个租客.,etcd就是中介,这个中介不收钱提供免费平台接口给你用而且性能还挺好可太良心了)。
根据原理的话服务发现功能的实现就分为两大部分:1、使用etcd提供的客户端进行的服务监听和管理;2、服务端点启动时调用etcd接口注册服务并签订租约,此后定期续约即可。
2. 代码及编译运行问题总结
自:https://bingjian-zhu.github.io/2020/05/14/etcd实现服务发现/
2.1 服务注册
register.go:
- 设置endpoints(端点)并创建etcd客户端
- 设置租约注册服务并绑定
- 持续监听租约
package main
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client //etcd client
leaseID clientv3.LeaseID //租约ID
//租约keepalieve相应chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: key,
val: val,
}
//申请租约设置时间keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//设置租约时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
//注册服务并绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
//设置续租 定期发送需求请求
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
log.Println(s.leaseID)
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
//撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser, err := NewServiceRegister(endpoints, "/web/node1", "localhost:8000", 5)
if err != nil {
log.Fatalln(err)
}
//监听续租相应chan
go ser.ListenLeaseRespChan()
select {
// case <-time.After(20 * time.Second):
// ser.Close()
}
}
2.2 服务发现
- 设置endpoints创建etcd客户端
- 初始化配置并监听服务前缀(实际上也可以直接监听key,但不够灵活,监听前缀值更好一些)
- 根据监听到的对key的操作类型进行进一步处理
discovery.go:
package main
import (
"context"
"log"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
)
//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client //etcd client
serverList map[string]string //服务列表
lock sync.Mutex
}
//NewServiceDiscovery 新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
serverList: make(map[string]string),
}
}
//WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
//根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
//监视前缀,修改变更的server
go s.watcher(prefix)
return nil
}
//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //修改或者新增
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = string(val)
log.Println("put key :", key, "val:", val)
}
//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
log.Println("del key:", key)
}
//GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
s.lock.Lock()
defer s.lock.Unlock()
addrs := make([]string, 0)
for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}
//Close 关闭服务
func (s *ServiceDiscovery) Close() error {
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser := NewServiceDiscovery(endpoints)
defer ser.Close()
ser.WatchService("/web/")
ser.WatchService("/gRPC/")
for {
select {
case <-time.Tick(10 * time.Second):
log.Println(ser.GetServices())
}
}
}
2.3 问题
我们在编译运行过程中可能会出现如下问题:
go mod ini
go mod tidy
...
go: gitlab/zhengyao/etcd_grpc imports
go.etcd.io/etcd/clientv3 tested by
go.etcd.io/etcd/clientv3.test imports
github.com/coreos/etcd/auth imports
github.com/coreos/etcd/mvcc/backend imports
github.com/coreos/bbolt: github.com/coreos/bbolt@v1.3.6: parsing go.mod:
module declares its path as: go.etcd.io/bbolt
but was required as: github.com/coreos/bbolt
etcd中使用的bbolt和grpc版本冲突(包的相互依赖以及各个包不同版本的兼容性问题和库的相互依赖以及版本兼容问题类似)
解决方法:
参考自:https://blog.csdn.net/skh2015java/article/details/111060465
删除原来已生成得go.mod和go.sum
go mod init
go mod edit -replace github.com/coreos/bbolt@v1.3.4=go.etcd.io/bbolt@v1.3.4
go mod edit -replace google.golang.org/grpc@v1.38.0=google.golang.org/grpc@v1.26.0
go mod tidy
注意:上面改前的grpc的版本取决于你的etcd的版本,也就是报的错误中提到的grpc版本。
2.4 运行结果
#运行服务发现
$go run discovery.go
watching prefix:/web/ now...
watching prefix:/gRPC/ now...
...
put key : /web/node1 val:localhost:8000
[localhost:8000]
...
#另一个终端运行服务注册
$go run register.go
Put key:/web/node1 val:localhost:8000 success!
续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:29 raft_term:7
续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:29 raft_term:7
...