一行代码引发的"血案"

2023-05-16

昨天在使用pykafka的时候又遇到了之前我遇到过的PartitionOwnedError、ConsumerStoppedException异常,关于这个异常我之前写过一篇分析的文章(链接在这里),我自认为之前应该是把这个问题彻底解决了的,但是这次它又幽灵般的出现了,使我百思不得其解。
一、问题的出现
我在多台机器上面同时开启了多个进程来读写同一个topic,这个topic有5个partition,我想着开启5个进程来读写,这样可以提高速度。在测试过程中我发现会出现PartitionOwnedError、ConsumerStoppedException异常,这个问题之前我记得我通过参数rebalance_max_retries、rebalance_backoff_ms已经解决过了,而且我确保代码中这两个参数都没有变化过。在日志中我也发现进程确实是会重试rebalance_max_retries以后才会报出异常,下面是我摘取的部分日志:

[pykafka.balancedconsumer] [balancedconsumer.py:580] INFO: Unable to acquire partition <pykafka.partition.Partition at 0x7f9a0a5586d0 (id=4)>. Retrying
[pykafka.balancedconsumer] [balancedconsumer.py:580] INFO: Unable to acquire partition <pykafka.partition.Partition at 0x7f3320806710 (id=3)>. Retrying

但是经过rebalance_max_retries的重试以后就会抛出PartitionOwnedError异常,也就是说我这个consumer没有获取到分配给我的partition。
二、问题的排查
PartitionOwnedError异常抛出的原理性解释大家可以参考前面的文章,不再赘述。
此时我想到肯定是其它的原因导致的这个问题,但是我明明记得我对操作pykafka的代码没有做过什么改动啊,唯一的改动就是把consumer_timeout_ms这个参数改成了-1(读取永不超时),难道是这个原因导致的吗?但是我明明在之前也测试过多次啊,之前都没有发现这个问题啊,这个就让我很不理解了。
为了验证我的猜测,我还是把consumer_timeout_ms改成了5000(5s),然后问题就没有再出现了,也就是说确实是这一行代码导致的问题,但是这个还不能完全解答我另外的一个疑惑,就是为什么我之前的多次测试没有发现这个问题,偏偏是这次测试的时候出现了。
没办法,只能又开始由源码开刀了。
三、问题的真正原因
因为之前对pykafka的代码有过一些了解,所以这次读起来就相对比较简单了,我理解每次当zookeeper上面的znode状态发生变化,kafka都会执行相应的rebalance,如下的代码就是实现这个功能:

    def _set_watches(self):
        """Set watches in zookeeper that will trigger rebalances.

        Rebalances should be triggered whenever a broker, topic, or consumer
        znode is changed in zookeeper. This ensures that the balance of the
        consumer group remains up-to-date with the current state of the
        cluster.
        """
        proxy = weakref.proxy(self)
        _brokers_changed = self._build_watch_callback(BalancedConsumer._brokers_changed, proxy)
        _topics_changed = self._build_watch_callback(BalancedConsumer._topics_changed, proxy)
        _consumers_changed = self._build_watch_callback(BalancedConsumer._consumers_changed, proxy)

        self._setting_watches = True
        # Set all our watches and then rebalance
        broker_path = '/brokers/ids'
        try:
            self._broker_watcher = ChildrenWatch(
                self._zookeeper, broker_path,
                _brokers_changed
            )   
        except NoNodeException:
            raise Exception(
                'The broker_path "%s" does not exist in your '
                'ZooKeeper cluster -- is your Kafka cluster running?'
                % broker_path)

        self._topics_watcher = ChildrenWatch(
            self._zookeeper,
            '/brokers/topics',
            _topics_changed
        )   

        self._consumer_watcher = ChildrenWatch(
            self._zookeeper, self._consumer_id_path,
            _consumers_changed
        )   
        self._setting_watches = False

代码逻辑比较简单,就是设置三个watcher函数,一旦对应的znode状态发生变化就执行相应的callback,这个也是为什么当一个consumer加入以后会分配到partition的原因,当我们新增加一个consumer的时候就会触发_consumers_changed这个函数,这个函数的逻辑也很简单:

    @_catch_thread_exception
    def _consumers_changed(self, consumers):                                                                                                             
        if not self._running:
            return False  # `False` tells ChildrenWatch to disable this watch
        if self._setting_watches:
            return
        log.debug("Rebalance triggered by consumer change ({})".format(
            self._consumer_id))
        self._rebalance()

