informer

2023-05-16

 list-watch机制

list-watch有两部分组成,分别是listwatchlist非常好理解,就是调用资源的list API罗列资源,基于HTTP短链接实现;watch则是调用资源的watch API监听资源变更事件,基于HTTP 长链接实现

Watch APIapiserver保持一个长链接,接收资源的状态变更事件并做相应处理。如果仅调用watch API,若某个时间点连接中断,就有可能导致消息丢失,所以需要通过list APIresourceversion(根据版本)解决消息丢失的问题。

watch失败后会自动调用list,list返回全量数据,每次watch失败都会relist。在大规模场景,如果所有client同时发生relist,那server肯定受不了。为了应对这种情况,提供了EtcdResync

List-watch的问题:

watch请求时http streaming方式,当服务端挂了后,客户端是感知不到网络断开,以为服务端没有数据,所以客户端不会主动关闭长连接。

处理:

1.Watch请求需要 带一个超时参数(默认是5-10min之间的随机数),时间一到,断开连接。

2.kube-proxy增加连接超时的参数

 

workqueue机制

client-go中抽象了几种队列,包括通用队列、限速队列、延时队列等

通用队列:防止两个相同类型的item同时处理

增加dirty set,当item正在处理,又来一个同类的item,为了保证不能同时有两个相同的item正在被处理,将后面的item加入dirty,等前面的item执行完,才将后面的item加入queue。

限速队列:防止短时间内处理失败的item再次处理,陷入循环

增加AddAfter方法, 可以允许用户告诉DelayingInterface过多长时间把该item加入队列中

延时队列:

根据元素错误次数逐渐累加等待时间,然后加入到延时队列

 

workqueue 提供的一个保障就是,如果是同一个object,比如同一个 pod,被多次加到 workqueue 里,在 dequeue 时,它只会出现一次。防止会有同一个 object 被多个 worker 同时处理。

普通队列workqueue:

防止 hot loop:它保证了一个 item 被 reenqueued 后,不会马上被处理

限速队列RateLimitingQueue,它相比普通的 workqueue 多了以下的功能:

  • 限流:可以限制一个 item 被 reenqueued 的次数。(维护了一个delaying_queue,并用heap将等待时间最小的item加入到 workqueue去处理)

  • 防止 hot loop:它保证了一个 item 被 reenqueued 后,不会马上被处理,(processing存在正在处理的item)

延时队列RateLimiter:

  • 根据元素错误次数逐渐累加等待时间,然后加入到延时队列


 

informer核心


 

0.kube-apiserver通过etcd自身的watch机制获取变化的资源,并将资源分类存入watchCache中,封装成pod/service Storage

0.1.客户端通过http长连接监听kube-apiserver的pod资源的变化,即从pod storage的缓存中获取kube-apiserver从etcd watch到的数据

1.ListWatch监听apiserver的资源变化,Lister一般用于首次获取某资源(如Pod)的全量信息,而Watcher用于持续获取该资源的增量变化信息
2.reflector使用listerWatcher获取资源,并将其保存在队列DeltaFIFO
3.DeltaFIFO是一个生产者-消费者队列,生产者为Reflector,消费者为Pop()函数,消费的数据一方面存储到Indexer中,另一方面可以通过informer的handler进行处理
4.Local Store是本地缓存,Indexer是含有索引能力的本地缓存,本质是能根据id、名字等key获取对象的map
5.workqueue用于保存DeltaFIFO中增量变化的对象,由worker将当前状态的对象处理成期望状态的对象。

*注:SharedInformer.Run启动了两个chan,s.c.Run为controller的入口,s.c.Run函数中会Pop DeltaFIFO中的元素,并根据DeltaFIFO的元素的类型(Sync/Added/Updated/Deleted)进两类处理,一类会使用indexer.Update,indexer,Add,indexer.Delete对保存的在Store中的数据进行处理;另一类会根据DeltaFIFO的元素的类型将其封装为sharedInformer内部类型updateNotification,addNotification,deleteNotification,传递给s.processor.Listeners.addCh,后续给注册的pl.handler处理。

 

