随着添加Headers http://apache.spinellicreations.com/kafka/0.11.0.0/javadoc/org/apache/kafka/common/header/Header.html到记录(生产者记录 https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html & 消费者记录 http://mirror.reverse.net/pub/apache/kafka/0.11.0.0/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)在 Kafka 0.11 中,使用 Kafka Streams 处理主题时是否可以获取这些标头?当调用类似方法时map
on a KStream
它提供了以下论据key
和value
的记录,但我无法看到访问headers
。如果我们能的话那就太好了map
超过ConsumerRecord
s.
ex.
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像这样的东西会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
从 2.0.0 版本开始可以访问记录标题(参见KIP-244 https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API了解详情)。
您可以通过处理器 API 访问记录元数据(即通过transform()
, transformValues()
, or process()
),通过给定的“上下文”对象(参见https://docs.confluence.io/current/streams/developer-guide/processor-api.html#accessing-processor-context https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
Update
从 2.7.0 版本开始,处理器 API 得到了改进(参见KIP-478 https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API),添加一个新的类型安全api.Processor
与 一起上课process(Record)
代替process(K, V)
方法。对于这种情况,标头(和记录元数据)可以通过Record
class).
这个新功能是尚不可用在“DSL 的 PAPI 方法”中(例如KStream#process()
, KStream#transform()
和兄弟姐妹)。
+++++
在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳,但不公开在旧版本中读取时实际上被 Streams 删除的标头。
但元数据在 DSL 级别不可用。然而,通过以下方式扩展 DSL 的工作也在进行中KIP-159 https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)