Spark:如何使用动态嵌套数组转置和分解列

2024-04-10

我应用了问题中的算法Spark:如何转置和分解具有嵌套数组的列 https://stackoverflow.com/questions/69418239/spark-how-to-transpose-and-explode-columns-with-nested-arrays使用动态数组转置和分解嵌套 Spark 数据框。

我已添加到数据框"""{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}}""",有新列c,其中数组有新的val_dynamic可以随机出现的字段。

我正在寻找所需的输出 2(转置和爆炸),但即使是所需的输出 1(转置)的示例也将非常有用。

输入 df:

+------------------+--------+-----------+---+
|                 a|       b|          c| id|
+------------------+--------+-----------+---+
|[{1, 1}, {11, 11}]|    null|       null|  1|
|              null|[{2, 2}]|       null|  2|
|              null|    null|[{3, 3, 3}]|  3|   !!! NOTE: Added `val_dynamic`
+------------------+--------+-----------+---+


root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)  !!! NOTE: Added `val_dynamic`
 |-- id: long (nullable = true)

所需输出 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [{1, 1}, {11, 11}]|
|  2|  b   | [{2, 2}]          |
|  3|  c   | [{3, 3, 3}]       | !!! NOTE: Added `val_dynamic`
+---+------+-------------------+

所需输出 2 (explode_df):

+---+----+----+---+-----------+
| id|cols|date|val|val_dynamic|
+---+----+----+---+-----------+
|  1|   a|   1|  1|   null    |
|  1|   a|  11| 11|   null    |
|  2|   b|   2|  2|   null    |
|  3|   c|   3|  3|      3    |  !!! NOTE: Added `val_dynamic`
+---+----+----+---+-----------+

当前代码:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
  """{"id":2,"b":[{"date":2,"val":2}]}}""",
  """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}}"""
    ]))

df.show()

cols = [ 'a', 'b', 'c']

#expr = stack(2,'a',a,'b',b,'c',c )
expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"


transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")

transpose_df.show()

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
explode_df.show()

目前的结果

AnalysisException: cannot resolve 'stack(3, 'a', `a`, 'b', `b`, 'c', `c`)' due to data type mismatch: Argument 2 (array<struct<date:bigint,val:bigint>>) != Argument 6 (array<struct<date:bigint,val:bigint,val_dynamic:bigint>>); line 1 pos 0;
'Project [id#2304L, unresolvedalias(stack(3, a, a#2301, b, b#2302, c, c#2303), Some(org.apache.spark.sql.Column$$Lambda$2580/0x00000008411d3040@4d9eefd0))]
+- LogicalRDD [a#2301, b#2302, c#2303, id#2304L], false

ref :


stack要求所有堆积的列具有相同的类型。这里的问题是数组内部的结构具有不同的成员。一种方法是将缺少的成员添加到所有结构中,以便我的方法之前的回答 https://stackoverflow.com/a/69419416/2129801再次工作。

cols = ['a', 'b', 'c']

#create a map containing all struct fields per column
existing_fields = {c:list(map(lambda field: field.name, df.schema.fields[i].dataType.elementType.fields)) 
      for i,c in enumerate(df.columns) if c in cols}

#get a (unique) set of all fields that exist in all columns
all_fields = set(sum(existing_fields.values(),[]))

#create a list of transform expressions to fill up the structs will null fields
transform_exprs = [f"transform({c}, e -> named_struct(" + 
    ",".join([f"'{f}', {('e.'+f) if f in existing_fields[c] else 'cast(null as long)'}" for f in all_fields]) 
    + f")) as {c}" for c in cols]

#create a df where all columns contain arrays with the same struct
full_struct_df = df.selectExpr("id", *transform_exprs)

full_struct_df现在有架构

root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)

从这里开始,逻辑像以前一样工作:

stack_expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"

