PySpark - RDD 到 JSON

2024-05-30

我有一个 Hive 查询返回以下格式的数据:

ip, category, score
1.2.3.4, X, 5
10.10.10.10, A, 2
1.2.3.4, Y, 2
12.12.12.12, G, 10
1.2.3.4, Z, 9
10.10.10.10, X, 3

在 PySpark 中,我通过以下方式得到这个hive_context.sql(my_query).rdd

每个 IP 地址可以有多个分数(因此有多行)。我想以 json/array 格式获取此数据,如下所示:

{
    "ip": "1.2.3.4",
    "scores": [
        {
            "category": "X",
             "score": 10
        },
        {
            "category": "Y",
             "score": 2
        },
        {
            "category": "Z",
             "score": 9
        },
    ],
    "ip": "10.10.10.10",
    "scores": [
        {
            "category": "A",
             "score": 2
        },
        {
            "category": "X",
             "score": 3
        },
    ],
     "ip": "12.12.12.12",
    "scores": [
        {
            "category": "G",
             "score": 10
        },
    ],
}

请注意,RDD 不一定已排序,并且 RDD 可以轻松包含数亿行。我是 PySpark 的新手,因此任何有关如何有效解决此问题的指示都会有所帮助。


groupBy ip然后将分组的 RDD 转换为您需要的:

rdd.groupBy(lambda r: r.ip).map(
  lambda g: {
    'ip': g[0], 
    'scores': [{'category': x['category'], 'score': x['score']} for x in g[1]]}
).collect()

