我正在为大容量高速分布式应用程序编写 Kafka Consumer。我只有一个主题,但收到的消息率非常高。拥有多个分区来服务更多消费者将适合此用例。最好的消费方式是拥有多个流读取器。根据文档或可用示例,ConsumerConnector 给出的 KafkaStream 数量基于主题数量。想知道如何[基于分区]获得多个 KafkaStream 读取器,以便我可以为每个流跨一个线程,或者在多个线程中从同一个 KafkaStream 读取将实现从多个分区的并发读取?
非常感谢任何见解。
想分享我从邮件列表中找到的内容:
您在主题映射中传递的数字控制将主题划分为多少个流。在你的例子中,如果你传入 1,所有 10 个分区的数据将被输入到 1 个流中。如果传入2,则2个流中的每一个都会从5个分区获取数据。如果你传入 11 个,其中 10 个将分别从 1 个分区获取数据,而 1 个流将什么也得不到。
通常,您需要在其自己的线程中迭代每个流。这是因为如果没有新事件,每个流都可能永远阻塞。
示例片段:
topicCount.put(msgTopic, new Integer(partitionCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic);
for (final KafkaStream stream : streams) {
ReadTask task = new ReadTask(stream, msgTopic);
task.addObserver(this.msgObserver);
tasks.add(task); executor.submit(task);
}
参考:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCA+sH[电子邮件受保护]%3E http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCA+sHyy_Z903dOmnjp7_yYR_aE2sRW-x7XpAnqkmWaP66GOqf6w@mail.gmail.com%3E
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)