在pyspark中读取json文件

2024-04-11

我是 PySpark 的新手,下面是来自 kafka 的 JSON 文件格式。

{
        "header": {
        "platform":"atm",
        "version":"2.0"
       }
        "details":[
       {
        "abc":"3",
        "def":"4"
       },
       {
        "abc":"5",
        "def":"6"
       },
       {
        "abc":"7",
        "def":"8"
       }    
      ]
    }

我怎样才能读懂所有的价值观"abc" "def"详细信息并将其添加到这样的新列表中[(1,2),(3,4),(5,6),(7,8)]。新列表将用于创建 Spark 数据框。我如何在 pyspark 中执行此操作。我尝试了以下代码。

parsed = messages.map(lambda (k,v): json.loads(v))
list = []
summed = parsed.map(lambda detail:list.append((String(['mcc']), String(['mid']), String(['dsrc']))))
output = summed.collect()
print output

它产生错误'太多的值需要解压'

语句下方出现错误消息summed.collect()

16/09/12 12:46:10 信息弃用:mapred.task.is.map 已弃用。 相反,使用 mapreduce.task.ismap 16/09/12 12:46:10 INFO 弃用: mapred.task.partition 已弃用。相反,使用 mapreduce.task.partition 16/09/12 12:46:10 信息弃用: mapred.job.id 已弃用。相反,使用mapreduce.job.id 16/09/12 12:46:10 错误执行器:阶段 0.0 中的任务 1.0 出现异常(TID 1) org.apache.spark.api.python.PythonException:回溯(最新 最后调用):文件 “/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py”, 111行,主要 process() 文件“/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py”, 第 106 行,正在进行中 serializer.dump_stream(func(split_index, iterator), outfile) 文件 “/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py”, 第 263 行,在 dump_stream 中 vs = list(itertools.islice(iterator, batch)) File "", line 1, in ValueError: 太多值无法解压


首先,json无效。标题 a 之后,不见了。

话虽这么说,让我们看一下这个 json:

{"header":{"platform":"atm","version":"2.0"},"details":[{"abc":"3","def":"4"},{"abc":"5","def":"6"},{"abc":"7","def":"8"}]}

这可以通过以下方式处理:

>>> df = sqlContext.jsonFile('test.json')
>>> df.first()
Row(details=[Row(abc='3', def='4'), Row(abc='5', def='6'), Row(abc='7', def='8')], header=Row(platform='atm', version='2.0'))

>>> df = df.flatMap(lambda row: row['details'])
PythonRDD[38] at RDD at PythonRDD.scala:43

>>> df.collect()
[Row(abc='3', def='4'), Row(abc='5', def='6'), Row(abc='7', def='8')]

>>> df.map(lambda entry: (int(entry['abc']),     int(entry['def']))).collect()
[(3, 4), (5, 6), (7, 8)]

希望这可以帮助!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在pyspark中读取json文件 的相关文章

随机推荐