如何计算Spark结构化流中的滞后差?

2023-11-23

我正在编写 Spark 结构化流程序。我需要创建一个具有滞后差的附加列。

为了重现我的问题,我提供了代码片段。这段代码消耗data.json文件存储在data folder:

[
  {"id": 77,"type": "person","timestamp": 1532609003},
  {"id": 77,"type": "person","timestamp": 1532609005},
  {"id": 78,"type": "crane","timestamp": 1532609005}
]

Code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])

ds = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .load("data/")

diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))

query = ds \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()

我收到此错误:

pyspark.sql.utils.AnalysisException:u'非基于时间的窗口不是 支持流数据帧/数据集;;\nWindow [lag(timestamp#71L, 1, null) windowspecdefinition(host_id#68, timestamp#71L ASC 首先为空,前 1 行和 1 之间的行 前)AS prev_timestamp#129L]


pyspark.sql.utils.AnalysisException:u'流数据帧/数据集不支持基于时间的窗口

这意味着您的窗口应该基于timestamp柱子。所以如果你每秒都有一个数据点,然后你做一个30s窗口有一个stride of 10s,您生成的窗口将创建一个新的window列,与start and end包含时间戳差异的列30s.

您应该以这种方式使用该窗口:

words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))

w = F.window('date_time', '30 seconds', '10 seconds')
words = words \
   .withWatermark('date_format', '1 minutes') \
   .groupBy(w).agg(F.mean('value'))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何计算Spark结构化流中的滞后差? 的相关文章

