当我向 pyspark 数据帧迭代添加 500 多列时,遇到了 stackoverflowerrors。所以,我包括了检查点。检查站没有帮助。因此,我创建了以下玩具应用程序来测试我的检查点是否正常工作。我在此示例中所做的就是通过一遍又一遍地复制原始列来迭代创建列。我坚持、检查点并每 10 次迭代进行计数。我注意到我的 dataframe.rdd.isCheckpointed() 总是返回 False。我可以验证检查点文件夹确实正在创建并填充在磁盘上。我正在 glcoud 上的 dataproc 上运行。
这是我的代码:
from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import sys
APP_NAME = "isCheckPointWorking"
spark = SparkSession\
.builder\
.appName(APP_NAME)\
.config("spark.sql.crossJoin.enabled","true")\
.getOrCreate()
sc = SparkContext.getOrCreate()
#set the checkpoint directory
sc.setCheckpointDir('gs://mybucket/checkpointtest/')
#create a spark dataframe with one column containing numbers 1 through 9
df4 = spark.createDataFrame(pd.DataFrame(np.arange(1,10),columns = ["A"]))
df4.show()
#create a list of new columns to be added to the dataframe
numberList = np.arange(0,40)
colNewList = ['col'+str(x) for x in numberList]
print(colNewList)
iterCount = 0
for colName in colNewList:
#copy column A in to the new column
df4 = df4.withColumn(colName,df4.A)
if (np.mod(iterCount,10) == 0):
df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)
df4.checkpoint(eager=True)
df4.count()
#checking if underlying RDD is being checkpointed
print("is data frame checkpointed "+str(df4.rdd.isCheckpointed()))
iterCount +=1
当我看到检查点文件夹正在填充时,尚不清楚为什么 df4.rdd.isCheckpointed() 每次都返回 False。有什么想法吗?
checkpoint方法返回一个新的检查点Dataset,它不会修改当前的Dataset。
Change
df4.checkpoint(eager=True)
To
df4 = df4.checkpoint(eager=True)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)