Spark Streaming:跨批次缓存 DStream 结果

2023-12-15

使用 Spark Streaming (1.6),我有一个文件流,用于读取批量大小为 2 秒的查找数据,但是文件仅每小时复制到目录中。
一旦有新文件,它的内容就会被流读取,这就是我想要缓存到内存中并保留在那里的内容 直到读取新文件。
我想加入这个数据集的另一个流,因此我想缓存。

这是一个后续问题Spark流式批量查找数据.
答案确实适用于updateStateByKey但是我不知道如何处理 KV 对的情况deleted从查找文件中,作为值的序列updateStateByKey不断增长。 还有任何关于如何做到这一点的提示mapWithState会很好。

这是我到目前为止所尝试的,但数据似乎没有被持久化:

val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD{x => 
  if (!x.partitions.isEmpty) {
    x.unpersist(true)
    x.persist()
  }
}

DStreams可以直接使用持久化persist持久化流中每个 RDD 的方法:

dictionaryStream.persist

根据官方文档这自动适用于

基于窗口的操作,例如reduceByWindow and reduceByKeyAndWindow以及基于状态的操作,例如updateStateByKey

所以在你的情况下应该不需要显式缓存。也不需要手动取消持久化。去引用the docs再次:

默认情况下,所有输入数据和 DStream 转换生成的持久化 RDD 都会自动清除

并且保留期会根据管道中使用的转换自动调整。

关于mapWithState你必须提供一个StateSpec。一个最小的例子需要一个函数,它需要key, Option当前的value和之前的状态。假设你有DStream[(String, Long)]并且您想记录到目前为止的最大值:

val state = StateSpec.function(
  (key: String, current: Option[Double], state: State[Double]) => {
    val max  = Math.max(
      current.getOrElse(Double.MinValue),
      state.getOption.getOrElse(Double.MinValue)
    )
    state.update(max)
    (key, max)
  }
)

val inputStream: DStream[(String, Double)] = ??? 
inputStream.mapWithState(state).print()

还可以提供初始状态、超时间隔并捕获当前批处理时间。最后两个可用于对一段时间内未更新的密钥实施删除策略。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming:跨批次缓存 DStream 结果 的相关文章

