我在 pyspark 中有一个数据框。这是它的样子,
+---------+---------+
|timestamp| price |
+---------+---------+
|670098928| 50 |
|670098930| 53 |
|670098934| 55 |
+---------+---------+
我想填补时间戳与之前状态之间的空白,这样我就可以获得一个完美的集合来计算时间加权平均值。输出应该是这样的 -
+---------+---------+
|timestamp| price |
+---------+---------+
|670098928| 50 |
|670098929| 50 |
|670098930| 53 |
|670098931| 53 |
|670098932| 53 |
|670098933| 53 |
|670098934| 55 |
+---------+---------+
最终,我想将这个新数据帧保留在磁盘上并可视化我的分析。
我如何在 pyspark 中执行此操作? (为了简单起见,我只保留了 2 列。在填补空白之前,我的实际数据框有 89 列,大约有 6.7 亿条记录。)
您可以生成时间戳范围,将其展平并选择行
import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType, ArrayType
a=sc.parallelize([[670098928, 50],[670098930, 53], [670098934, 55]])\
.toDF(['timestamp','price'])
f=func.udf(lambda x:range(x,x+5),ArrayType(IntegerType()))
a.withColumn('timestamp',f(a.timestamp))\
.withColumn('timestamp',func.explode(func.col('timestamp')))\
.groupBy('timestamp')\
.agg(func.max(func.col('price')))\
.show()
+---------+----------+
|timestamp|max(price)|
+---------+----------+
|670098928| 50|
|670098929| 50|
|670098930| 53|
|670098931| 53|
|670098932| 53|
|670098933| 53|
|670098934| 55|
|670098935| 55|
|670098936| 55|
|670098937| 55|
|670098938| 55|
+---------+----------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)