使用以下代码在数据帧中使用联合来并发访问附加行是否可以正常工作?目前显示类型错误
from pyspark.sql.types import *
schema = StructType([
StructField("owreg", StringType(), True),StructField("we", StringType(), True)
,StructField("aa", StringType(), True)
,StructField("cc", StringType(), True)
,StructField("ss", StringType(), True)
,StructField("ss", StringType(), True)
,StructField("sss", StringType(), True)
])
f = sqlContext.createDataFrame(sc.emptyRDD(), schema)
def dump(l,jsid):
if not l.startswith("<!E!>"):
f=f.unionAll(sqlContext.read.json(l))
savedlabels.limit(10).foreach(lambda a: dump(a.labels,a.job_seq_id))
假设 sqlContext.read.json(l) 将读取 json 并输出具有相同架构的 RDD
模式是我想尽可能高效地将存储在 RDD 的列中的多个 json 表“减少”为 RDD 表。
def dump(l,jsid):
if not l.startswith("<!E!>"):
f=f.unionAll(sc.parallelize(json.loads(l)).toDF())
当工作线程调用 sc.parallelize 时,上述代码也将不起作用。那么如何解决这个问题呢?