随机推荐

  • $@ 和“$@”有什么区别吗? [复制]

    这个问题在这里已经有答案了 有什么区别吗 and 我知道非特殊字符可能存在差异 但是 用输入参数签名 Yes cat a sh echo echo 让我们运行一下 a sh 2 3 4 5 2 3 4 5 output for 2 3 4
  • 如何使用透明背景的画布获取 CSS 样式元素的 png 图像?

    我想使用 CSS 为网页上的元素设置样式 然后将该元素用作静态 png 是否可以在例如上绘制html节点 画布并将此类透明图像保存到文件中 我想找到一种方法 使用 CSS 获取现有的 HTML 并将其渲染为 PNG 文件并保持透明度 将 H
  • clang++ -stdlib=libc++ 导致未定义的引用

    为什么在将 clang 与 libc 一起使用时出现以下链接器错误 clang stdlib libc po cxx lpoppler tmp po QqlXGY o In function main po cxx text 0x33 un
  • 仅对 UITableView 中可用的 CellForRow 显示分隔符

    我正在将 UITableView 与自定义单元格一起使用 它工作正常 但问题是当 UITableView 中只有一两个单元格时 它还为空电池提供了分隔符 是否可以仅为使用自定义单元格加载的单元格显示分隔符 您需要添加一个空页脚视图来隐藏表中
  • 如何从多个 template_folder 加载 Flask 蓝图?

    我学习了如何创建 Flask 蓝图 并且可以为使用 Jinja2 模板的非 Flask 产品创建蓝图并在 Flask 项目中使用它们 我做了这样的事情 blueprint code from flask import Blueprint f
  • 如何在没有 jQuery 的情况下切换元素可见性?

    我正在为 eBay 编写一个拍卖模板 希望 eBay 能够允许 显然他们没有 因为 jquery 有 string replace 等东西 该代码非常基础 document ready function function changeIma
  • Pandas Dataframe选择多个不连续的列/切片

    我有超过 100 列的数据框 我正在尝试选择第 0 32 列和 83 列 看来 1 slice 与下面的代码配合得很好 df new df df columns 0 32 但它不适用于下面的 2 片代码 我该如何解决这个问题 df new
  • 更改 ComboBox 项目的格式

    是否可以在 C 中格式化 ComboBox 项 例如 如何将某个项目设为粗体 更改其文本的颜色等 尽管这篇文章很老 我发现它作为搜索的起点很有用 但最终使用所示的方法得到了更好的结果here由 保罗 这是我用来有条件地使组合框中的项目显示为
  • 如何为 httpclient getasync 方法创建模拟?

    我正在使用 Moq 为单元测试创 建模拟 但是当我必须为 httpclient 的 getasync 方法创建模拟时 我陷入了困境 以前我使用 SendAsync 方法 为此我可以使用以下代码 var mockResponse new Ht
  • 从word文档中提取标题文本

    我正在尝试提取text来自 MS Word 文档 docx 文件 中的 任何级别 标题 目前我正在尝试解决使用python docx 但不幸的是 读完后我仍然无法弄清楚它是否可行 也许我错了 我尝试在网上寻找解决方案 但没有发现任何适合我的
  • 我可以替换或修改 jQuery UI 小部件上的函数吗?如何? (猴子补丁)

    如果我想通过替换其中一个函数来调整 jQuery UI 对象的某些功能 我该怎么做呢 示例 假设我想修改 jQuery 自动完成小部件呈现建议的方式 自动完成对象上有一个方法 如下所示 renderItem function ul item
  • 如何使用 scikit-learn 评估预测的置信度得分

    我写下了一个简单的代码 它采用一个参数 query seq 进一步的方法计算描述符 最后可以使用 LogisticRegression 或该函数提供的任何其他算法 算法作为 0 给定情况为负 进行预测 或 1 给定情况为正 def main
  • 从 NSArray 中检索 NSDictionary,其中字典键的值为 X

    我有一个NSArray with NSDictionaries 数组之一中的字典键之一包含一个值 我想找回NSDictionary具有该值 我的阵列 Array DisplayName level InternalName Number 2
  • 如何在 podfile 中为 Xcode 项目指定多个目标?

    我在 Xcode 4 项目中使用 CocoaPods 我的项目有三个目标 默认目标 一个用于构建精简版本 一个用于构建演示版本 所有目标都使用相同的库 但 CocoaPods 仅将静态库和搜索路径添加到主要目标 我的 podfile 看起来
  • R 中的动态 selectInput 闪亮

    我有 3 个 selectInput 框和一组 4 个选项 可以通过这 3 个框进行选择 我希望 selectInputs 显示的选项在选择其他 selectInputs 时动态更改 不过 我希望所有三个框在任何时间点都可以使用 无 选项
  • Javascript 对象属性是否按顺序分配?

    假设我有一个对象 它根据函数的返回值分配属性 var i 0 var f function return i var foo a f b f c f 是否保证 foo a 为 1 foo b 为 2 foo c 为 3 我知道 JS 不保证
  • Python Paramiko(客户端)多重身份验证

    我正在尝试使用 Paramiko 在 Python 2 7 上 连接到使用多重身份验证 用户名 密码 一次性密码 的主机 这transport auth interactive函数似乎是执行此操作的方法 根据我从文档中理解的内容 但执行从未
  • 对 .net 混淆代码进行逆向工程有多容易?

    市场上有一些程序可以用来混淆您的 net 代码 我的问题是 如果您的代码被 所谓 混淆了 那么别人获取您的 IP 有多容易 混淆 net 代码仅仅是橡皮鸡安全吗 或者说它足以真正保护您的知识产权吗 混淆就像门锁 它让诚实的人保持诚实
  • 插入值语句只能包含 SQL 数据仓库中的常量文字值或变量引用

    考虑这个表 CREATE TABLE t i int j int 我想将一组数据插入到表中SELECT声明 我的查询的简化版本是 INSERT INTO t VALUES SELECT 1 SELECT 2 真正的查询可能要复杂得多 并且各
  • 如何计算Spark结构化流中的滞后差?

    我正在编写 Spark 结构化流程序 我需要创建一个具有滞后差的附加列 为了重现我的问题 我提供了代码片段 这段代码消耗data json文件存储在data folder id 77 type person timestamp 153260