我在用pySpark
,并设置了我的数据框,其中两列代表每日资产价格,如下所示:
ind = sc.parallelize(range(1,5))
prices = sc.parallelize([33.3,31.1,51.2,21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data,["day","price"])
我申请后得到df.show()
:
+---+-----+
|day|price|
+---+-----+
| 1| 33.3|
| 2| 31.1|
| 3| 51.2|
| 4| 21.3|
+---+-----+
这很好。我想要另一列包含价格列的日常回报,即类似
(price(day2)-price(day1))/(price(day1))
经过大量研究后,我得知,通过应用以下方法可以最有效地实现这一目标pyspark.sql.window
功能,但我不知道如何实现。
您可以使用前一天的列lag函数,并添加额外的列来从两列中执行实际的日常返回,但是您可能必须告诉 Spark 如何对数据进行分区和/或命令它进行滞后,如下所示:
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit
dfu = df.withColumn('user', lit('tmoore'))
df_lag = dfu.withColumn('prev_day_price',
func.lag(dfu['price'])
.over(Window.partitionBy("user")))
result = df_lag.withColumn('daily_return',
(df_lag['price'] - df_lag['prev_day_price']) / df_lag['price'] )
>>> result.show()
+---+-----+-------+--------------+--------------------+
|day|price| user|prev_day_price| daily_return|
+---+-----+-------+--------------+--------------------+
| 1| 33.3| tmoore| null| null|
| 2| 31.1| tmoore| 33.3|-0.07073954983922816|
| 3| 51.2| tmoore| 31.1| 0.392578125|
| 4| 21.3| tmoore| 51.2| -1.403755868544601|
+---+-----+-------+--------------+--------------------+
这里有更长的介绍Spark 中的窗口函数.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)