我正在尝试从 Spark 流数据源读取数据,按事件时间对其进行窗口化,然后对窗口化数据运行自定义 Python 函数(它使用非标准 Python 库)。
我的数据框看起来像这样:
| Time | Value |
| 2018-01-01 12:23:50.200 | 1234 |
| 2018-01-01 12:23:51.200 | 33 |
| 2018-01-01 12:23:53.200 | 998 |
| ... | ... |
窗口似乎与 Spark SQL 配合得很好,使用如下内容:
windowed_df = df.groupBy(window("Time", "10 seconds"))
...,并且有一个部分是关于Spark 结构化流处理文档中按事件时间进行窗口化 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time所以我认为这应该适用于 Spark 结构化流。
到目前为止,一切都很好。
另外,我已经能够使用 Spark Streaming (DStream) 来应用我的自定义转换操作,该操作当前在传入流上运行(基本上,它假设数据以正确的窗口块形式出现,这是我试图摆脱的假设的)。代码看起来像这样:
def my_analysis(input_rdd):
# convert RDD to native types (would also be possible from a DataFrame)
# run through various Python libs
# construct new RDD with results - 1 row, multiple values (could construct new DataFrame here instead)
my_dstream\
.map(deserialize_from_string)\
.transform(my_analysis)\
.map(serialize_to_string)\
.foreachRDD(write_to_sink)
我现在基本上想将两者结合起来,所以做类似的事情:
df\
.groupBy(window("Time", "10 seconds"))\
.transform(my_analysis)\ # how do I do this with pyspark.sql.group.GroupedData?
.writeStream # ...
# OR:
my_dstream\
.map(deserialize_from_string)\
.window_by_event_time("10 seconds")\ # how do I do this with a DStream?
.transform(my_analysis)\
.map(serialize_to_string)\
.foreachRDD(write_to_sink)
知道我如何才能实现上述目标吗?
我尝试过的事情:
- 我可以在 windowed_df 上运行的功能似乎非常有限,基本上 IPython 建议我只能在这里进行聚合(
min
/max
/avg
/agg
with pyspark.sql.函数 http://spark.apache.org/docs/2.2.1/api/python/pyspark.sql.html#module-pyspark.sql.functions). agg
似乎最有用,但迄今为止我在该领域发现的最好的方法是使用collect_list
,像这样:
windowed_df.agg(collect_list("Value")).sort("window").show(20, False)
...但这意味着我失去了时间戳。
- PySpark 不支持自定义聚合函数 (UDAF)(SPARK-10915 https://issues.apache.org/jira/browse/SPARK-10915)
我看过的其他事情:
-
Apache Spark 结构化流中的任意状态处理 https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html- mapGroupWithState 听起来好像它可以做我想要的(甚至更多),但在 PySpark 中尚不可用。
-
Spark:如何将 Python 与 Scala 或 Java 用户定义函数映射? https://stackoverflow.com/q/33233737/1298153- 在这种情况下,用 Scala/Java 编写 UADF 不是一个选择(我需要使用特定的 Python 库)
-
如何在 PySpark 2.1.0 中的事件时间窗口上定义 UDAF https://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0- 类似,但没有答案
-
引入 PySpark 的矢量化 UDF https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html- 这可能有效,并且使用“分组”UDF 的“普通最小二乘线性回归”示例看起来很有希望。但是,它需要 Spark 2.3.0(我可以编译它),并且吉拉门票 https://issues.apache.org/jira/browse/SPARK-21190说 UADF 显然是一个非目标(我不清楚 UDAF 和 GUDF(?)s 究竟有何不同)