PySpark 流式处理:窗口和转换

2024-01-22

我正在尝试从 Spark 流数据源读取数据,按事件时间对其进行窗口化,然后对窗口化数据运行自定义 Python 函数(它使用非标准 Python 库)。

我的数据框看起来像这样:

| Time                    | Value |
| 2018-01-01 12:23:50.200 | 1234  |
| 2018-01-01 12:23:51.200 |   33  |
| 2018-01-01 12:23:53.200 |  998  |
|           ...           |  ...  |

窗口似乎与 Spark SQL 配合得很好,使用如下内容:

windowed_df = df.groupBy(window("Time", "10 seconds"))

...,并且有一个部分是关于Spark 结构化流处理文档中按事件时间进行窗口化 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time所以我认为这应该适用于 Spark 结构化流。

到目前为止,一切都很好。

另外,我已经能够使用 Spark Streaming (DStream) 来应用我的自定义转换操作,该操作当前在传入流上运行(基本上,它假设数据以正确的窗口块形式出现,这是我试图摆脱的假设的)。代码看起来像这样:

def my_analysis(input_rdd):
    # convert RDD to native types (would also be possible from a DataFrame)
    # run through various Python libs
    # construct new RDD with results - 1 row, multiple values (could construct new DataFrame here instead)

my_dstream\
    .map(deserialize_from_string)\
    .transform(my_analysis)\
    .map(serialize_to_string)\
    .foreachRDD(write_to_sink)

我现在基本上想将两者结合起来,所以做类似的事情:

df\
    .groupBy(window("Time", "10 seconds"))\
    .transform(my_analysis)\  # how do I do this with pyspark.sql.group.GroupedData?
    .writeStream  # ...

# OR:

my_dstream\
    .map(deserialize_from_string)\
    .window_by_event_time("10 seconds")\  # how do I do this with a DStream?
    .transform(my_analysis)\
    .map(serialize_to_string)\
    .foreachRDD(write_to_sink)

知道我如何才能实现上述目标吗?