# [{'ip': '1.2.3.4', 'scores': [{'category': 'X', 'score': 5}, {'category': 'Y', 'score': 2}, {'category': 'Z', 'score': 9}]}, {'ip': '12.12.12.12', 'scores': [{'category': 'G', 'score': 10}]}, {'ip': '10.10.10.10', 'scores': [{'category': 'A', 'score': 2}, {'category': 'X', 'score': 3}]}]
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark - RDD 到 JSON 的相关文章

  • 在 postgresql 中,如何在 jsonb 键上返回布尔值而不是字符串?

    在下面的查询中 isComplete 和 isValid 以字符串形式返回 但是 它们被保存为布尔值 如何获取要返回的这些字段的布尔表示形式 query SELECT data gt gt id AS id data gt gt name
  • 如何在Java中扩展数组而不更改其名称

    我想知道是否可以在 Java 中扩展数组而不更改其名称 因为我有多个方法链接到该数组 我正在考虑创建一个同名但两倍大的新数组 然后将第一个数组中的所有元素复制到第二个数组 这可能吗 基本上我想创建一个包含银行账户的数组 如果客户创建了太多账
  • JSON 到数据表

    我正在尝试将 JSON 文本序列化为 DataTable 如下所示 List
  • 在 Pandas UDF PySpark 中传递多列

    我想计算 PySpark DataFrame 两列之间的 Jaro Winkler 距离 Jaro Winkler 距离可通过所有节点上的 pyjarowinkler 包获得 pyjarowinkler 的工作原理如下 from pyjar
  • C 中每 N 个元素中出现次数最多的元素

    我有一个大小为 0 8388608 的大数组 A 其中包含 相对较小 的整数 A i 0 131072 我想找到每个 N 32 个元素中最常出现的元素 什么会更快 A 创建一个大小为131072的关联数组B 迭代32个元素 递增B A i
  • JavaScript 预分配数组未捕获 RangeError:数组长度无效

    我有一个小循环的代码 它抛出 Uncaught RangeError Invalid Array Length 我能够在 Google Chrome 控制台中重现它 const COUNT 100 000 000 const xValues
  • 未捕获(承诺中)DOMException:注册失败 - 清单为空或丢失

    我正在尝试使用 Polymer 实现推送通知 我跟着这个链接 https developers google com web fundamentals getting started push notifications 并能够让服务人员注
  • 如何将 JSON 对象解析为 TypeScript 对象

    我目前正在尝试将收到的 JSON 对象转换为具有相同属性的 TypeScript 类 但无法使其工作 我究竟做错了什么 员工阶层 export class Employee firstname string lastname string
  • Jackson - 将值传递给 JsonDeserializer

    我有一个现有的类层次结构 如下所示 public interface Service String getId String getName public class FooTask extends AbstractTask private
  • 如何使用 CloudFormation 覆盖容器环境变量来运行 AWS ECS 任务

    我正在寻找一种运行 ecs 任务的方法 我已经有了集群和任务定义设置 我只是想使用 CloudFormation 模板触发任务 我知道我可以通过单击控制台来运行任务并且它工作正常 对于 cfn 需要正确定义方法 检查所附的屏幕截图 我想使用
  • 找到最长的连续数字序列

    问题 H 最长自然后继者 如果自然数序列中第二个整数是第一个整数的后继 1 和 2 是自然后继 则两个连续整数是自然后继 编写一个程序 读取一个数字 N 后跟 N 个整数 然后打印连续自然后继的最长序列的长度 Example 输入 7 2
  • 将 numpy 数组转换为 numpy 数组的数组

    如何转换 numpy 数组a到 numpy 数组b以 num Pythonic的方式 理想情况下 解决方案应该适用于任意维度和数组长度 import numpy as np a np arange 12 reshape 2 3 2 b np
  • 如何将具有对象数据类型的 Numpy 2D 数组转换为常规的浮点数 2D 数组

    作为我正在开发的更广泛程序的一部分 我最终得到了包含字符串 3D 坐标等的对象数组 所有这些都混合在一起 我知道与结构化数组相比 对象数组可能不是很受欢迎 但我希望在不更改大量代码的情况下解决这个问题 假设我的数组 obj array 有
  • 如何使用 C# 在 .NET 中获取格式化 JSON?

    我正在使用 NET JSON 解析器 并希望序列化我的配置文件 以便它可读 所以而不是 blah v blah2 v2 我想要一些更好的东西 比如 blah v blah2 v2 我的代码是这样的 using System Web Scri
  • Mysql - 如何比较两个 Json 对象?

    将整个 MySql json 列与 json 对象进行比较的语法是什么 以下不起作用 select count criteria from my alerts where criteria industries 1 locations 1
  • mongoose 查询:通过 id 在数组中查找对象

    我怎样才能在此 Schema 中通过 id 找到图像 我有用户的 id 和我正在寻找的图像的 id 执行此操作的最佳方法是什么 在这种情况下 所有图像是否具有不同的 id 或者它们是否可以具有相同的 id 因为它们不属于同一用户 我的架构如
  • $.parseJSON() 在有效对象上返回 null

    jsfiddle 链接 http jsfiddle net YmUmp 1 var x Item1 1 Item2 Item3 3 alert JSON stringify x undefined 2 alert parseJSON x 第
  • 加密json数据

    如何加密从客户端到服务器来回传输的 JSON 数据 当我使用firebug时 我可以看到所有数据 内容在result d 我正在使用 ASP NET 3 5 和 C 我们的管理员已将网站设置为使用 https 但我仍然可以发送 POST 请
  • 如何在 Laravel 5 中处理嵌套的 JSON 对象请求?

    我们在 Laravel 5 和 AngularJs Ionic 中运行此 Web 服务来处理 Web 当我们将请求从 Web 客户端 发送到 Web 服务 后端 时 我们传递了嵌套的 JSON 对象 我们在读取服务器端父对象下的所有子对象时
  • 将压缩的json数据存储在本地存储中

    我想将 JSON 数据存储在本地存储中 有时存储的数据可能超过 5MB 每个域的浏览器允许的最大阈值 无论如何 我可以压缩或压缩数据并将其存储在本地存储中吗 如果对大数据进行每个 JS 函数的压缩和解压 会增加多少延迟 我正在使用这个 js