随机推荐

  • 将 LocalDate 转换为 LocalDateTime 或 java.sql.Timestamp

    我正在使用 JodaTime 1 6 2 我有一个LocalDate我需要转换为 Joda LocalDateTime or a java sqlTimestamp用于或映射 这样做的原因是我已经弄清楚如何在LocalDateTime an
  • 比较 2 个 JSON 对象 [重复]

    这个问题在这里已经有答案了 可能的重复 JavaScript 中的对象比较 是否有任何方法可以接受 2 个 JSON 对象并比较这两个对象以查看是否有任何数据发生更改 Edit 审查完评论后 需要进行一些澄清 JSON 对象定义为 一组无序
  • Flask 在请求之前获取 url 变量?

    在 Flask 中 我有带有变量的 url 规则 例如 my blueprint add url rule
  • 为 iOS 7 编译 x264

    我在为 iOS 编译 x264 时遇到错误 我有 Xcode 版本 5 0 5A1413 和 Apple LLVM 版本 5 0 clang 500 2 75 基于 LLVM 3 3svn 我正在编译 x264 snapshot 20130
  • PHPhotoLibrary 保存 gif 数据

    我在新的 PHPhotoLibrary 中找不到与 ALAssetsLibrary gt writeImageDataToSavedPhotosAlbum 类似的方法 因为 ALAssetsLibrary 在 iOS 9 中已弃用 我无法保
  • 组合 git `continue` 命令

    我可能需要以不同的方式运行 git rebase continue git cherry pick continue git revert continue 在每种情况下 我的命令行都会提醒我 我正处于中间状态 rebase cp reve
  • 如何在audio_service flutter中传递和播放播放列表中特定队列位置的媒体项目?

    我正在使用颤动音频服务 and 只是音频音乐播放器包 我想在初始化音乐播放器时播放播放列表中特定队列位置的媒体项目 当我调用 AudioService start 方法时 它始终播放播放列表的第一项 当我启动音频服务时 如何传递并播放播放列
  • 读取文件的前 4 个字节

    我习惯了 C 但我试图制作一个将前 4 个字节读入数组的应用程序 但我没有成功 我还需要反转文件的 Endian 我不知道在 Java 中如何 在 C 中是这样Array Reverse bytes 我尝试将文件读入 Int32 但由于某种
  • Gradle sonarqube 无法识别常规测试

    我有一个多语言项目 使用 Java JUnit 和 Groovy Spock 实现测试 plugins id org sonarqube version 2 2 1 apply plugin idea apply plugin java a
  • Jquery - IE 未实现错误(在 IE 8 中)

    我遇到了 IE 未实现 JavaScript 错误 它似乎与我到目前为止在该网站上阅读的内容没有任何关系 我使用 jquery simplemodal 插件 它在除 IE 之外的所有浏览器中都可以正常工作 除非我将 doctype 更改为
  • 如何在卸载时应用 Msi 转换?

    我正在尝试修复已发布的基于 Windows Installer 的设置 该修复针对卸载软件包时发生的错误 为此 我想提供一个在卸载之前应用的 mst 转换文件 产品安装后是否可以使用转换 或者可以在卸载开始之前将转换应用于已安装的 msi
  • 如何在 OpenCV Java 中从 HoughLines 变换检测矩形

    我知道这是重复的帖子 但仍然在实施过程中陷入困境 我遵循互联网上的一些指南 了解如何使用 OpenCV 和 Java 检测图像中的文档 我想出的第一个方法是在预处理一些图像处理 如模糊 边缘检测 后使用 findContours 在获得所有
  • `print_r($mysqli,1)` 更改 `$mysqli->affected_rows`

    我正在使用用户断言函数 例如 debug assert gettype ob object Not an object pre print r ob 1 pre or exit 但我发现 print r 在调用 mysqli 时更改了 my
  • 如何从SSIS包向SSRS报告传递参数?

    我正在编写我的第一个 SSIS pkg 但我陷入了困境 任何见解将不胜感激 我正在运行一个 sql 代理作业来启动 SSRS 报告 该作业是通过预定订阅生成的 此报告依赖于 2 个存储过程 它们需要参数 何时 日期类型 并将报告的 PDF
  • 查询在 phpmyadmin 中有效,但在 PHP 脚本中无效

    我发现了类似的问题 但还不能解决我的问题 这是相关代码 query SELECT FROM conceptos WHERE descripcion descripcion if result mysql query query connec
  • 如何确定 .NET 程序集是否是使用目标平台 AnyCPU、AnyCPU Prefer32 位、x86、x64 构建的,而不使用反射和第三方软件

    我对如何通过正确读取 PE 标头直接从文件中读取程序集平台目标信息的方式感兴趣 我知道可以将程序集加载到新的中AppDomain by Assembly ReflectionOnlyLoad rawAssembly 并通过以下方式进行调查a
  • 向 MVC3 添加视图时出现异常

    我正在使用 Visual Studio 2012 Express for Web 当我尝试添加视图时 出现错误 The templates had the following 1 error s C Program Files Micros
  • JAXB 绑定 - “无法执行此转换自定义”

    我在架构中有自己的复杂类型 应该查看XML像这样的东西
  • 使用 f 字符串输出 LaTeX 符号

    请耐心等待 因为我不太明白 f 字符串的可能用途和不可能用途 取代码 pi 3 14159265 print f pi on 2 decimals is pi 2f 显然输出 pi on 2 decimals is 3 14 是否有可能得到
  • Spark Streaming:跨批次缓存 DStream 结果

    使用 Spark Streaming 1 6 我有一个文件流 用于读取批量大小为 2 秒的查找数据 但是文件仅每小时复制到目录中 一旦有新文件 它的内容就会被流读取 这就是我想要缓存到内存中并保留在那里的内容 直到读取新文件 我想加入这个数