我正在使用 SparkSql 1.6.2 (Java API),我必须处理以下 DataFrame,该 DataFrame 在 2 列中具有值列表:
ID AttributeName AttributeValue
0 [an1,an2,an3] [av1,av2,av3]
1 [bn1,bn2] [bv1,bv2]
所需的表是:
ID AttributeName AttributeValue
0 an1 av1
0 an2 av2
0 an3 av3
1 bn1 bv1
1 bn2 bv2
我想我必须结合使用爆炸函数和自定义 UDF 函数。
我找到了以下资源:
- 分解(转置?)Spark SQL 表中的多列 https://stackoverflow.com/questions/33220916/explode-transpose-multiple-columns-in-spark-sql-table
- 如何使用 JAVA 在 Spark DataFrame 上调用 UDF? https://stackoverflow.com/questions/35348058/how-do-i-call-a-udf-on-a-spark-dataframe-using-java
我可以成功运行一个示例,读取两列并返回列中前两个字符串的串联
UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
return col1.apply(0) + col2.apply(0);
}
};
context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
问题是编写返回两列的 UDF 的签名(在 Java 中)。
据我了解,我必须定义一个新的 StructType 如下所示,并将其设置为返回类型,但到目前为止我还没有设法使最终代码正常工作
StructType retSchema = new StructType(new StructField[]{
new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
}
);
context.udf().register("combineUDF",combineUDF,retSchema);
任何帮助将不胜感激。
UPDATE:我试图首先实现 zip(AttributeName,AttributeValue) 所以我只需要在 SparkSql 中应用标准爆炸函数:
ID AttName_AttValue
0 [[an1,av1],[an1,av2],[an3,av3]]
1 [[bn1,bv1],[bn2,bv2]]
我构建了以下 UDF:
UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
List<List<String>> zipped = new LinkedList<>();
for (int i = 0, listSize = col1.size(); i < listSize; i++) {
List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
zipped.add(subRow);
}
return zipped;
}
};
但是当我运行代码时
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);
我收到以下错误消息:
scala.MatchError: [[an1,av1],[an1,av2],[an3,av3]] (属于 java.util.LinkedList 类)
看起来组合已正确执行,但返回类型不是 Scala 中预期的类型。
有帮助吗?