就是执行_rebalance()函数,也就是触发了kafka的rebalance过程。
代码读到这里的时候我们能够发现很有可能就是之前开启的consumer进程没有执行rebalance过程,导致后面新加入的consumer进程一直获取不到partition,接着我们到_rebalance()函数一看究竟:


    def _rebalance(self):                                                                                                                                
        """Start the rebalancing process for this consumer

        This method is called whenever a zookeeper watch is triggered.
        """
        if self._consumer is not None:
            self.commit_offsets()
        # this is necessary because we can't stop() while the lock is held
        # (it's not an RLock)
        with self._rebalancing_lock:
            if not self._running:
                raise ConsumerStoppedException
            log.info('Rebalancing consumer "%s" for topic "%s".' % (
                self._consumer_id, self._topic.name))
            self._update_member_assignment()

函数逻辑也比较简单,之前的文章其实也分析过这个调用过程,真正的rebalance是在_update_member_assignment()函数中执行的,但是在这个函数之前有一行with self._rebalancing_lock,也就是执行rebalance之前要获得_rebalancing_lock锁,此时我能确认就是这个锁没有获取到导致的问题,也就是说其它地方把这个锁一直acquire了,没有释放,那么接下来就看看还有其它哪些函数会用到这个锁呢。
grep一遍源码你马上就会发现consume()函数会用到这个锁,代码如下:

    def consume(self, block=True):
        """Get one message from the consumer

        :param block: Whether to block while waiting for a message
        :type block: bool
        """

        def consumer_timed_out():
            """Indicates whether the consumer has received messages recently"""
            if self._consumer_timeout_ms == -1:
                return False
            disp = (time.time() - self._last_message_time) * 1000.0
            return disp > self._consumer_timeout_ms
        message = None
        self._last_message_time = time.time()
        while message is None and not consumer_timed_out():
            self._raise_worker_exceptions()
            try:
                # acquire the lock to ensure that we don't start trying to consume from
                # a _consumer that might soon be replaced by an in-progress rebalance
                with self._rebalancing_lock:                                                                                                             
                    message = self._consumer.consume(block=block)
            except (ConsumerStoppedException, AttributeError):
                if not self._running:
                    raise ConsumerStoppedException
                continue
            if message:
                self._last_message_time = time.time()
            if not block:
                return message
        return message

函数里面定义了一个超时函数consumer_timed_out()之前我代码是把_consumer_timeout_ms设置成了-1,那么这个函数就会返回False,此时就会进入while循环中获取到了_rebalancing_lock锁,接着就开始消费队列,self._consumer本质是一个SimpleConsumerorRdKafkaSimpleConsumer(如果设置了use_rdkafka参数),我们在BalanceConsumer构造函数中传入的consumer_timeout_ms也会传给对应的SimpleConsumer,所以如果我们设置的是-1(永不超时)那么这代码就会一直不返回,除非有消费到数据。
到这一步就基本解释了前面的疑惑,如果设置consumer_timeout_ms = -1那么consume()就会一直占有_rebalancing_lock锁,当新的consumer加入的时候之前的consumer本来应该执行rebalance操作的,但是又因为_rebalancing_lock锁一直没有获取到,所以就一直阻塞在那里,等到新加入的consumer重试了rebalance_max_retries次以后就会因为获取不到partition而抛出PartitionOwnedError异常。
这里也解释了为什么我之前没法遇到这个问题,因为我之前的队列一直都有数据,所以consume()每次都能及时的返回然后释放_rebalancing_lock锁。
四、如何解决问题
找到原因要解决就好办了,最简单的方式就是把consumer_timeout_ms设置成一个非-1的值,如我之前设置的5000ms。
但是我觉得这应该算是pykafka的一个bug,我已经在github提了一个issue。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一行代码引发的"血案" 的相关文章

