我正在开发一个应该通过 kafka 写入 hdfs 的项目。
假设有一个在线服务器将消息写入kafka。每条消息中都包含时间戳。
我想创建一个作业,其输出将是根据消息中的时间戳的一个或多个文件。
例如如果kafka中的数据是
{"ts":"01-07-2013 15:25:35.994", "data": ...}
...
{"ts":"01-07-2013 16:25:35.994", "data": ...}
...
{"ts":"01-07-2013 17:25:35.994", "data": ...}
我想得到 3 个文件作为输出
kafka_file_2013-07-01_15.json
kafka_file_2013-07-01_16.json
kafka_file_2013-07-01_17.json
当然,如果我再次运行该作业并且队列中有新消息,例如
{"ts":"01-07-2013 17:25:35.994", "data": ...}
它应该创建一个文件
kafka_file_2013-07-01_17_2.json // second chunk of hour 17
我见过一些开源代码,但大多数都是从 kafka 读取到某些 hdfs 文件夹。
这个问题的最佳解决方案/设计/开源是什么
你绝对应该检查一下Camus API
来自 linkedIn 的实现。 Camus 是 LinkedIn 的 Kafka->HDFS 管道。它是一个 MapReduce 作业,可以从 Kafka 中加载分布式数据。看看这个post我写了一个简单的例子,它从 Twitter 流中获取并根据推文时间戳写入 HDFS。
项目可在 github 上获取 -https://github.com/linkedin/camus
Camus 需要两个主要组件来从 Kafka 读取和解码数据以及将数据写入 HDFS –
解码从 Kafka 读取的消息
Camus 有一组解码器,有助于解码来自 Kafka 的消息,解码器基本上扩展了com.linkedin.camus.coders.MessageDecoder
它实现了基于时间戳的数据分区逻辑。该目录中存在一组预定义的解码器,您可以根据这些解码器编写自己的解码器。camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/
将消息写入 HDFS
Camus 需要一组 RecordWriterProvider 类,它扩展了com.linkedin.camus.etl.RecordWriterProvider
这将告诉 Camus 应写入 HDFS 的有效负载是什么。此目录中存在一组预定义的 RecordWriterProvider,您可以根据这些编写自己的。
camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)