我是 PySpark 的新手,下面是来自 kafka 的 JSON 文件格式。
{
"header": {
"platform":"atm",
"version":"2.0"
}
"details":[
{
"abc":"3",
"def":"4"
},
{
"abc":"5",
"def":"6"
},
{
"abc":"7",
"def":"8"
}
]
}
我怎样才能读懂所有的价值观"abc"
"def"
详细信息并将其添加到这样的新列表中[(1,2),(3,4),(5,6),(7,8)]
。新列表将用于创建 Spark 数据框。我如何在 pyspark 中执行此操作。我尝试了以下代码。
parsed = messages.map(lambda (k,v): json.loads(v))
list = []
summed = parsed.map(lambda detail:list.append((String(['mcc']), String(['mid']), String(['dsrc']))))
output = summed.collect()
print output
它产生错误'太多的值需要解压'
语句下方出现错误消息summed.collect()
16/09/12 12:46:10 信息弃用:mapred.task.is.map 已弃用。
相反,使用 mapreduce.task.ismap 16/09/12 12:46:10 INFO 弃用:
mapred.task.partition 已弃用。相反,使用
mapreduce.task.partition 16/09/12 12:46:10 信息弃用:
mapred.job.id 已弃用。相反,使用mapreduce.job.id 16/09/12
12:46:10 错误执行器:阶段 0.0 中的任务 1.0 出现异常(TID 1)
org.apache.spark.api.python.PythonException:回溯(最新
最后调用):文件
“/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py”,
111行,主要
process() 文件“/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py”,
第 106 行,正在进行中
serializer.dump_stream(func(split_index, iterator), outfile) 文件
“/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py”,
第 263 行,在 dump_stream 中
vs = list(itertools.islice(iterator, batch)) File "", line 1, in ValueError: 太多值无法解压