Kafka Streams 在 HDFS 上查找数据

2024-04-16

我正在使用 Kafka Streams (v0.10.0.1) 编写一个应用程序,并希望通过查找数据来丰富我正在处理的记录。该数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录。

我怎样才能将其加载到Kafka Streams应用并结合实际KStream?
当新文件到达 HDFS 时重新读取数据的最佳实践是什么?

或者切换到更好Kafka Connect并将 RDBMS 表内容写入 Kafka 主题,该主题可供所有 Kafka Streams 应用程序实例使用?

Update:
按照建议卡夫卡连接将是要走的路。因为查找数据是在 RDBMS 上更新的daily我正在考虑按计划运行 Kafka Connect一次性工作 http://docs.confluent.io/3.0.0/connect/intro.html而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断的开销......等等。对我来说,在这种情况下进行预定的获取看起来更安全。

查找数据不大,记录可能删除/添加/修改。我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录。启用日志压缩并为已删除的键发送空值可能不起作用,因为我不知道源系统中删除了哪些内容。另外,据我所知,当压缩发生时我无法控制。


推荐的方法确实是将查找数据也摄取到 Kafka 中——例如通过 Kafka Connect——正如您在上面所建议的那样。

但在这种情况下,如何安排 Connect 作业每天运行,而不是连续从源表中获取,这在我的情况下是不必要的?

也许您可以更新您的问题,您不想让 Kafka Connect 作业连续运行?您是否担心资源消耗(数据库上的负载),您是否担心处理的语义(如果不是“每日更新”),或者......?

Update:正如所建议的那样,Kafka Connect 将是最佳选择。由于查找数据每天都会在 RDBMS 中更新,因此我正在考虑将 Kafka Connect 作为一项计划的一次性作业运行,而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断的开销......等等。对我来说,在这种情况下进行预定的获取看起来更安全。

卡夫卡连接is安全,并且 JDBC 连接器的构建正是为了以健壮、容错和高性能的方式将数据库表输入 Kafka(已经有许多生产部署)。所以我建议不要仅仅因为“它看起来更安全”而退回到“批量更新”模式;就我个人而言,我认为触发每日摄取在操作上不如仅仅保持其运行以进行连续(实时!)摄取更方便,而且它还会给您的实际用例带来一些缺点(请参阅下一段)。

但当然,您的里程可能会有所不同 - 因此,如果您决定每天更新一次,那就去做吧。但是,您会失去 a) 在丰富发生时使用最新的数据库数据丰富传入记录的能力,并且相反,b) 您实际上可能会使用陈旧/旧数据丰富传入记录,直到下一天更新已完成,这很可能会导致您向下游发送/可供其他应用程序使用的数据不正确。例如,如果客户更新了她的送货地址(在数据库中),但您每天只将此信息提供给流处理应用程序(以及可能的许多其他应用程序)一次,则订单处理应用程序会将包裹运送到错误的地方直到下一次每日摄取完成为止。

查找数据不大,记录可能会被删除/添加/修改。我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录。启用日志压缩并为已删除的键发送空值可能不起作用,因为我不知道源系统中删除了哪些内容。