我尝试过的事情:

  • 我可以在 windowed_df 上运行的功能似乎非常有限,基本上 IPython 建议我只能在这里进行聚合(min/max/avg/agg with pyspark.sql.函数 http://spark.apache.org/docs/2.2.1/api/python/pyspark.sql.html#module-pyspark.sql.functions). agg似乎最有用,但迄今为止我在该领域发现的最好的方法是使用collect_list,像这样:
    windowed_df.agg(collect_list("Value")).sort("window").show(20, False)

...但这意味着我失去了时间戳。

  • PySpark 不支持自定义聚合函数 (UDAF)(SPARK-10915 https://issues.apache.org/jira/browse/SPARK-10915)

我看过的其他事情:

  • Apache Spark 结构化流中的任意状态处理 https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html- mapGroupWithState 听起来好像它可以做我想要的(甚至更多),但在 PySpark 中尚不可用。
  • Spark:如何将 Python 与 Scala 或 Java 用户定义函数映射? https://stackoverflow.com/q/33233737/1298153- 在这种情况下,用 Scala/Java 编写 UADF 不是一个选择(我需要使用特定的 Python 库)
  • 如何在 PySpark 2.1.0 中的事件时间窗口上定义 UDAF https://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0- 类似,但没有答案
  • 引入 PySpark 的矢量化 UDF https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html- 这可能有效,并且使用“分组”UDF 的“普通最小二乘线性回归”示例看起来很有希望。但是,它需要 Spark 2.3.0(我可以编译它),并且吉拉门票 https://issues.apache.org/jira/browse/SPARK-21190说 UADF 显然是一个非目标(我不清楚 UDAF 和 GUDF(?)s 究竟有何不同)

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark 流式处理:窗口和转换 的相关文章

随机推荐

  • 如何在 ASP.Net Core Razor 页面中返回带有模型的页面

    如何重定向到页面并传递其模型 就像我们在 MVC 中所做的那样return View model MyModel 我尝试过的 return RedirectToPage Notify new Model notifierVM 注意 我要返回
  • 如何立即关闭 C 程序?

    我正在编写 C 代码 在其中分析一些数据 我已将程序设置为仅处理 100 个数据输入 当输入超过 100 个时 就会出现分段错误 我想创建一种方法 以便当输入数量超过 100 时 用户将收到警告并且程序将终止 我知道如何从主要功能中简单地做
  • MyGroups 未在 Communicator.UIAutomation 中实现

    我正在开发一个浏览器外的 Silverlight 应用程序 它提供了一些 MS Office Communicator 2007 控件 我正在使用 与 SDK 一起安装的文档指出 IMessenger2 界面中有一个 MyGroups 属性
  • 测试 swift 异步函数超时

    如何编写一个单元测试来检查异步函数是否不会超时 我正在尝试常规XCTestExpectation 但是因为await暂停一切 是等不到期待 在下面的代码中 我正在检查loader perform 执行时间不超过1秒 func testLoa
  • 在 KnockoutJS 中获取可观察的多维数组(对象)

    我正在使用 Knockout 构建一个应用程序 发现它非常有用 虽然 我在获取多维数组 对象 可观察时遇到问题 目前我正在使用以下结构 self form ko observableArray ko utils arrayMap initi
  • /bin/sh: python: 找不到命令

    我刚刚安装了 Python3 和 Komodo 我正在尝试运行一个简单的脚本 但收到 py 命令未找到的错误 我对 Komodo 和 Python 都是新手 所以不知道该去哪里寻找 我看到另一篇文章也有同样的问题 但没有提供有帮助的解决方案
  • 虚拟继承的内部机制

    C 示例代码 class A public A int class B public virtual A public B int b A b class C virtual public A public C int c A c clas
  • 在 jQuery Mobile 中的 Ajax 调用上显示页面加载微调器

    我正在使用 ajax 填充我的移动网络应用程序中的列表 我想做的是让 jQuery 移动加载微调器在执行此调用时出现 并在列表填充后消失 当前版本的 JQM 使用 mobile showPageLoadingMsg and mobile h
  • 如何使操作栏图标在单击时发生变化

    我在 Sherlock ActionBar 中有一组紫色背景的白色图标 我想让它们在被点击时变成深紫色 我有相同的深紫色图标 所以我想让这些可绘制对象显示在按下状态 现在 我知道如何在整个应用程序主题中执行此操作 但这意味着我必须对所有图标
  • 阻止本地网站在 Chrome 上强制使用 HTTPS?

    Chrome 已更新为在某些保留的域名上强制使用 HTTPS 不幸的是我的本地计算机之一出现在列表中 我的机器名称是 dev Chrome 现在自动重定向 http dev http dev 到 https dev https dev 我在
  • Java获取本地IP [重复]

    这个问题在这里已经有答案了 我正在尝试获取本地IP 它应该与 System out println Inet4Address getLocalHost getHostAddress or InetAddress addr InetAddre
  • jquery map函数对表的使用

    我有这张表 table thead tr th UtstyrsID th th Navn th th Utlevert th th Kommentar th tr thead tbody tr td 1 td tr tbody table
  • Java - 帮助在任意锚点绘制文本的最佳库[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我熟悉如何使用各种 FontMetrics 函数使文本垂直 水平居中等 但是 我正在寻找一个支持在相对
  • 使用组合键中的一列作为外键

    我试图看看是否可以使用复合键中的一列作为外键 我得到了奇怪的结果 CREATE TABLE TESTPARENT PK1 INT PK2 INT PRIMARY KEY PK1 PK2 Query OK 0 rows affected 0
  • C# 属性名称缩写

    C 属性的名称中怎么可能有 Attribute 例如DataMemberAttribute 但初始化时没有这个后缀 例如 DataMember private int i 根据C 语言规范 http msdn microsoft com e
  • 使用 Akka 进行依赖注入

    我在我的应用程序中经常使用 Guice 最近我开始学习 akka actor 并想用它重构我的应用程序 然而 一开始我就想知道我的所有技巧将如何与演员合作 我继续在谷歌上搜索 结果有点混乱 我找到的关于该主题的最新文档是 http leti
  • python 中的文本语言检测

    我正在尝试检测可能由未知数量的语言组成的文本的语言 下面的代码给了我不同的语言作为答案注意 我减少了评论 因为它在发布 不允许时给出错误 print detect print detect 的马来西亚 print detect Vi hav
  • 屏幕旋转后不会调用 onSaveInstanceState

    我知道有很多关于 onSaveInstanceState 的问题 但我无法找到问题的答案 我有一个扩展 AppCompatActivity 的活动 此活动使用 3 个片段 它有一个变量 int currentStep 来跟踪正在显示的片段
  • 从网页中打开查找器/资源管理器中的文件夹?

    如果我有文件系统路径 我可以在资源管理器 在 Windows 上 或 Finder 在 OS X 上 中打开一个窗口 显示该路径指向的文件夹吗 跨平台和 或无插件答案的 Cookie 点 For 节点 webkit http docs nw
  • PySpark 流式处理:窗口和转换

    我正在尝试从 Spark 流数据源读取数据 按事件时间对其进行窗口化 然后对窗口化数据运行自定义 Python 函数 它使用非标准 Python 库 我的数据框看起来像这样 Time Value 2018 01 01 12 23 50 20