如何使用kafka-python订阅多个kafka通配符模式的列表?

2024-01-05

我使用带有通配符的模式订阅 Kafka,如下所示。通配符代表动态客户 ID。

consumer.subscribe(pattern='customer.*.validations')

这很有效,因为我可以从主题字符串中提取客户 ID。但现在我需要扩展功能来收听类似的主题,但目的略有不同。我们就这样称呼它吧customer.*.additional-validations。代码需要位于同一个项目中,因为共享了很多功能,但我需要能够根据队列类型采取不同的路径。

In the 卡夫卡文档 http://kafka-python.readthedocs.io/en/master/#kafkaconsumer我可以看到可以订阅一系列主题。然而,这些是硬编码的字符串。不是允许灵活性的模式。

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

所以我想知道是否有可能以某种方式将两者结合起来?有点像这样(非工作):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])

在 KafkaConsumer 代码中,它支持主题列表或模式,

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717 https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

   def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

所以你可以创建一个正则表达式,使用 OR 条件|,这应该作为订阅多个动态主题正则表达式,因为它内部使用re模块进行匹配。

(customer.*.validations)|(customer.*.additional-validations)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用kafka-python订阅多个kafka通配符模式的列表? 的相关文章

随机推荐