Kafka Connect 的 JDBC 连接器已自动为您处理此问题:1. 它确保数据库插入/更新/删除正确反映在 Kafka 主题中,2. Kafka 的日志压缩确保目标主题不会超出范围。也许您可能想阅读文档中的 JDBC 连接器,以了解您可以免费获得哪些功能:http://docs.confluence.io/current/connect/connect-jdbc/docs/ http://docs.confluent.io/current/connect/connect-jdbc/docs/ ?

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams 在 HDFS 上查找数据 的相关文章

  • 我可以通过在 Android Activity 中声明适当的成员“静态”来提高效率吗

    如果一个 Activity 在实践中是单例 我认为我可以通过声明适当的成员 静态 来获得一些效率 且风险为零 是的 The Android 文档说 http developer android com guide topics fundam
  • WCF 服务主机配置 - 请尝试将 HTTP 端口更改为 8732

    我的 PC 上运行着一个复杂的基于 WCF 服务的解决方案 但由于安装 Windows 8 1 时出现问题 我不得不 刷新 我的 PC 现在我已经重新安装了 Visual Studio 2012 我的项目不再正常运行 当我调试单元测试时 w
  • Java中单例的其他方式[重复]

    这个问题在这里已经有答案了 只是我在考虑编写单例类的其他方法 那么这个类是否被认为是单例类呢 public class MyClass static Myclass myclass static myclass new MyClass pr
  • Spark scala 模拟 Spark.implicits 用于单元测试

    当尝试使用 Spark 和 Scala 简化单元测试时 我使用 scala test 和mockito scala 以及mockito Sugar 这只是让你做这样的事情 val sparkSessionMock mock SparkSes
  • Chrome 调试器注入 javascript

    我有这样的好奇心 是否可以以某种方式在我的页面中注入 javascript 并执行它并调试它 正如您在控制台中所做的那样 但在控制台中您无法暂停并观察变量 是否可以调试我通过控制台输入的代码 为什么无法调试通过 XHR 接收的代码 Than
  • 使用 Ruby aws-sdk 跟踪文件到 S3 的上传进度

    首先 我知道SO中有很多与此类似的问题 在过去的一周里 我读了大部分 如果不是全部 但我仍然无法让这项工作为我工作 我正在开发一个 Ruby on Rails 应用程序 允许用户将 mp3 文件上传到 Amazon S3 上传本身工作正常
  • Matplotlib loglog 的错误刻度/标签(双轴)

    我正在使用 matplotlib 创建对数图 如下图所示 默认刻度选择得很糟糕 充其量是这样 右边的 y 轴甚至根本没有 在线性等效中确实如此 而两个 x 轴都只有一个 有没有办法获得合理数量的带有标签的刻度 without为每个情节手动指
  • Jquery 以编程方式更改

    文本

    编辑 解决方案是将其添加到个人资料页面而不是性别页面 profile live pageinit function event p pTest text localStorage getItem gender 我在列表视图中有一个带有一些文
  • 将客户端库添加到 Razor 类库

    我正在学习 Blazor 我注意到创建 Razor 类库是一个很好的做法 您将在其中定义大部分组件 这样您就可以在客户端或服务器中使用它们 而不会出现太多问题 在不同的框架中 我习惯于以 SASS 形式包含库作为引导程序 这样我就可以在我的
  • 区分 NaN 输入和输入类型为“number”的空输入

    我想使用 type number 的表单输入 并且只允许输入数字
  • 在DialogFragment中,onCreate应该做什么?

    我目前正在摆弄 DialogFragment 以学习使用它 我假设相比onCreateView onCreate 可以这样做 public void onCreate Bundle savedInstanceState super onCr
  • 即使在急切加载之后,belongs_to 关联也会单独加载

    我有以下关联 class Picture lt ActiveRecord Base belongs to user end class User lt ActiveRecord Base has many pictures end 在我的
  • 进程被杀死后不会调用 onActivityResult

    我有一个主要活动 Main 和另一个活动 Sub 由 Main 调用 startActivityForResult new Intent this SubActivity class 25 当我在 Sub 时 我终止该进程 使用任务管理器或
  • 在成为FirstResponder或resignFirstResponder的情况下将对象保持在键盘顶部?

    我目前在键盘顶部有一个 UITextField 当您点击它时 它应该粘在键盘顶部并平滑地向上移动 我不知道键盘的具体时长和动画类型 所以确实很坎坷 这是我所拥有的 theTextView resignFirstResponder UIVie
  • Rails - 渲染:目标锚标记的操作?

    我希望像这样使用渲染 render action gt page form 我也尝试过这个 render template gt site page form 那也没用 这个特定页面上的表单位于最底部 如果提交时发生任何错误 我不希望用户被
  • java中void的作用是什么?

    返回类型 方法返回值的数据类型 如果方法不返回值 则返回 void http download oracle com javase tutorial java javaOO methods html http download oracle
  • 使用 paramiko 运行 Sudo 命令

    我正在尝试执行sudo使用 python paramiko 在远程计算机上运行命令 我尝试了这段代码 import paramiko ssh paramiko SSHClient ssh set missing host key polic
  • 对象指针值作为字典的键

    我想使用对象的引用值作为字典的键 而不是对象值的副本 因此 我本质上想在字典中存储与另一个对象的特定实例关联的对象 并稍后检索该值 这可能吗 是不是完全违背了NSDictionary的理念 我可以看出我可能以错误的方式处理这个问题 因为字典
  • HTML 表格 - 固定列宽和多个可变列宽

    我必须建立一个有 5 列的表 表格宽度是可变的 内容宽度的 50 有些列包含固定大小的按钮 因此这些列应该有一个固定大小 例如 100px 有些列中有文本 所以我希望这些列具有可变的列宽 例如 Column1 tablewidth sum
  • R data.table 1.9.2 关于 setkey 的问题

    这似乎是 1 8 10 后引入的一个错误 与包含列表的 DT 的 setkey 相关 运行下面两个代码来查看问题 library data table dtl lt list dtl 1 lt data table scenario 1 p

