我看到 spring Kafka 代码,我有一些疑问:
如果我们使用 1 个 @kafkaListener 和 2 个主题,那么 spring Kafka 将创建一个 MessageListenerContainer。如果我为每个主题使用单独的 @kafkaListener ,那么将创建 2 个 MessageListenerContainer 。
MessageListenerContainer 是消费者的意思吗?
如果我在 ConcurrentKafkaListenerContainerFactory 中将并发数设置为 4,那么这意味着对于每个 kafkaListener,我使用代理打开 4 个线程?这意味着协调员将他们视为 4 个不同的消费者。
轮询如何与 kafkaListener 一起使用?它每次只从broker那里获取1个ConsumerRecord吗?
请帮忙。
有两种实现方式MessageListenerContainer
- the KafkaMessageListenerContainer
(KMLC)和ConcurrentMessageListenerContainer
(CMLC)。
CMLC 只是一个或多个 KMLC 的包装,其中 KMLC 的数量由concurrency
.
@KafkaListener
始终使用 CMLC。
每个 KMLC 获得一个Consumer
(和一个线程)。线程不断poll()
是消费者,具有指定的pollTimeout
.
主题/分区如何在 KMLC 之间分布取决于
- 主题有多少个分区
- 消费者的
partition.assignment.strategy
财产
如果您有多个主题,其分区数少于并发数,则可能需要备用分区分配器,例如循环分配器,否则您将拥有没有分配的空闲容器。
- 那是对的;如果您明确希望每个主题使用不同的容器,则可以提供多个
@KafkaListener
同一方法上的注释。
- 请参阅上面我的解释。
- 这是正确的——这是与 Kafka 实现并发的唯一方法(无需添加非常复杂的逻辑来管理偏移量)。
- 每次轮询返回的记录数取决于多个消费者属性,
max.poll.records
, fetch.min.bytes
, fetch.max.wait.ms
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)