Pyspark 将多个列合并为一个 json 列

2024-05-01

我不久前问过 python 的问题,但现在我需要在 PySpark 中做同样的事情。

我有一个像这样的数据框(df):

|cust_id|address    |store_id|email        |sales_channel|category|
-------------------------------------------------------------------
|1234567|123 Main St|10SjtT  |[email protected] /cdn-cgi/l/email-protection|ecom         |direct  |
|4567345|345 Main St|10SjtT  |[email protected] /cdn-cgi/l/email-protection|instore      |direct  |
|1569457|876 Main St|51FstT  |[email protected] /cdn-cgi/l/email-protection|ecom         |direct  |

我想将最后 4 个字段合并到一个 json 元数据字段中,如下所示:

|cust_id|address    |metadata                                                                                     |
-------------------------------------------------------------------------------------------------------------------
|1234567|123 Main St|{'store_id':'10SjtT', 'email':'[email protected] /cdn-cgi/l/email-protection','sales_channel':'ecom', 'category':'direct'}   |
|4567345|345 Main St|{'store_id':'10SjtT', 'email':'[email protected] /cdn-cgi/l/email-protection','sales_channel':'instore', 'category':'direct'}|
|1569457|876 Main St|{'store_id':'51FstT', 'email':'[email protected] /cdn-cgi/l/email-protection','sales_channel':'ecom', 'category':'direct'}   |

这是我在 python 中用来执行此操作的代码:

cols = [
    'store_id',
    'store_category',
    'sales_channel',
    'email'
]

df1 = df.copy()
df1['metadata'] = df1[cols].to_dict(orient='records')
df1 = df1.drop(columns=cols)

但我想将其转换为 PySpark 代码以使用 Spark 数据框;我不想在 Spark 中使用 pandas。


Use to_json创建 json 对象的函数!

Example:

from pyspark.sql.functions import *

#sample data
df=spark.createDataFrame([('1234567','123 Main St','10SjtT','[email protected] /cdn-cgi/l/email-protection','ecom','direct')],['cust_id','address','store_id','email','sales_channel','category'])

df.select("cust_id","address",to_json(struct("store_id","category","sales_channel","email")).alias("metadata")).show(10,False)

#result
+-------+-----------+----------------------------------------------------------------------------------------+
|cust_id|address    |metadata                                                                                |
+-------+-----------+----------------------------------------------------------------------------------------+
|1234567|123 Main St|{"store_id":"10SjtT","category":"direct","sales_channel":"ecom","email":"[email protected] /cdn-cgi/l/email-protection"}|
+-------+-----------+----------------------------------------------------------------------------------------+

to_json by passing list of columns:

ll=['store_id','email','sales_channel','category']

df.withColumn("metadata", to_json(struct([x for x in ll]))).drop(*ll).show()

