Spark Streaming中如何处理旧数据并删除处理后的数据

2024-01-07

我们正在运行一个 Spark 流作业,从目录中检索文件(使用 textFileStream)。 我们担心的一个问题是作业已停止但文件仍在添加到目录中的情况。 一旦作业再次启动,这些文件就不会被拾取(因为它们在作业运行时不是新的或已更改),但我们希望处理它们。

1)有解决办法吗?有没有办法跟踪已处理的文件以及我们可以“强制”拾取旧文件吗?

2)有没有办法删除处理过的文件?


下面的文章几乎涵盖了您的所有问题。

https://blog.yanchen.ca/2016/06/28/fileinputdstream-in-spark-streaming/ https://blog.yanchen.ca/2016/06/28/fileinputdstream-in-spark-streaming/

1)有解决办法吗?有没有办法跟踪已处理的文件以及我们可以“强制”拾取旧文件吗?

启动作业/应用程序时,流读取器使用系统时钟启动批处理窗口。显然之前创建的所有文件都会被忽略。尝试启用检查点.

2)有没有办法删除处理过的文件?

删除文件可能是不必要的。如果检查点起作用,Spark 会识别未处理的文件。如果由于某种原因要删除文件,请实现自定义输入格式和阅读器(请参阅文章)来捕获文件名并酌情使用此信息。但我不推荐这种方法。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming中如何处理旧数据并删除处理后的数据 的相关文章

随机推荐

  • 对 Microsoft Graph API 执行 POST 请求以将成员添加到 AD 组

    我正在尝试通过 Azure 函数将成员添加到调用 Microsoft Graph API 的 AD 组 通过 Graph API 执行 GET 请求非常简单直接 但我找不到任何示例如何执行 Graph API 的 post 请求 我确实有一
  • 数组和可观察数组有什么区别?

    在 TypeScript 中 主要区别是什么any and Observable
  • Scrapy - 设置 TCP 连接超时

    我正在尝试通过 Scrapy 抓取网站 然而 该网站有时非常慢 浏览器第一次请求时几乎需要 15 20 秒才能响应 不管怎样 有时 当我尝试使用 Scrapy 抓取网站时 我不断收到 TCP 超时错误 即使该网站在我的浏览器上打开得很好 这
  • 测试期间的 EF Core 内部缓存和许多 DbContext 类型

    我有很多个测试班 每个班有几十个测试 我想隔离测试 而不是大型上下文MyDbContext I use MyDbContextToTestFoo MyDbContextToTestBar MyDbContextToTestBaz等等 所以我
  • SSLContext 和 SSLSocketFactory createSocket 线程安全吗?

    在我的测试中 我能够毫无问题地使用两者 但我找不到说明 SSLSocketFactory createSocket 是否线程安全的文档 可以在多个线程中使用同一个 SSLSocketFactory 来创建 SSL 套接字吗 我的应用程序使用
  • 根据当前视图处理 ViewExpiredException

    我在我的项目中使用 JSF 2 0 和 Primefaces 我有两个 xhtml 页面 即 Cars xhtml 和 Bikes xhtml 我正在使用 ViewScoped 支持 bean 目前 如果从两个页面中的任何一个获取视图过期异
  • AudioQueueBuffers 之间的爆裂噪音

    我正在尝试使用 Core Audio AudioQueue Swift 3 播放纯正弦波音调 它播放得很好 但每次调用 AudioQueueOutputCallback 用音频数据填充新缓冲区时 我都会听到爆裂声 我的 AudioStrea
  • Eclipse 的“Google Maps API v3 for GWT”项目示例

    Google 在此发布了 GWT 的官方地图 v3 APIhttps groups google com forum topic gwt google apis 6SO5kCDqb k https groups google com for
  • 识别最近的网格点

    我有三个数组 lat 15 15 25 15 75 16 30 long 91 91 25 91 75 92 102 data array 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 99 9 99 9 99 9
  • 我的 pdf 文件是否采用 UTF-8 编码?

    我想知道 pdf 文件是否以 UTF 8 编码 如何检查pdf文件中使用了哪种字符编码 PDF 是二进制文件 而不是文本文件 像 UTF 8 这样的字符编码仅在文本文件 txt html xml csv 的上下文中才有意义 因此 PDF 绝
  • 为什么 MongoDB 配置服务器必须只有一个或三个?

    在阅读了 MongoDB 分片架构的官方文档后 我还没有找到为什么需要一到三个配置服务器 而不是其他数量 The 有关配置服务器的 MongoDB 文档 https docs mongodb org v3 0 core sharded cl
  • 栈帧和作用域之间有什么关系?

    最近我正在学习Python 中的范围界定 我了解什么是堆栈框架 但我对堆栈框架和作用域之间的关系和区别感到困惑 我通过 Python计算与编程简介 这本书来学习Python 它没有具体阐明这两个术语 范围只是 LEGB 之一 本地 封闭 全
  • PowerMockito 在尝试存根私有重载方法时抛出 NullPointerException

    我 仍在 尝试检查是否bar Alpha Baz called bar Xray Baz 使用 PowerMockito 如bar Xray Baz is private 考虑到我的 MCVE 课程 实际上没有调用后者Foo以下 我上过同一
  • ARM 汇编器中的寄存器操作数是如何编码的?

    我反编译了一些ARM ELF文件并阅读了汇编代码 但是 我不明白一些代码是如何翻译成助记符的 例如我得到这样的代码 hex code mnemonic binary 0xb480 push r7 1011 0100 1000 0000 0x
  • 有没有办法正确模拟重新选择选择器以进行单元测试?

    我的项目中有一个非常复杂的选择器结构 某些选择器可能最多有 5 层嵌套 因此其中一些很难通过传递输入状态进行测试 我想改为模拟输入选择器 然而我发现这实际上是不可能的 这是最简单的例子 selectors1 js export const
  • PHP 的 create_function() 与仅使用 eval()

    在 PHP 中 您有 create function 函数 它创建一个唯一的命名 lambda 函数 如下所示 myFunction create function foo return foo myFunction bar Returns
  • Dapper 批量插入返回序列 ID

    我正在尝试使用 Dapper 通过 Npgsql 执行批量插入 这会返回新插入行的 id 我的两个示例中都使用了以下插入语句 var query INSERT INTO MyTable Value VALUES Value RETURNIN
  • Java 数组索引越界异常

    当我需要将 5 个用户输入的值存储到一个数组中 将其发送到一个方法 并查找并显示最低值时 我一直在研究这个基本的 java 程序 该程序很简单 并且可以运行 但是当我输入最后一个数字时 出现错误 线程 main 中的异常 java lang
  • 调用 setCenter 后 OpenLayers,地图仍处于 0,0 位置

    我尝试通过 setCenter 方法设置地图中心 但仍然不起作用 地图不动 我尝试使用从投影到地图投影的变换 但没有成功 这是代码的一部分 谢谢
  • Spark Streaming中如何处理旧数据并删除处理后的数据

    我们正在运行一个 Spark 流作业 从目录中检索文件 使用 textFileStream 我们担心的一个问题是作业已停止但文件仍在添加到目录中的情况 一旦作业再次启动 这些文件就不会被拾取 因为它们在作业运行时不是新的或已更改 但我们希望