我正在处理由 JSON 创建的数据帧,然后我想在数据帧上应用过滤条件。
val jsonStr = """{ "metadata": [{ "key": 84896, "value": 54 },{ "key": 1234, "value": 12 }]}"""
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)
df 的模式
root
|-- metadata: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: long (nullable = true)
| | |-- value: long (nullable = true)
现在我需要过滤我想要做的数据框
val df1=df.where("key == 84896")
这会引发错误
ERROR Executor - Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [metadata]; line 1 pos 0;
'Filter ('key = 84896)
我想使用where子句的原因是因为我想直接使用表达式字符串
例如( (key == 999, value == 55) || (key == 1234, value == 12) )
首先你应该使用explode
获得易于使用的数据帧。然后您可以选择给定输入的键和值:
val explodedDF = df.withColumn("metadata", explode($"metadata"))
.select("metadata.key", "metadata.value")
Output:
+-----+-----+
| key|value|
+-----+-----+
|84896| 54|
| 1234| 12|
+-----+-----+
这样您就可以像往常一样执行过滤逻辑:
scala> explodedDF.where("key == 84896").show
+-----+-----+
| key|value|
+-----+-----+
|84896| 54|
+-----+-----+
您可以串联您的过滤要求,下面是一些示例:
explodedDF.where("key == 84896 AND value == 54")
explodedDF.where("(key == 84896 AND value == 54) OR key = 1234")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)