将文件分区为日期从 kafka 写入 hdfs 的最有效方法是什么

2023-11-22

我正在开发一个应该通过 kafka 写入 hdfs 的项目。 假设有一个在线服务器将消息写入kafka。每条消息中都包含时间戳。 我想创建一个作业,其输出将是根据消息中的时间戳的一个或多个文件。 例如如果kafka中的数据是

 {"ts":"01-07-2013 15:25:35.994", "data": ...}
 ...    
 {"ts":"01-07-2013 16:25:35.994", "data": ...}
 ... 
 {"ts":"01-07-2013 17:25:35.994", "data": ...}

我想得到 3 个文件作为输出

  kafka_file_2013-07-01_15.json
  kafka_file_2013-07-01_16.json
  kafka_file_2013-07-01_17.json 

当然,如果我再次运行该作业并且队列中有新消息,例如

 {"ts":"01-07-2013 17:25:35.994", "data": ...}

它应该创建一个文件

  kafka_file_2013-07-01_17_2.json // second  chunk of hour 17

我见过一些开源代码,但大多数都是从 kafka 读取到某些 hdfs 文件夹。 这个问题的最佳解决方案/设计/开源是什么


你绝对应该检查一下Camus API来自 linkedIn 的实现。 Camus 是 LinkedIn 的 Kafka->HDFS 管道。它是一个 MapReduce 作业,可以从 Kafka 中加载分布式数据。看看这个post我写了一个简单的例子,它从 Twitter 流中获取并根据推文时间戳写入 HDFS。

项目可在 github 上获取 -https://github.com/linkedin/camus

Camus 需要两个主要组件来从 Kafka 读取和解码数据以及将数据写入 HDFS –

解码从 Kafka 读取的消息

Camus 有一组解码器,有助于解码来自 Kafka 的消息,解码器基本上扩展了com.linkedin.camus.coders.MessageDecoder它实现了基于时间戳的数据分区逻辑。该目录中存在一组预定义的解码器,您可以根据这些解码器编写自己的解码器。camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/

将消息写入 HDFS

Camus 需要一组 RecordWriterProvider 类,它扩展了com.linkedin.camus.etl.RecordWriterProvider这将告诉 Camus 应写入 HDFS 的有效负载是什么。此目录中存在一组预定义的 RecordWriterProvider,您可以根据这些编写自己的。

camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将文件分区为日期从 kafka 写入 hdfs 的最有效方法是什么 的相关文章