随机推荐

  • 使用 Laravel 5 (Lumen) 中的基本路径

    我在一个项目中使用 laravel 在我的本地计算机上 我必须访问的服务器只是 laraveltest dev 当我打开这个 URL 时 项目运行正常 没有任何问题 但是 当我将其上传到测试服务器上时 这些内容位于子文件夹中 如下所示 la
  • xdebug.so:未定义的符号:zend_ce_error

    我需要调试旧的 php 版本 PHP 5 6 22 cli built Jun 29 2016 14 26 09 Copyright c 1997 2016 The PHP Group Zend Engine v2 6 0 Copyrigh
  • 如何加载之前存储的svm分类器?

    我正在 Visual Studio 中使用 openCV SVM OpenCV 2 4 4 0 我训练它 mySVM train trainingDataMat labelsMat Mat Mat params 已保存 mySVM save
  • 用通用函数替换普通函数

    我想将 elt nth 和 mapcar 等名称与我正在原型设计的新数据结构一起使用 但这些名称指定普通函数 因此我认为需要将其重新定义为通用函数 重新定义这些名称可能是一种不好的形式 有没有办法告诉 defgeneric 不要生成程序错误
  • 如何在范围末尾自动调用 Pop-Location

    假设我有一个简单的范围 以 Push Location 和 Pop Location 结尾 Function MyFunction Location Push Location Location do other stuff here Po
  • 如何从JTable中获取图标

    我已经更改了单元格渲染JTable使用以下代码显示图像而不是文本 base table getColumnModel getColumn 3 setCellRenderer new TableCellRenderer Override pu
  • 正则表达式从文本文件中提取文本块?

    我需要使用正则表达式从 Python 文本文件中提取标题及其下方的文本块 但我发现这很困难 我转换了这个PDF https www docdroid net rduS8oC pdfsam doc pdf文本 现在看起来像这样 到目前为止 我
  • 递归地更改绘图类型(带线、带点)

    我正在尝试为基于 Julia 的 gnuplot 创建一个包装器来自动化我的绘图 我的目标是为 Julia 提供要绘制的文件名 要使用的线条样式类型以及要绘制的列 例如 如果我有文件test1 and test2 都有 3 列和标题 tim
  • 计算程序启动的次数

    如何在不保留文件和统计的情况下获取程序先前在 C 中运行的次数 c 中是否有应用程序类或其他内容来检查计数 请给出详细的解释 因为我对此一无所知 这是一个 Windows 控制台应用程序 而不是 Windows 窗体 您可以在以下位置创建一
  • 当 ObservableCollection 中的项目更新时更新 ItemsControl

    问题 你声明一个ItemsControl 或派生自的控件ItemsControl 在里面 看法 您绑定ItemsControl ItemsSource财产给ObservableCollection在你的视图模型中 当项目添加到 删除时 您的
  • 播放框架根据环境覆盖“application.conf”值

    玩 2 6 x Scala 我有一个默认的application conf文件夹内 project conf 但我想根据环境通过传递相应的文件作为命令行参数来覆盖一些值 如文档中详述 https www playframework com
  • 窗口调整大小处理事件

    我的应用程序的某些元素具有自定义调整大小事件 这些事件都有效 然而 他们却被一件事搞砸了 当将鼠标悬停在窗口边框上时 光标将成为调整大小手柄 然后单击 但不要拖动 元素的大小调整不正确 并且我的处理程序不会被触发 我已经寻找过这样的事件 但
  • 如何在下面给出的数字数组中找到锯齿状数组中的最大数字?

    如何在下面给出的数字数组中找到锯齿状数组中的最大数字 const array 2 4 10 12 4 100 99 4 3 2 99 0 如果您知道嵌套的最大深度 那么您可以flat数组并找到最大值 Math max array flat
  • JQ:如何将被识别为字符串的值相乘?

    我正在尝试从交换网络套接字获取一些贸易信息 在我从套接字获取的 JSON 中 值 p 和 q 都用双引号括起来 当我尝试将两个值相乘时 它表示我正在尝试将两个字符串相乘 因此 我通过 tonumber 过滤器传递这些字符串 并且错误消息发生
  • C# 中的内部设置属性是什么?

    我刚刚遇到了一个未知的 C 概念 谁能告诉我内部设置属性的目的是什么 它有什么用 我知道内部关键字用于在程序集中工作 如果您有一个带有内部 set 访问器 和公共 get 访问器 的属性 则意味着程序集中的代码可以读取 获取 和写入 设置
  • VS Code 突出显示了我所有的 WordPress 函数名称

    我正在使用 PHP Intelephense 版本 1 3 7 这是最新版本 我的 VS Code 是最新的 之前没有问题 但是几天前 它一直高亮我所有的wordpress函数名称 我尝试降级我的 PHP Intelephense 但情况仍
  • 对 div 进行动画处理并从中心展开

    我有一个简单的代码 可以从中心水平和垂直扩展 div 但它只扩展到左侧或底部 我希望它从中心扩展到两侧 左 50px 右 50px 或 顶部 50px 底部 50px 总计等于100px 这里是代码
  • 如何以 UTF-8 打开文件并以 UTF-16 写入另一个文件

    如何打开 UTF 8 格式的文件并写入 UTF 16 格式的另一个文件 我需要一个例子 因为我对 和 a 等某些字符有疑问 当写 m dic 时 我发现文件中写着 m dic 您可以按如下方式创建阅读器 InputStream is new
  • Android - ViewRootImpl$CalledFromWrongThreadException

    我正在使用this http savagelook com blog android display images from the internet in android 显示来自互联网的图像 但它会抛出如下错误 04 12 13 45
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应