Spark - Java UDF 返回多列

2024-02-29

我正在使用 SparkSql 1.6.2 (Java API),我必须处理以下 DataFrame,该 DataFrame 在 2 列中具有值列表:

ID  AttributeName AttributeValue
 0  [an1,an2,an3] [av1,av2,av3]
 1  [bn1,bn2]     [bv1,bv2]

所需的表是:

ID  AttributeName AttributeValue
 0  an1           av1
 0  an2           av2
 0  an3           av3
 1  bn1           bv1
 1  bn2           bv2

我想我必须结合使用爆炸函数和自定义 UDF 函数。

我找到了以下资源:

  • 分解(转置?)Spark SQL 表中的多列 https://stackoverflow.com/questions/33220916/explode-transpose-multiple-columns-in-spark-sql-table
  • 如何使用 JAVA 在 Spark DataFrame 上调用 UDF? https://stackoverflow.com/questions/35348058/how-do-i-call-a-udf-on-a-spark-dataframe-using-java

我可以成功运行一个示例,读取两列并返回列中前两个字符串的串联

 UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
        public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            return col1.apply(0) + col2.apply(0);
        }
    };

 context.udf().register("combineUDF", combineUDF, DataTypes.StringType);

问题是编写返回两列的 UDF 的签名(在 Java 中)。 据我了解,我必须定义一个新的 StructType 如下所示,并将其设置为返回类型,但到目前为止我还没有设法使最终代码正常工作

StructType retSchema = new StructType(new StructField[]{
            new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
            new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
        }
    );

context.udf().register("combineUDF",combineUDF,retSchema);

任何帮助将不胜感激。

UPDATE:我试图首先实现 zip(AttributeName,AttributeValue) 所以我只需要在 SparkSql 中应用标准爆炸函数:

ID  AttName_AttValue
 0  [[an1,av1],[an1,av2],[an3,av3]]
 1  [[bn1,bv1],[bn2,bv2]]

我构建了以下 UDF:

UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
        public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            List<List<String>> zipped = new LinkedList<>();

            for (int i = 0, listSize = col1.size(); i < listSize; i++) {
                List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
                zipped.add(subRow);
            }

            return zipped;
        }

    };

但是当我运行代码时

myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);

我收到以下错误消息:

scala.MatchError: [[an1,av1],[an1,av2],[an3,av3]] (属于 java.util.LinkedList 类)

看起来组合已正确执行,但返回类型不是 Scala 中预期的类型。

有帮助吗?


最后我设法得到了我正在寻找的结果,但可能不是以最有效的方式。

基本上有2步:

  • 两个列表的 zip
  • 按行展开列表

对于第一步,我定义了以下 UDF 函数

UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() {
    public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
        ArrayList zipped = new ArrayList();

        for (int i = 0, listSize = col1.size(); i < listSize; i++) {
            String subRow = col1.apply(i) + ";" + col2.apply(i);
            zipped.add(subRow);
        }

        return scala.collection.JavaConversions.asScalaBuffer(zipped);
    }

};

缺少 SparkSession 的函数注册:

sparkSession.udf().register("concatItems",concatItems,DataTypes.StringType);

然后我用以下代码调用它:

DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));

在这个阶段 df2 看起来像这样:

ID  AttName_AttValue
 0  [[an1,av1],[an1,av2],[an3,av3]]
 1  [[bn1,bv1],[bn2,bv2]]

然后我调用以下 lambda 函数将列表分解为行:

 DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));

在这个阶段,df3 看起来像这样:

ID  AttName_AttValue
 0  [an1,av1]
 0  [an1,av2]
 0  [an3,av3]
 1  [bn1,bv1]
 1  [bn2,bv2]

最后,为了将属性名称和值拆分为两个不同的列,我将 DataFrame 转换为 JavaRDD 以便使用映射函数:

JavaRDD df3RDD = df3.toJavaRDD().map(
            (Function<Row, Row>) myRow -> {
                String[] info = String.valueOf(myRow.get(1)).split(",");
                return RowFactory.create(myRow.get(0), info[0], info[1]);
        }).cache();