#result
+-------+-----------+----------------------------------------------------------------------------------------+
|cust_id|address    |metadata                                                                                |
+-------+-----------+----------------------------------------------------------------------------------------+
|1234567|123 Main St|{"store_id":"10SjtT","email":"[email protected] /cdn-cgi/l/email-protection","sales_channel":"ecom","category":"direct"}|
+-------+-----------+----------------------------------------------------------------------------------------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark 将多个列合并为一个 json 列 的相关文章

  • TensorFlow:带有轴选项的 bincount

    在 TensorFlow 中 我可以使用 tf bincount 获取数组中每个元素的计数 x tf placeholder tf int32 None freq tf bincount x tf Session run freq feed
  • 在Python中如何获取字典的部分视图?

    是否有可能获得部分视图dict在Python中类似于pandasdf tail df head 说你有很长一段时间dict 而您只想检查某些元素 开头 结尾等 dict 就像是 dict head 3 To see the first 3
  • 无故运行测试时 PyCharm 抛出“AttributeError: 'module' object has no attribute”

    因此 我有一个 Django REST Framework 项目 有一天它无法在 PyCharm 中运行测试 从命令行我可以使用它们来运行它们paver or the manage py直接地 曾经有一段时间 当我们没有在文件顶部导入类的超
  • 将二维数组放入 Pandas 系列中

    我有一个 2D Numpy 数组 我想将其放入 pandas 系列 而不是 DataFrame 中 gt gt gt import pandas as pd gt gt gt import numpy as np gt gt gt a np
  • 如何在VIM中设置文件的正确路径?

    每当我击中 pwd在 vim 中命令总是返回路径C Windows system32 即使我在桌面上的 Python 文件中 所以每当我跑步时 python 命令返回 python can t open file Users myname
  • 根据 Pandas 中的列表对多列进行排序

    感谢有关如何根据 pandas 中的倍数列表对给定多列进行排序的任何提示 如下所示 import pandas as pd sort a a d e sort b s1 s3 s6 sort c t1 t2 t3 df pd DataFra
  • 如何使用 Bokeh 动态隐藏字形和图例项

    我正在尝试在散景中实现复选框 其中每个复选框应显示 隐藏与其关联的行 我知道可以通过图例来实现这一点 但我希望这种效果同时在两个图中发生 此外 图例也应该更新 在下面的示例中 出现了复选框 但不执行任何操作 我显然不明白如何更新用作源的数据
  • 如何从 JSON 响应重定向?

    所以我尝试使用 Flask 和 Javascript 上传器 Dropzone 上传文件并在上传完成后重定向 文件上传正常 但在烧瓶中使用传统的重定向 return redirect http somesite com 不执行任何操作 页面
  • 如何在 Django 中使用基于类的视图创建注册视图?

    当我开始使用 Django 时 我几乎使用 FBV 基于函数的视图 来处理所有事情 包括注册新用户 但当我更深入地研究项目时 我意识到基于类的视图通常更适合大型项目 因为它们更干净且可维护 但这并不是说 FBV 不是 无论如何 我将整个项目
  • Python、subprocess、call()、check_call 和 returncode 来查找命令是否存在

    我已经弄清楚如何使用 call 让我的 python 脚本运行命令 import subprocess mycommandline lumberjack sleep all night work all day subprocess cal
  • Pandas 堆积条形图中元素的排序

    我正在尝试绘制有关某个地区 5 个地区的家庭在特定行业赚取的收入比例的信息 我使用 groupby 按地区对数据框中的信息进行排序 df df orig groupby District Portion of income value co
  • 使用 Conda 更新特定模块会删除大量软件包

    我最近开始使用 Anaconda Python 发行版 因为它提供了许多开箱即用的数据分析库 使用 conda 创建环境和安装软件包也轻而易举 但是当我想更新 Python 本身或任何其他模块时 我遇到了一些严重的问题 我事先被告知我的很多
  • 为什么我应该使用 WSGI?

    使用 mod python 一段时间了 我读了越来越多关于 WSGI 有多好的文章 但没有真正理解为什么 那么我为什么要切换到它呢 有什么好处 这很难吗 学习曲线值得吗 为了用 Python 开发复杂的 Web 应用程序 您可能会使用更全面
  • `pyqt5'错误`元数据生成失败`

    我正在尝试安装pyqt5使用带有 M1 芯片和 Python 3 9 12 的 mac 操作系统 我怀疑M1芯片可能是原因 我收到一个错误metadata generation failed 最小工作示例 directly in the t
  • Python对象初始化性能

    我只是做了一些快速的性能测试 我注意到一般情况下初始化列表比显式初始化列表慢大约四到六倍 这些可能是错误的术语 我不确定这里的行话 例如 gt gt gt import timeit gt gt gt print timeit timeit
  • 导入错误:无法导入名称“时间戳”

    我使用以下代码在 python 3 6 3 中成功安装了 ggplot conda install c conda forge ggplot 但是当我使用下面的代码将其导入笔记本时 出现错误 from ggplot import Impor
  • 如何绘制堆积比例图?

    我有一个数据框 x lt data frame id letters 1 3 val0 1 3 val1 4 6 val2 7 9 id val0 val1 val2 1 a 1 4 7 2 b 2 5 8 3 c 3 6 9 我想绘制一个
  • 通过 Web 界面执行 python 单元测试

    是否可以通过 Web 界面执行单元测试 如果可以 如何执行 EDIT 现在我想要结果 对于测试 我希望它们是自动化的 可能每次我对代码进行更改时 抱歉我忘了说得更清楚 EDIT 这个答案此时已经过时了 Use Jenkins https j
  • 如何使用Python保存“完整的网页”而不仅仅是基本的html

    我正在使用以下代码来使用 Python 保存网页 import urllib import sys from bs4 import BeautifulSoup url http www vodafone de privat tarife r
  • 使用 Python 将对象列表转为 JSON

    我在转换时遇到问题Object实例到 JSON ob Object list name scaping myObj base url u number page for ob in list name json string json du

随机推荐

  • 获取正在运行的程序的属性

    我想开发一个程序 其 ID 是一张牌 因为它在另一个正在运行的程序 例如扑克或红心游戏或其他程序 中播放 我首先尝试获取有关已运行的游戏程序的所需信息 但我从一开始就遇到了问题 我正在运行 MSVC 2013 并开发 MFC 应用程序 现在
  • Safari 中的日期无效

    alert new Date 2010 11 29 chrome ff 对此没有问题 但 safari 会喊 无效日期 为什么 编辑 好的 根据下面的评论 我使用了字符串解析并尝试了这个 alert new Date 11 29 2010
  • 如何使用speakhere示例中的AQRecorder

    我已经从 talkHere 示例中复制了 AQRecorder 以及所有其他所需的文件 这样做之后 由于编译错误 我将链中使用它的所有类重命名为 mm 现在似乎已解决 但是我仍然不知道如何使用 AQRecorder 类 该示例中还有另一个名
  • Java 8 不兼容类型

    这是简单的代码 import java util ArrayList import java util Collections import java util HashMap import java util Map public cla
  • 我们如何从团队项目中的所有 Git 存储库连接到 TFS?

    我正在运行最新的 Visual Studio 2013 Ultimate 和 Update 1 我们也有最新的 Team Foundation Server 2013 我们中的一些人对新的 Git 存储库集成感到非常兴奋 但似乎有一个相当大
  • 使用 Java 中的映射实现的队列数据结构,大小限制为 5

    我有带有一些记录的地图 我想将该映射限制为仅 5 个元素 并且每当添加新元素时 应删除第一个元素 并应在映射的最后位置添加新元素 类似于 FIFO 的东西 任何人都可以建议我使用一个数据结构或解决方案本身 E g Map
  • 苹果拒绝应用程序,因为它在未经用户许可的情况下传输 MAC 地址

    我们最近开发的一款应用程序被苹果拒绝了 这是他们的解释 我们发现您的应用在收集数据之前未征得用户同意 根据应用程序商店审查指南的要求 用户的个人数据 具体来说 您的应用程序会发送设备的 MAC 地址 而无需 用户的许可 您的应用程序还会发送
  • 检测 Chrome 打包应用程序中的底层操作系统

    既然打包的应用程序可以在不同的平台上运行 那么有没有办法区分Windows和Mac呢 I would like to show some help vis vis keyboard shortcuts and being able to d
  • SSE (EventSource) 在 1 小时 22 分钟后超时。有什么办法可以让它持续下去吗?

    我的页面中有一个区域 当数据库发生更改时 消息将发送到该区域 现在 有时数据库会发生很大的变化 以至于每 10 分钟就会显示一条新消息 其他日子它只会改变几次 我遇到的问题是 EventSource 似乎在 1 小时 22 分钟后超时 浏览
  • 分享图片在 Viber 和 Facebook 中不起作用

    我使用下面的代码来共享图像 但不幸的是它仅适用于Line 不是为了Facebook and Viber Code Intent share new Intent android content Intent ACTION SEND shar
  • 从类详细信息 Visual Studio 导出

    我发现在 Visual Studio 中我们可以通过创建类图 添加类和打开类详细信息轻松地将摘要添加到代码中 现在我想知道是否可以将其导出到 Excel 文件中 检查 MD对此答案的评论https stackoverflow com a 3
  • 带汇总总计和小计

    我有一个脚本可以生成几乎已经存在的结果集 我正在尝试获取小计和总计 我在年份栏中得到了小计 在最后得到了总计 我的目标是让最终结果显示 总计 而不是小计 请注意 由于汇总函数 我的最后一行 位置 也返回为空 SELECT YEAR COUN
  • 从特定的 setup.py 进行 pip 安装

    我在 RedHat 的 Openshift 云服务上创建了一个 python 3 3 应用程序 默认情况下 它有我的项目的 setup py 我正在学习名为 使用 Flask 构建 SaaS 应用程序 的 Udemy 课程 源代码 http
  • 如何将数据动态分配给jqGrid?

    这是我创建 jqGrid 的代码 ptDataGrid jqGrid datatype local data arrSpecData colModel colmod rowNum 10 rowList 10 pager ptPager gr
  • 如何在 XAML 中定义变量?

    我在 XAML 中有以下两个按钮
  • svn 1.7 错误 E200009 无法添加所有目标,因为某些目标已经版本化

    我对存储库进行了彻底的检查 然后每天我都有一个 hudson 工作来运行脚本来备份配置 脚本的一部分是添加 xml svn add xml svn warning W150002 data hudson config xml is alre
  • difflib python 格式化

    我使用此代码来查找两个 csv 列表之间的差异并提出一些格式问题 这可能是一个简单的解决方法 但我是新手 正在尝试学习 但遇到了很多问题 import difflib diff difflib ndiff open test1 csv rb
  • 检测碰撞方向

    一块方形瓷砖与另一块方形瓷砖碰撞 调酒师说 I have 两个图块的高度 宽度 x 和 y 引起碰撞的运动的 2D 矢量 我需要知道碰撞发生在哪一侧 例如顶部 底部 左侧 右侧 以便适当地重置位置 我会给任何能回答这个问题的人一个心理饼干
  • 用于阻止大于 20MB 的提交的预提交挂钩

    是否可以为 SVN 1 8 编写 prcommit 挂钩以避免提交大于 20MB 的文件 任何建议 将不胜感激 谢谢 我尝试过 但这不适用于二进制文件或其他文件扩展名 filesize SVNLOOK cat t TXN REPOS f w
  • Pyspark 将多个列合并为一个 json 列

    我不久前问过 python 的问题 但现在我需要在 PySpark 中做同样的事情 我有一个像这样的数据框 df cust id address store id email sales channel category 1234567 1