使用 Spark Streaming (1.6),我有一个文件流,用于读取批量大小为 2 秒的查找数据,但是文件仅每小时复制到目录中。
一旦有新文件,它的内容就会被流读取,这就是我想要缓存到内存中并保留在那里的内容
直到读取新文件。
我想加入这个数据集的另一个流,因此我想缓存。
这是一个后续问题Spark流式批量查找数据.
答案确实适用于updateStateByKey
但是我不知道如何处理 KV 对的情况deleted从查找文件中,作为值的序列updateStateByKey
不断增长。
还有任何关于如何做到这一点的提示mapWithState
会很好。
这是我到目前为止所尝试的,但数据似乎没有被持久化:
val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD{x =>
if (!x.partitions.isEmpty) {
x.unpersist(true)
x.persist()
}
}
DStreams
可以直接使用持久化persist
持久化流中每个 RDD 的方法:
dictionaryStream.persist
根据官方文档这自动适用于
基于窗口的操作,例如reduceByWindow
and reduceByKeyAndWindow
以及基于状态的操作,例如updateStateByKey
所以在你的情况下应该不需要显式缓存。也不需要手动取消持久化。去引用the docs再次:
默认情况下,所有输入数据和 DStream 转换生成的持久化 RDD 都会自动清除
并且保留期会根据管道中使用的转换自动调整。
关于mapWithState
你必须提供一个StateSpec
。一个最小的例子需要一个函数,它需要key
, Option
当前的value
和之前的状态。假设你有DStream[(String, Long)]
并且您想记录到目前为止的最大值:
val state = StateSpec.function(
(key: String, current: Option[Double], state: State[Double]) => {
val max = Math.max(
current.getOrElse(Double.MinValue),
state.getOption.getOrElse(Double.MinValue)
)
state.update(max)
(key, max)
}
)
val inputStream: DStream[(String, Double)] = ???
inputStream.mapWithState(state).print()
还可以提供初始状态、超时间隔并捕获当前批处理时间。最后两个可用于对一段时间内未更新的密钥实施删除策略。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)