informer工作流程


1.reflector通过ListWatch的list获取资源的全量信息,包括监听对象最新的resourceVersion。将获取的资源信息存入indexer
2.reflector通过ListWatch的watch监听对象resourceVersion之后的所有变化,并将变化存入DeltaFIFO。
3.Informer的controller不断地pop DeltaFIFO的变化,根据变化更新indexer对应的数据,根据变化调用回调函数handler去处理,即将变化的对象放入workqueue
4.controller不断地从workqueue取出对象,通过sync方法将对象从当前状态处理成期望状态。

 

使用informer创建controller

1.与kube-apiserver建立连接client

2.创建sharedInformerFactory对象informer,并关联要监听的资源

3.定义回调函数并关联到informer

4.启动informer,并等待资源信息同步到本地缓存

5.informer监听变更事件,并将事件存入workqueue,

6.controller从workqueue去除object,并通过调用sync方法将对象从当前状态处理成期望状态。

 

    1. Informer 首先会 list/watch apiserver,Informer 所使用的 Reflector 包负责与 apiserver 建立连接,Reflector 使用 ListAndWatch 的方法,会先从 apiserver 中 list 该资源的所有实例,list 会拿到该对象最新的 resourceVersion,然后使用 watch 方法监听该 resourceVersion 之后的所有变化,若中途出现异常,reflector 则会从断开的 resourceVersion 处重现尝试监听所有变化,一旦该对象的实例有创建、删除、更新动作,Reflector 都会收到"事件通知",这时,该事件及它对应的 API 对象这个组合,被称为增量(Delta),它会被放进 DeltaFIFO 中。
    1. Informer 会不断地从这个 DeltaFIFO 中读取增量,每拿出一个对象,Informer 就会判断这个增量的时间类型,然后创建或更新本地的缓存,也就是 store。
    1. 如果事件类型是 Added(添加对象),那么 Informer 会通过 Indexer 的库把这个增量里的 API 对象保存到本地的缓存中,并为它创建索引,若为删除操作,则在本地缓存中删除该对象。
    1. DeltaFIFO 再 pop 这个事件到 controller 中,controller 会调用事先注册的 ResourceEventHandler 回调函数进行处理。
    1. 在 ResourceEventHandler 回调函数中,其实只是做了一些很简单的过滤,然后将关心变更的 Object 放到 workqueue 里面。
    1. Controller 从 workqueue 里面取出 Object,启动一个 worker 来执行自己的业务逻辑,业务逻辑通常是计算目前集群的状态和用户希望达到的状态有多大的区别,然后孜孜不倦地让 apiserver 将状态演化到用户希望达到的状态,比如为 deployment 创建新的 pods,或者是扩容/缩容 deployment。
    1. 在worker中就可以使用 lister 来获取 resource,而不用频繁的访问 apiserver,因为 apiserver 中 resource 的变更都会反映到本地的 cache 中。

Informer 在使用时需要先初始化一个 InformerFactory,目前主要推荐使用的是 SharedInformerFactory,Shared 指的是在多个 Informer 中共享一个本地 cache,即共同使用一个SharedIndexer。

建议使用 RateLimitingQueue,它相比普通的 workqueue 多了以下的功能:

  • 限流:可以限制一个 item 被 reenqueued 的次数。
  • 防止 hot loop:它保证了一个 item 被 reenqueued 后,不会马上被处理。


 

参考:https://www.jianshu.com/p/1e2e686fe363

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