随机推荐

  • CMAKE基础使用

    1 目录结构 xff1a 2 顶层cmake内容 xff1a span class token function cmake minimum required span span class token punctuation span V
  • URL格式

    一 URL基本格式 一个完整的url包含方案 用户名 密码 主机名 端口 路径 参数 查询和片段 xff0c 格式如下 xff1a lt scheme gt lt user gt lt password gt 64 lt host gt l
  • __IO uint16_t

    STM32里的类型定义 xff0c 见如下说明 xff1a typedef volatile unsigned short vu16 typedef IO uint16 t vu16 IO definitions access restri
  • 串口波形分析(TTL,RS232,RS485)

    TTL xff0c RS232 xff0c RS485波形分析 本文转自 xff1a http blog 163 com qiu zhi2008 blog static 60140977201092651854445 http www cn
  • Java数字类型转byte数组

    文章目录 方法1 自己写int转byte数组byte数组转int参考 xff1a https blog csdn net qq 41054313 article details 88424454 方法2 使用java NIO包的功能int转
  • 头文件和库函数的区别

    1 头文件中有函数的申明 xff0c 库文件实现函数的定义 比如 xff0c printf函数 使用时应包括stdio h xff0c 打开stdio h你只能看到 xff0c printf这 个函数的申明 却看不到printf具体是怎么实
  • C语言--字符串的截取

    今天碰到了一个字符串截取的功能实现问题 xff0c 比较常见所以就做下记录 一般的实现是这样的 xff1a include lt stdio h gt include lt string h gt int main void char de
  • 使用 JWT 让你的 RESTful API 更安全

    传统的 cookie session 机制可以保证的接口安全 xff0c 在没有通过认证的情况下会跳转至登入界面或者调用失败 在如今 RESTful 化的 API 接口下 xff0c cookie session 已经不能很好发挥其余热保护
  • CAN报文解析—案例

    1 CAN报文定义 CAN报文是指发送单元向接受单元传送数据的帧 我们通常所说的CAN报文是指在CAN线 xff08 内部CAN 整车CAN 充电CAN xff09 上利用ECU和CAN卡接收到的十六进制报文 2 CAN协议中CAN报文种类
  • 单片机中,intrins.h头文件中各函数详解:空指令_nop_(),移位函数_crol_、_cror_

    intrins h 在 C51单片机编程中 xff0c 我们经常使用到 nop 延时一个机器周期 如果晶振是12M xff0c 则延时1 us xff0c 该空函数在头文件intrins h中 头文件 INTRINS H 中的函数使用很方便
  • Linux 下模拟Http 的get or post请求(curl和wget两种方法)

    一 get请求 xff1a 1 使用curl命令 xff1a curl 34 http www baidu com 34 如果这里的URL指向的是一个文件或者一幅图都可以直接下载到本地 curl i 34 http www baidu co
  • QT入门基础认知(三个常用类、三种对话框类型、信号和槽)

    1 简单介绍 xff1a 1 1 三个常用类 xff08 Qwidget类 QDialog类 QMainwindow类 xff09 Qwidget类 xff1a 继承与QObject类和QPaintdevice类 xff0c 所有用户界面对
  • socket函数的domain、type、protocol解析

    socket 函数的 domain type protocol 解析 lxg 64 2015 04 09 内核中的 socket 概览 图一 xff1a socket 概览 内核中套接字是一层一层进行抽象展示的 xff0c 把共性的东西抽取
  • code的用法

    今天写程序的时候用了const xff0c 想到之前遇到的code的用法 xff0c 那是第一次遇到code的那样的用法 xff0c 查了一下 xff0c 解释如下 xff1a 在单片机使用C语言进行编程的时候 xff0c 经常使用到cod
  • Linux的system()和popen()差异

    Linux的system 和popen 差异 1 system 和popen 简介 在linux中我们可以通过system 来执行一个shell命令 xff0c popen 也是执行shell命令并且通过管道和shell命令进行通信 sys
  • HTTP的303、307状态码

    之前在 http权威指南 中看到了HTTP的307状态码 xff0c 当时因为没有找到可以实验的网站所以没有比较深的印象 xff0c 今天在排查一个问题的时候恰巧遇到了HTTP 1 1 307 TemporaryRedirect xff0c
  • 如果获得页面跳转的最终URL

    最近做一个小功能 xff0c 就是获取一个页面经过跳转后的最终页面URL xff0c 这里的跳转方式包含但不仅限于HTTP 301 302 js meta refresh 下面是我想到的三种可能的解决方式 xff0c 可能会有更优的方法 x
  • pykafka的NoBrokersAvailableError原因

    今天在使用pykafka的时候遇到一个问题 xff0c 我的kafka和zookeeper运行在一台机器上面 xff0c 然后应用程序跑在另外一台机器上面 当我调用pykafka中的KafkaClient zookeeper host 61
  • pyspark的pickle.PicklingError

    今天在用pyspark的时候在一个类中调用rdd的map的时候报错 xff0c 代码如下 xff1a rdd 61 df filter size df emission gt span class hljs number 50 span r
  • 一行代码引发的"血案"

    昨天在使用pykafka的时候又遇到了之前我遇到过的PartitionOwnedError ConsumerStoppedException异常 xff0c 关于这个异常我之前写过一篇分析的文章 链接在这里 xff0c 我自认为之前应该是把