随机推荐

  • Python:处理图像并保存到文件流

    我需要使用 python 处理图像 应用过滤器和其他转换 然后使用 HTTP 将其提供给用户 现在 我正在使用 BaseHTTPServer 和 PIL 问题是 PIL 无法直接写入文件流 因此我必须写入临时文件 然后读取该文件 以便将其发
  • 为什么 Firefox 关闭空 html 标签?

    我注意到在 Firefox 中 当我查看源代码时 它会向空标签项添加结束标签 例如 hr and img src image jpg 在 Firefox 中查看源代码我明白了 hr and img src image jpg 该文档是 HT
  • iText7 将 SVG 添加到 PdfDocument 中以及可能出现的问题

    关于问题的答案 如何使用 iText7 将 SVG 添加到 PDF 这是一个链接点击这里 https stackoverflow com questions 50059456 how to add an svg to a pdf using
  • Python 模块 BeautifulSoup 提取锚点 href

    我正在使用 BeautifulSoup 模块通过以下方式从 html 选择所有 href def extract links html soup BeautifulSoup html anchors soup findAll a print
  • Angular 2发送数组另一页

    我正在使用 Angular 开发天气应用程序 我是 Angular 的新手 我想带上我选择的城市的天气信息 但我无法将数据发送到第二页 哪里有问题 预先感谢您的帮助 export class ForecastComponent implem
  • 有没有办法提高linux管道的性能?

    我正在尝试使用 64 位将超高速数据从一个应用程序传输到另一个应用程序CentOS http en wikipedia org wiki CentOS6 我使用以下方法进行了基准测试dd发现阻碍我的是管道而不是程序中的算法 我的目标是达到
  • 将 Angularjs 动态绑定到新创建的 html 元素

    我有一个带有多个选项卡的选项卡页面 一旦单击调用服务即可返回一些数据 其中一些数据返回 html 表单并且非常随机 我想收集输入的这些值并通过服务将数据发送回服务器 我遇到的问题是我无法从我动态创建的 html 中的输入元素获取数据 我创建
  • 如何停止 PythonShell

    如何终止 停止 Node js 中 PythonShell 执行的 Python 脚本的执行 我在交互模式下运行 输出通过 socket io 发送到给定的房间 如果没有更多的客户端连接到这个房间 我想停止 python 脚本的执行 这是我
  • 私有属性,但却是一个神秘的领域

    我想将属性设为私有 但带有 pydantic 字段 from pydantic import BaseModel Field PrivateAttr validator class A BaseModel a str I want a py
  • 如何使用 Kestrel 托管 Angular 应用程序

    我正在尝试部署一个Angular 7申请与 NET Core using Kestrel使用FileProvider扩大 我已经创建了角度应用程序 我有ng build它和我复制了里面的文件dist NET项目 Startup public
  • python 硒 按名称查找元素

    查找电子邮件输入的正确代码是什么https accounts google com ServiceLogin html 是
  • 等待视图通过 IdleResource 变得可见

    我正在使用 Espresso 2 2 编写仪器测试 我想测试的流程 测试点击的单选按钮 onClick 向 API 发起请求 每次不同时间我收到回复后 积极响应触发活动中调用的接口方法 onRequestSuccess 我正在屏幕上显示名为
  • 为什么 b = (b - x) & x 会得到下一个子集?

    The 有竞争力的程序员手册 https cses fi book book pdf第 99 页建议使用以下方法来遍历集合的所有子集x 集合位代表集合中的数字 int b 0 do Process subset b while b b x
  • 如何在不同的班级中启动和停止计时器?

    我想测量从传入 HTTP 请求开始到应用程序到达某个点的时间 这两个时间点都位于不同的类中 我将如何启动和停止这些不同类别的计时器 我没有看到使用 MeterRegistry 中的 命名 计时器的方法 我该怎么办呢 您可以使用 AOP 如下
  • 具有多个 Angular 2 应用程序的 ASP.Net Core MVC [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我正在尝试为一个我知道会变得越来越复杂的项目准备结构 我想使用 ASP Net Core MVC 进行顶层导航 我想在每个主视图中放置
  • Xcode 和 Waze 集成

    我正在尝试整合我的app with waze http www waze com 有人知道如何调用位智并发送坐标吗 我没有找到任何 API 或其他相关信息 void navigateToLatitude double latitude lo
  • 按歌曲获取封面图片

    是否可以按歌曲而不是按专辑获取封面图片 因为我有一张自编的歌曲专辑 而且它们都有不同的封面图片 但是当我想查询它们时 我总是得到相同的图片 String ARG STRING MediaStore Audio Media ALBUM ID
  • 可以转换为 Swift 5

    我在 Xcode 10 2 中收到此警告 可以转换为 Swift 5 当我单击此错误时 它会打开此窗口 当我们点击Next会发生什么 swift 的当前版本是 swift5 仅在 Xcode 10 2 中受支持 而您在 Xcode 10 2
  • 如何实现可变虚拟成员函数

    所以我有这个功能 virtual void CallRemoteFunction const char pServerGameObjectId const char pFunctionName OVariant arg1 OVariant
  • PySpark - RDD 到 JSON

    我有一个 Hive 查询返回以下格式的数据 ip category score 1 2 3 4 X 5 10 10 10 10 A 2 1 2 3 4 Y 2 12 12 12 12 G 10 1 2 3 4 Z 9 10 10 10 10