如果有人有更好的解决方案,请随时发表评论。 我希望它有帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark - Java UDF 返回多列 的相关文章

  • Maven:无法在 OS X 上找到 java.lang 问题

    当我尝试时遇到以下问题mvn clean install显然它无法找到运行时 jar 但我需要做什么 错误日志 ERROR COMPILATION ERROR INFO ERROR Failure executing javac but c
  • 匿名类*总是*维护对其封闭实例的引用吗?

    我正在处理一些代码 其中一个对象 foo 正在创建另一个对象 对象 bar 并将其传递给Callable 之后 foo 将返回 bar 然后我希望 foo 变得无法访问 即 可用于 垃圾收集 我最初的想法是创建Callable匿名 例如 c
  • JCombobox 字符串项(可见)和整数键(固有)

    我有一个数据库模式 它将作为 JTable 列显示在 JCombobox 中以选择名称 但我希望将 ID 字段插入 作为外键 到另一个表中 通常 在下拉列表中选择一个项目 将所选项目带到组合框的显示区域 我想要做的是 当选择组合框中的任何项
  • 使用 Bouncy Castle 重建 ED25519 按键 (Java)

    Bouncy Castle 的最新 测试版 版本 bcprov jdk15on 161b20 jar 支持 ED25519 和 ED448 EC 加密以进行签名 我设置了这个完整的工作示例 它按预期工作 我的问题 我是否正确重建了私钥和公钥
  • 如何在我的 HttpClient 执行器中遵循单一职责原则?

    我在用RestTemplate http docs spring io spring docs current javadoc api org springframework web client RestTemplate html as
  • Java生成范围内不重复的随机数

    我想生成 1 到 4 范围内的随机数 包括 4 这是我的代码 int num r nextInt 4 1 r is instance of Random 但是 我在循环中运行上述代码 并且不想重复随机数 现在发生的事情我经常得到 1 1 1
  • 通过 Session.update 和 HibernateTemplate.merge 进行 Hibernate 更新的区别

    我看到了更新操作的类型 第一的 getHibernateTemplate execute new HibernateCallback public Object doInHibernate Session session session f
  • 使用 Oracle Wallet 身份验证从 Spring-jdbc 连接到 Oracle DB

    我将 Spring jdbc 与 org apache commons dbcp BasicDataSource 结合使用 使用用户名和密码进行连接 我想使用BasicDataSource 因为我只有一个连接 我有这个代码
  • Spring Security登录返回404

    我目前正在使用 Spring 框架开发我的博客 我正在实现 Spring Security 用于登录目的 一切都按预期进行 直到我提交始终返回 404 代码的登录凭据 这是我的 web xml 代码e
  • 无法在android中使用retrofit发出@Post请求

    我正在学习如何在 android 中使用改造 但是每当我尝试从互联网检索数据时 我的应用程序不会返回任何内容我的响应没有成功 我不知道如何修复当前我正在尝试发布的错误并使用此 URL 检索数据https jsonplaceholder ty
  • 公共领域有哪些替代方案?

    我正在用 java 编写一个游戏 正如问题标题建议的那样 我在类中使用公共字段 暂且 据我所知 公共领域很糟糕 我有一些理解其中的原因 但如果有人能澄清为什么你不应该使用它们 那将不胜感激 问题是 从我所看到的来看 这似乎是合乎逻辑的 是使
  • 字节流和字符流

    请解释一下什么是字节流和字符流 这些究竟意味着什么 Microsoft Word 文档是面向字节的还是面向字符的 Thanks 流是一种顺序访问文件的方式 字节流逐字节访问文件 字节流适用于任何类型的文件 但不太适合文本文件 例如 如果文件
  • 使用 java 中的准备好的语句插入自定义 SQL 类型

    我有一些自定义类型 它们基本上都是枚举 以下是它们的外观示例 CREATE TYPE card suit AS ENUM spades clubs hearts diamonds 我在 Java 中有一些准备好的语句 看起来像这样 Setu
  • 如何告诉 Eclipse 忽略 Ant build.xml 中的错误?

    我有一个使用 Maven 构建的 Eclipse 项目 并且我在 Eclipse 中使用 m2eclipse 插件来获得 Maven 支持 然而这个项目还包含一个build xml它并不用于实际构建项目 而只是用于编写脚本功能 作为项目开发
  • 如何在 Struts 2 中访问 OGNL 跟踪评估?

    有人告诉我要优化网络应用程序 为此 我使用JProfiler https www ej technologies com products jprofiler overview html 我注意到很大一部分响应时间都花在了表示层上 特别是当
  • 在大画布上滚动

    我需要一些帮助来了解滚动绘制到 Android 画布上的项目的基础知识 假设我想创建一个时间线 其中 0 处的时间是可视化的顶部 并且随着时间的增加 时间线继续呈现在上一个点下方 如果我想在 Android 上渲染它 我知道我可以通过重写
  • Android中计算两个时间之间的差异

    我有两个字符串变量 例如 StartTime 和 EndTime 我需要通过用 StartTime 减去 EndTime 来计算 TotalTime StartTime和EndTime的格式如下 StartTime 08 00 AM End
  • 如何找到 JAR:/home/hadoop/contrib/streaming/hadoop-streaming.jar

    我正在练习有关 Amazon EMR 的复数视角视频教程 我被困住了 因为我收到此错误而无法继续 Not a valid JAR home hadoop contrib streaming hadoop streaming jar 请注意
  • 无法验证 serde:org.openx.data.jsonserde.jsonserde

    我编写了这个查询来在配置单元上创建一个表 我的数据最初是 json 格式 所以我已经下载并构建了 serde 并添加了它运行所需的所有 jar 但我收到以下错误 FAILED Execution Error return code 1 fr
  • 尝试接收 UDP 多播时出现空指针异常

    在尝试了几次让简单的 UDP 多播接收器工作后 我感到很困惑 在我自己的代码无法按预期工作后 我尝试了 vertx 文档中发布的确切示例 DatagramSocket socket vertx createDatagramSocket ne

随机推荐