transpose_df = full_struct_df.selectExpr("id", stack_expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')

这个答案的第一部分要求

  • 中提到的每一列cols是一个结构体数组
  • 所有结构的所有成员都是long是。实行这一限制的原因是cast(null as long)创建变换表达式时。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:如何使用动态嵌套数组转置和分解列 的相关文章

随机推荐

  • 在 Scala 3 中派生不透明类型的类型类实例

    Scala 3 有没有办法使用derives关键字与不透明类型别名结合使用 最好有一种无样板的方法 通过自动依赖基础类型 如果有 的相同类型类的实例来为给定的不透明类型别名提供类型类实例 如果能够表达类似的东西就好了 opaque type
  • 将逗号分隔的字符串解析为某种可以循环访问各个值的对象的最简单方法?

    将逗号分隔的字符串值列表解析为可以循环的某种对象的最简单方法是什么 以便我可以轻松访问各个值 示例字符串 0 10 20 30 100 200 我对 C 有点陌生 所以请原谅我问这样一个简单的问题 谢谢 这有一些问题 但最终最简单的方法是使
  • 子类如何访问父类的属性?

    我有一个关于 Javascript 对象的问题 如何访问父类的属性 function randomObj for example button obj this text this is obj function parentClass t
  • dreamhost 上的 SSH 密钥

    我正在尝试设置与 dreamhost 和我的本地计算机配对的 SSH 密钥 我使用 git bash 作为我的终端 使用 mingw32 我可以通过 ssh 电子邮件受保护 cdn cgi l email protection并要求我提供密
  • rspec,未知属性问题

    我正在 优秀的 railstutorial org 网站上工作 有一个关于 rspec 的基本问题 当我在新用户模型上运行以下测试时 我收到 未知属性 用户名 消息和失败的测试 before each do attr lname e gt
  • 从 IE EPM BHO 内访问命名管道服务器

    我正在尝试对我们的旧产品进行一些更改 以支持 BHO 上的 IE EPM 我已经设法加载它并调用各种方法 SetSite DocumentComplete 等 当我尝试连接到 Windows 服务中运行的命名管道服务器时 我似乎遇到了障碍
  • 如何在 Clojure 中遍历一棵树,同时收集每个节点节点的值?

    我想创建一个函数来收集二叉树中每个节点的值 在 ClojureDocs 中 我发现了几个用于遍历树 图的函数 例如 tree seq prewalk 和 postwalk https clojuredocs org clojure core
  • 为什么在 MySQL 中使用 CAST 时出现语法错误?

    我正在使用 MySQL Workbench v5 2 44 CE 我正在针对本地 MySQL 5 5 安装运行它 我正在尝试使用CAST函数 但不断出现以下错误 语法错误 意外的 INT SYM 源日期和目标日期类型是什么并不重要 唯一不给
  • Google oauth2 与 devise 和omniauth 处理为失败

    我正在尝试配置一个新的rails4 2应用程序来针对Google Oauth2进行身份验证 我似乎成功地完成了这个过程 但它被视为失败 最初的授权似乎进展顺利 直到谷歌发送回调 那么似乎就被错误地认定为失败了 给出的错误信息是 Could
  • 查找信标的两个地理位置之间的点

    假设我们有两个beacons放置在道路两侧 我们知道他们的latitude and longitude它们所在的位置 我们将它们视为一个位置 我们还知道distance两者之间以米为单位beacons 使用半正矢公式测量 我们的设备正在这两
  • 如何修复警告:视图不在窗口层次结构中

    我正在使用标准设置程序Mobclix in an iOS应用程序 我正在调用requestAndDisplayAdFromViewController 从内部开始的方法viewWillAppear void viewWillAppear B
  • 吞没的消息:错误:未捕获(承诺):[对象未定义]

    我的登录组件会短暂显示 然后被有关承诺中未定义对象的错误消息删除 这是承诺的定义 static init Promise
  • 如何在 ActiveAdmin 中使用 ActiveStorage `has_many_attached` 编辑多个附加图像

    我有一个简单的模型 可以通过附加多个图像ActiveStorage处理文件存储 我在用ActiveAdmin编辑我的模型并上传 附加图像 到目前为止没有问题 问题是 当我想编辑模型并添加新图像时 以前的图像会被删除 只添加新图像 我可以预览
  • Maven 内存不足构建失败

    截至今天 我的 Maven 编译失败 INFO ERROR Unexpected INFO java lang OutOfMemoryError Java heap space INFO at java util Arrays copyOf
  • 将 Hive 表导出到 hdfs 中的 csv

    我知道在 Hive 中将表保存到 csv 或其他文本文件 时 分隔符存在一个已知问题 所以我想知道你们是否可以帮助我解决这个问题 我有一个现有的表 表 A 我想将其以 csv 格式保存到 hdfs 通过阅读其他回复 我相信我必须首先创建一个
  • java中如何复制文件

    我正在尝试复制 java 中的文件并将其移动到新文件夹 这是我一直在使用的代码 但我总是收到此错误 访问被拒绝 在指定目录中 有没有办法解决这个问题或者有更好的方法来复制文件 谢谢 try File f1 new File fpath Fi
  • 来自 URI 的图像路径

    我正在尝试从图库中获取图像文件 Intent intent new Intent intent setType image intent setAction Intent ACTION GET CONTENT startActivityFo
  • 行范围内的替换

    I have
  • 如何使用 Spring 和 Active Directory 实现单点登录

    我有一个基于 Spring 的 Web 应用程序 我想在其上实现单点登录解决方案 基本流程是 1 用户登录 Windows 工作站 台式电脑 根据组织的 Active Directory 进行身份验证 2 用户打开浏览器并导航到 Sprin
  • Spark:如何使用动态嵌套数组转置和分解列

    我应用了问题中的算法Spark 如何转置和分解具有嵌套数组的列 https stackoverflow com questions 69418239 spark how to transpose and explode columns wi