随机推荐

  • 需要带有背景图像的空 Div 来强制高度并且必须具有响应能力

    我需要以下内容 空 div 没有内容 背景图片设置到div中 背景图像在调整大小时流畅 响应灵敏 我无法设置固定 div 上的尺寸 我尝试的所有方法都无法强制打开 div 以支持背景图像的大小 任何帮助是极大的赞赏 http www eve
  • 如何在没有插件的情况下搜索选择标签html的选项

    我用 html 制作了选择标签 其中包含所有国家 地区的名称 我想使用搜索栏搜索它们的值 而不需要任何插件或附加组件 这可能吗 Answer 是的 您可以首先在此处查看它的实际效果demo 如果您喜欢所看到的内容 请执行以下操作 HTML
  • 充分使用“if”语句或“try/catch”块?

    给我一些你的想法 关于哪种是更好的编码实践 使代码更高效 看起来更漂亮 无论如何 增加和提高你使用 if 语句来预测和捕获潜在问题的能力 或者只是充分利用 try catch 假设这是针对 Java 的 如果重要的话 Edit 我目前正在摆
  • 自动为所有 SQL Server 作业创建脚本

    目前 我正在尝试为 MS SQL2005 服务器的所有 SQL 作业自动生成创建脚本 我发现的一种方法已经完成 手动http msdn microsoft com en us library ms191450 aspx 我发现第二种方法可以
  • 始终保留n个最佳元素的数据结构

    我需要一个始终保存n迄今为止插入的最大项目 排名不分先后 So if n是 3 我们可以进行以下会话 其中我插入一些数字并且容器的内容发生变化 now insert 1 1 now insert 0 1 0 now insert 4 1 0
  • 寻找 A* 算法的启发式有哪些好方法?

    您有一张方形图块地图 您可以在其中向 8 个方向中的任意方向移动 鉴于您有名为的函数cost tile1 tile2 它告诉您从一个相邻图块移动到另一个图块的成本 您如何找到既可接受又一致的启发式函数 h y goal 给定此设置 寻找启发
  • python 中的端口转发以允许套接字连接

    我使用套接字启动服务器并希望允许客户端连接到它 self sock bind 0 0 0 0 0 0 0 0 0 will allow all connections and port 0 gt os chooses a open port
  • 如何在不传递变量的情况下获取当前异常?

    我正在寻找一种方法来检索当前异常 而不必将其作为变量传递 假设下面的代码 public void MakeItFail try throw new FailException catch Yes I m aware that this sh
  • ng-grid 自动调整列宽

    我正在使用 AngularJS ng grid 并尝试制作它 1 根据列内容自动调整列宽 2 当显示的列较少时 使最后一列宽度自动调整大小以填充空白区域 例如 我有 8 列 每列宽度 100 整个 ng grid 宽度为 800 然后如果我
  • 查找消息的作者

    如果有人写 name arg 我希望我的机器人说消息的作者 你的名字是 arg 我找不到该消息的作者 though client command async def name their name await client say 0 yo
  • Elastic Search 上可以创建的索引数量有限制吗?

    我正在使用 AWS 提供的 Elastic Search 我的网站上以及每次注册时都有一个注册页面 为新用户创建一个新索引 稍后由他的工作组使用 这意味着索引的数量不断增长 现在达到大约 4 5k 我的问题是 索引数量有性能限制吗 为每个新
  • 模板c++的模板?

    我已经成功地创建了一些 preperty 类 其中包含我们期望的所有内容 我的意思是 使用它时 您不需要仅使用来调用函数operator 将完成所有工作 但我想只有一件事如果我们能解决就好了 template
  • 将 javascript 添加到 ASP.NET 文本框控件的 OnBlur 属性

    有没有办法指定一些 JavaScript 在 ASP NET 文本框的 OnBlur 事件上执行 在我看来 如果我向 TextBox 对象添加任何事件处理程序 它们只会导致回发到服务器 而不是执行我想要的操作 基本上 我只想能够在 HTML
  • 如何使用 Jquery 在午夜使 Cookie 过期?

    我这样做了 cookie ultOS i expires 1 但它只会在第二天到期 如何让 cookie 在午夜过期 这会起作用吗 var date new Date var midnight new Date date getFullYe
  • 如何使用 \x1b[2j 清除屏幕?

    我们如何实施clrscr 谷歌搜索我发现 x1b 2j可以用来清屏 但是我们如何使用它呢 标准 C 库不提供清除屏幕的方法 为此 您需要一个依赖于操作系统的库 在 DOS 和 Windows 下 对于在 DOS 或 Windows 控制台中
  • 分发时隐藏swift框架中的源代码

    我创建了一个快速框架来分发给私人客户 这是我第一次创建 iOS 框架 所以我对很多事情都一无所知 我的源代码是否有可能被隐藏 我已经到处搜索 但找不到正确的答案 我不确定我是否解释得很彻底 但为了简化起见 我不希望开发人员能够修改或查看我的
  • ChartJS:数据标签:显示饼图中的百分比值

    我有一个带有四个标签的饼图 var data data 50 55 60 33 labels India China US Canada backgroundColor 4b77a9 5f255f d21243 B27200 borderC
  • CoffeeScript:使用instanceof与Class.constructor.name

    如果我有课 class Haha constructor lolAmount 1 gt alert lolAmount 我想检查一个对象是否属于正确的类 使用它是否总是安全的constructor name haha new Haha un
  • 如何在 ggplot2 中将地图与复杂的点显示结合起来?

    我正在尝试用非洲背景地图绘制研究地点的点 我可以独立创建这两个 但我很难将它们叠加在一起 我使用的非洲地图是来自 maplibrary org 的 Esri shapefile 它可以从我的保管箱中获得 https www dropbox
  • 将文件分区为日期从 kafka 写入 hdfs 的最有效方法是什么

    我正在开发一个应该通过 kafka 写入 hdfs 的项目 假设有一个在线服务器将消息写入kafka 每条消息中都包含时间戳 我想创建一个作业 其输出将是根据消息中的时间戳的一个或多个文件 例如如果kafka中的数据是 ts 01 07 2