如何在 Spark SQL 中压缩两个数组列

2024-03-06

我有一个 Pandas 数据框。我尝试首先将包含字符串值的两列连接到一个列表中,然后使用 zip,我用“_”连接列表的每个元素。我的数据集如下:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

我想将这两列连接到第三列中,如下所示,为我的数据帧的每一行。

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

我已经使用下面的代码在 python 中成功完成了此操作,但是数据帧非常大,并且需要很长时间才能运行整个数据帧。为了提高效率,我想在 PySpark 中做同样的事情。我已成功读取 Spark 数据帧中的数据,但我很难确定如何使用 PySpark 等效函数复制 Pandas 函数。如何在 PySpark 中获得我想要的结果?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
  while index < 3:
    if isinstance(row['column_1'], str):      
      row['column_1'] = list(row['column_1'].split(','))
      row['column_2'] = list(row['column_2'].split(','))
      row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]

我已使用以下代码将两列转换为 PySpark 中的数组

from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split

crash.withColumn("column_1",
    split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
    split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)

现在我需要的只是使用“_”将数组的每个元素压缩到两列中。我该如何使用 zip 呢?任何帮助表示赞赏。


Spark SQL 与 Python 的等价物是pyspark.sql.functions.arrays_zip https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=arrays_zip#pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

集合函数:返回一个合并的结构体数组,其中第 N 个结构体包含输入数组的所有第 N 个值。

因此,如果您已经有两个数组:

from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", "\s*,\s*"))
    .withColumn("column_2", split("column_2", "\s*,\s*")))

您可以将其应用到结果上

from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+

现在要合并结果,您可以transform (如何使用变换高阶函数? https://stackoverflow.com/q/53761600/10465355, TypeError:列不可迭代 - 如何迭代 ArrayType()? https://stackoverflow.com/q/48993439/10465355):

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
     expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
) 

df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+

Note:

高阶函数transform and arrays_zipApache Spark 2.4 中已引入。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Spark SQL 中压缩两个数组列 的相关文章

  • Pandas/Google BigQuery:架构不匹配导致上传失败

    我的谷歌表中的架构如下所示 price datetime DATETIME symbol STRING bid open FLOAT bid high FLOAT bid low FLOAT bid close FLOAT ask open
  • 跟踪 pypi 依赖项 - 谁在使用我的包

    无论如何 是否可以通过 pip 或 PyPi 来识别哪些项目 在 Pypi 上发布 可能正在使用我的包 也在 PyPi 上发布 我想确定每个包的用户群以及可能尝试积极与他们互动 预先感谢您的任何答案 即使我想做的事情是不可能的 这实际上是不
  • Python zmq SUB 套接字未接收 MQL5 Zmq PUB 套接字

    我正在尝试在 MQL5 中设置一个 PUB 套接字 并在 Python 中设置一个 SUB 套接字来接收消息 我在 MQL5 中有这个 include
  • 将 python2.7 与 Emacs 24.3 和 python-mode.el 一起使用

    我是 Emacs 新手 我正在尝试设置我的 python 环境 到目前为止 我已经了解到在 python 缓冲区中使用 python mode el C c C c将当前缓冲区的内容加载到交互式 python shell 中 显然使用了什么
  • 使用Python请求登录Google帐户

    在多个登录页面上 需要谷歌登录才能继续 我想用requestspython 中的库以便让我自己登录 通常这很容易使用requests库 但是我无法让它工作 我不确定这是否是由于 Google 做出的一些限制 也许我需要使用他们的 API 或
  • 使用字典映射数据帧索引

    为什么不df index map dict 工作就像df column name map dict 这是尝试使用index map的一个小例子 import pandas as pd df pd DataFrame one A 10 B 2
  • Pandas Merge (pd.merge) 如何设置索引和连接

    我有两个 pandas 数据框 dfLeft 和 dfRight 以日期作为索引 dfLeft cusip factorL date 2012 01 03 XXXX 4 5 2012 01 03 YYYY 6 2 2012 01 04 XX
  • 如何将张量流模型部署到azure ml工作台

    我在用Azure ML Workbench执行二元分类 到目前为止 一切正常 我有很好的准确性 我想将模型部署为用于推理的 Web 服务 我真的不知道从哪里开始 azure 提供了这个doc https learn microsoft co
  • 如何在 Python 中解析和比较 ISO 8601 持续时间? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找一个 Python v2 库 它允许我解析和比较 ISO 8601 持续时间may处于不同单
  • 在Python中检索PostgreSQL数据库的新记录

    在数据库表中 第二列和第三列有数字 将会不断添加新行 每次 每当数据库表中添加新行时 python 都需要不断检查它们 当 sql 表中收到的新行数低于 105 时 python 应打印一条通知消息 警告 数量已降至 105 以下 另一方面
  • 如何使用python在一个文件中写入多行

    如果我知道要写多少行 我就知道如何将多行写入一个文件 但是 当我想写多行时 问题就出现了 但是 我不知道它们会是多少 我正在开发一个应用程序 它从网站上抓取并将结果的链接存储在文本文件中 但是 我们不知道它会回复多少行 我的代码现在如下 r
  • Numpy - 根据表示一维的坐标向量的条件替换数组中的值

    我有一个data多维数组 最后一个是距离 另一方面 我有距离向量r 例如 Data np ones 20 30 100 r np linspace 10 50 100 最后 我还有一个临界距离值列表 称为r0 使得 r0 shape Dat
  • Cython 和类的构造函数

    我对 Cython 使用默认构造函数有疑问 我的 C 类 Node 如下 Node h class Node public Node std cerr lt lt calling no arg constructor lt lt std e
  • Python3 在 DirectX 游戏中移动鼠标

    我正在尝试构建一个在 DirectX 游戏中执行一些操作的脚本 除了移动鼠标之外 我一切都正常 是否有任何可用的模块可以移动鼠标 适用于 Windows python 3 Thanks I used pynput https pypi or
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • 如何使用原始 SQL 查询实现搜索功能

    我正在创建一个由 CS50 的网络系列指导的应用程序 这要求我仅使用原始 SQL 查询而不是 ORM 我正在尝试创建一个搜索功能 用户可以在其中查找存储在数据库中的书籍列表 我希望他们能够查询 书籍 表中的 ISBN 标题 作者列 目前 它
  • Pandas 将多行列数据帧转换为单行多列数据帧

    我的数据框如下 code df Car measurements Before After amb temp 30 268212 26 627491 engine temp 41 812730 39 254255 engine eff 15
  • Python:XML 内所有标签名称中的字符串替换(将连字符替换为下划线)

    我有一个格式不太好的 XML 标签名称内有连字符 我想用下划线替换它 以便能够与 lxml objectify 一起使用 我想替换所有标签名称 包括嵌套的子标签 示例 XML
  • python import inside函数隐藏现有变量

    我在我正在处理的多子模块项目中遇到了一个奇怪的 UnboundLocalError 分配之前引用的局部变量 问题 并将其精简为这个片段 使用标准库中的日志记录模块 import logging def foo logging info fo
  • Pandas 每周计算重复值

    我有一个Dataframe包含按周分组的日期和 ID df date id 2022 02 07 1 3 5 4 2022 02 14 2 1 3 2022 02 21 9 10 1 2022 05 16 我想计算每周有多少 id 与上周重

随机推荐