所以,我下面要做的是删除一列A
from a DataFrame
因为我想应用一个转换(这里我只是json.loads
JSON 字符串)并将旧列替换为转换后的列。转换后,我只需连接两个结果数据框。
df = df_data.drop('A').join(
df_data[['ID', 'A']].rdd\
.map(lambda x: (x.ID, json.loads(x.A))
if x.A is not None else (x.ID, None))\
.toDF()\
.withColumnRenamed('_1', 'ID')\
.withColumnRenamed('_2', 'A'),
['ID']
)
我不喜欢的当然是我所面临的开销,因为我必须做withColumnRenamed
运营。
对于 pandas All 我会做这样的事情:
pdf = pd.DataFrame([json.dumps([0]*np.random.randint(5,10)) for i in range(10)], columns=['A'])
pdf.A = pdf.A.map(lambda x: json.loads(x))
pdf
但以下内容在 pyspark 中不起作用:
df.A = df[['A']].rdd.map(lambda x: json.loads(x.A))
那么有没有比我在第一个代码片段中所做的更简单的方法呢?
I do not think you need to drop the column and do the join. The following code should* be equivalent to what you posted:
cols = df_data.columns
df = df_data.rdd\
.map(
lambda row: tuple(
[row[c] if c != 'A' else (json.loads(row[c]) if row[c] is not None else None)
for c in cols]
)
)\
.toDF(cols)
*I haven't actually tested this code, but I think this should work.
但要回答您的一般问题,您可以使用以下方法就地转换列withColumn()
.
df = df_data.withColumn("A", my_transformation_function("A").alias("A"))
Where my_transformation_function()
可以是一个udf
or a pyspark sql function
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)