informer 的相关文章

  • Android 8.0 利用Settings.Global属性跨应用定义标志位

    需求 需要在不同应用中定义一个标志位 xff0c 这里介绍下系统级别的应用和非系统级别应用如何添加 当然这不一定是最好的办法 xff0c 因为不能够添加intent putExtra 属性 系统级别应用 在需要定义的地方使用 SystemP
  • k-近邻算法实现手写数字识别系统

    k 近邻算法实现手写数字识别系统 一 实验介绍 1 1 实验内容 本实验将会从电影题材分类的例子入手 xff0c 详细讲述k 近邻算法的原理 在这之后 xff0c 我们将会使用该算法实现手写数字识别系统 1 2 课程来源 本课程源自 图灵教
  • 调整eclipse控制台console的方法

    调整eclipse控制台console的方法 会把在用eclipse的过程中产生的问题和找到的解决方案记录一下 xff0c 以便之后再用到 今天在运行代码的时候 xff0c 突然控制台和代码并列了 然后百度了一下找到了方法 windows
  • Linux: 运行sh命令时command not found

    问题 xff1a 解决 xff1a 1 查看PATH变量 echo PATH 2 把查询出来的PATH放到sh文件中并导入
  • 修改git tag的描述信息

    今天手贱 xff0c 非要用TortoiseGit打tag xff0c 没用命令行 xff0c 结果这不是还没有学习么 xff0c 然后就出现问题了 不过好在是我自己的Toy代码 xff0c 那就看看如何解决吧 问题描述 使用Tortois
  • Linux 设置用户登录超时

    Linux 系统中使用SSH进行远程登录 xff0c 如果长时间不操作将自动注销用户的登录 原本以为在 etc ssh sshd config文件中配置 查了资料和测试只需要在shell环境变量中设置即可 span class hljs c
  • rime配置

    文件路径 AppData Rime 配置修改 default custom yaml span class hljs label customization span span class hljs label distribution c
  • matlab中(),[],与{}的区别认识

    转载自 http blog csdn net CV YOU article details 52873666 在matlab中 xff0c 常常会遇到 xff0c 和 这个3种符号怎么区分 xff0c 怎么用 xff0c 这里我来总结一下
  • WinServer2012 R2忘记密码的解决方案+远程连接另一种莫名其妙故障

    WinServer2012 R2忘记密码的解决方案 43 远程连接另一种莫名其妙故障 参考文章 xff1a xff08 1 xff09 WinServer2012 R2忘记密码的解决方案 43 远程连接另一种莫名其妙故障 xff08 2 x
  • 迅雷 应版权方要求,文件无法下载 解决方法

    迅雷 应版权方要求 xff0c 文件无法下载 解决方法 参考文章 xff1a xff08 1 xff09 迅雷 应版权方要求 xff0c 文件无法下载 解决方法 xff08 2 xff09 https www cnblogs com sui
  • redis集群搭建报错-(error) CLUSTERDOWN The cluster is down

    README 最近搭建一个redis集群 xff0c 参考博文 xff08 https www cnblogs com mafly p redis cluster html xff09 对集群配置后 xff0c master xff0c s
  • rabbitmq-通配符模式

    README 本文介绍 通配符模式 xff0c 及代码示例 1 intro to rabbitmq通配符模式 0 xff09 通配符模式 交换机类型为 Topic xff1b 1 xff09 与路由模式相比 xff0c 相同点是 两者都可以
  • springboot:BeanPostProcessor示例及分析

    README 1 xff0c 本文主要分析 BeanPostProcessor 的作用 xff0c 开发方式 xff1b 2 xff0c BeanPostProcessor 是bean后置处理器 xff0c 简而言之就是bean被创建好了
  • 字节数组转jsonobject(如读取HttpServletRequest.inputstream到jsonobject)

    README 本文po出了 如何读取 字节数组到jsonobject xff1b 字节数组如何获取 xff0c 本文不再赘述 xff1b 1 代码 64 Description 字节数组转json演示 64 author xiao tang
  • 中断屏蔽技术

    README 本文总结自bilibili 计算机组成原理 xff08 哈工大刘宏伟 xff09 的视频讲解 xff0c 非常棒 xff0c 墙裂推荐 xff1b 1 中断屏蔽 1 xff0c 中断屏蔽的意思是 xff0c 在中断1的服务程序
  • NIST BGP SRx的使用

    NIST BGPsec的使用 地址 xff1a https github com usnistgov NIST BGP SRx 推荐使用centos 7 安装依赖包 xff0c 就是CAT CONTENT grep requires下面那个

随机推荐