我们如何从 KafkaStream 获取主题名称和分区 id。对于任何其他 Kafka 消费者,我们可以获得主题名称和分区 ID,如下所示:
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {System.out.printf("consumed: key = %s, value = %s, partition id= %s, offset = %s%n",record.key(), record.value(), record.partition(), record.offset());}
不知道如何获取 KafkaStreams 中的记录引用。
您可以通过以下方式获取输入记录的元数据ProcessorContext
在处理器 API 中公开。您可以通过以下方式将处理器 API 嵌入到 DSL 中transform()
和类似的方法。
查看文档了解详细信息:https://docs.confluence.io/current/streams/developer-guide/processor-api.html#accessing-processor-context https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)