首先我想说我被迫使用 Spark 1.6
我正在生成一个DataFrame
来自这样的 JSON 文件:
{"id" : "1201", "name" : "satish", "age" : "25"},
{"id" : "1202", "name" : "krishna", "age" : "28"},
{"id" : "1203", "name" : "amith", "age" : "28"},
{"id" : "1204", "name" : "javed", "age" : "23"},
{"id" : "1205", "name" : "mendy", "age" : "25"},
{"id" : "1206", "name" : "rob", "age" : "24"},
{"id" : "1207", "name" : "prudvi", "age" : "23"}
The DataFrame
好像:
+---+----+-------+
|age| id| name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 28|1203| amith|
| 23|1204| javed|
| 25|1205| mendy|
| 24|1206| rob|
| 23|1207| prudvi|
+---+----+-------+
我用这个做什么DataFrame
就是按年龄分组,按id排序,过滤所有年龄组中学生人数超过1人的。我使用以下脚本:
import sqlContext.implicits._
val df = sqlContext.read.json("students.json")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val arrLen = udf {a: Seq[Row] => a.length > 1 }
val mergedDF = df.withColumn("newCol", collect_set(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("newCol","age")
val filterd = mergedDF.filter(arrLen(col("newCol")))
现在当前的结果是:
[WrappedArray([28,1203,amith], [28,1202,krishna]),28]
[WrappedArray([25,1201,satish], [25,1205,mendy]),25]
[WrappedArray([23,1204,javed], [23,1207,prudvi]),23]
我现在想要的是将这两个学生行合并到WrappedArray
合而为一,例如id
第一个学生和name
第二个学生的。
为了实现这一点,我编写了以下函数:
def PrintOne(List : Seq[Row], age : String):Row ={
val studentsDetails = Array(age, List(0).getAs[String]("id"), List(1).getAs[String]("name"))
val mergedStudent= new GenericRowWithSchema(studentsDetails .toArray,List(0).schema)
mergedStudent
}
我知道这个函数可以解决问题,因为当我使用 foreach 测试它时,它会打印出预期值:
filterd.foreach{x => val student = PrintOne(x.getAs[Seq[Row]](0), x.getAs[String]("age"))
println("merged student: "+student)
}
OutPut:
merged student: [28,1203,krishna]
merged student: [23,1204,prudvi]
merged student: [25,1201,mendy]
但是,当我尝试在地图内执行相同的操作来收集返回值时,问题就开始了。
如果我在没有编码器的情况下运行:
val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}
我得到以下异常:
线程“main”中的异常 java.lang.UnsupportedOperationException:否
找到 org.apache.spark.sql.Row 的编码器
- 字段(类:“org.apache.spark.sql.Row”,名称:“_2”)
- 根类:“scala.Tuple2”
当我尝试生成一个Econder
就我自己而言,我也失败了:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
implicit val encoder = RowEncoder(filterd.schema)
val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}(encoder)
类型不匹配;成立 :
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row]
必需:org.apache.spark.sql.Encoder[(字符串,
org.apache.spark.sql.Row)]
我怎样才能提供正确的编码器,或者更好的是避免它?
我被告知要避免使用映射+自定义函数,但我需要应用的逻辑比仅从每一行中选取一个字段更复杂。将多个字段组合起来,检查行的顺序以及值是否为空将更加重要。据我所知,只需使用自定义函数就可以解决它。