如何在 SQL/Spark/GraphFrames 中进行此转换

2024-03-29

我有一个包含以下两列的表:

Device-Id    Account-Id
d1           a1   
d2           a1
d1           a2
d2           a3
d3           a4
d3           a5 
d4           a6
d1           a4

Device-Id 是安装我的应用程序的设备的唯一 ID,Account-Id 是用户帐户的 ID。用户可以拥有多个设备,并且可以在同一设备上创建多个帐户(例如,d1 设备设置了 a1、a2 和 a3 帐户)。

我想找到唯一的实际用户(应表示为一个新列,在生成的表中具有一些唯一的 UUID),并且我正在寻找的转换生成下表:

Unique-User-Id    Devices-Used    Accounts-Used
uuid1             [d1, d2, d3]    [a1, a2, a3, a4, a5]   
uuid2             [d4]            [a6]

上面生成的表背后的想法是,实际用户 uuid1 在其设备 d1 和 d2 上设置了一个帐户 a1,这实际上意味着这两个设备都属于 uuid 1,并且在这些 d1 和 d2 上设置了所有其他帐户设备也映射到同一用户 uuid1。同样,d1 也有一个帐户 a4,该帐户也在 d3 上设置,因此 d3 也是 uuid1 的设备,并且其上的每个帐户都应映射到 uuid1。

如何在 SQL/Spark/GraphFrames(由 DataBricks 提供)中实现上述转换,其中设备 ID 和帐户 ID 都可以以百万为单位?


我对这个解决方案并不感到自豪,因为我认为可能有一个更有效的解决方案,但无论如何我都会将其留在这里。希望能帮助到你

import org.apache.spark.sql.functions._

val flatten_distinct = (array_distinct _) compose (flatten _)

val df = Seq(
  ("d1","a1"),  
  ("d2","a1"),
  ("d1","a2"),
  ("d2","a3"),
  ("d3","a4"),
  ("d3","a5"),
  ("d4","a6")
).toDF("d_id","u_id")


val userDevices = df
  .groupBy("u_id")
  .agg(collect_list("d_id").alias("d_id_list"))

//+----+---------+
//|u_id|d_id_list|
//+----+---------+
//|  a5|     [d3]|
//|  a3|     [d2]|
//|  a4|     [d3]|
//|  a2|     [d1]|
//|  a1| [d1, d2]|
//|  a6|     [d4]|
//+----+---------+


val accountsByDevice = df
  .groupBy("d_id")
  .agg(collect_list("u_id").alias("u_id_list"))

//+----+---------+
//|d_id|u_id_list|
//+----+---------+
//|  d2| [a3, a1]|
//|  d3| [a4, a5]|
//|  d1| [a1, a2]|
//|  d4|     [a6]|
//+----+---------+


val ungroupedDf = userDevices
  .join(accountsByDevice, expr("array_contains(d_id_list,d_id)"))
  .groupBy("d_id_list")
  .agg(collect_set("u_id_list") as "set")
  .select(col("d_id_list") as "d_id", flatten_distinct(col("set")) as "u_id")
  .select(explode(col("d_id")) as "d_id", col("u_id"), size(col("u_id")) as "size")

//+----+------------+----+
//|d_id|        u_id|size|
//+----+------------+----+
//|  d2|    [a1, a3]|   2|
//|  d1|[a1, a3, a2]|   3|
//|  d2|[a1, a3, a2]|   3|
//|  d3|    [a4, a5]|   2|
//|  d1|    [a1, a2]|   2|
//|  d4|        [a6]|   1|
//+----+------------+----+


val finalDf = ungroupedDf
  .join(ungroupedDf.groupBy("d_id").agg(max(col("size")) as "size"), Seq("size","d_id"))
  .groupBy("u_id")
  .agg(collect_set("d_id") as "d_id")
  .withColumn("unique_id", monotonically_increasing_id())

//+------------+--------+-------------+
//|        u_id|    d_id|    unique_id|
//+------------+--------+-------------+
//|[a1, a2, a3]|[d1, d2]|1228360646656|
//|    [a4, a5]|    [d3]|1297080123392|
//|        [a6]|    [d4]|1520418422784|
//+------------+--------+-------------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 SQL/Spark/GraphFrames 中进行此转换 的相关文章

  • Spark - scala - 如何检查配置单元中是否存在表

    我必须使用 Spark 1 6 2 scala 检查配置单元中是否存在表 如果没有 我必须创建一个空数据框并将其保存为配置单元表 如果存在 则覆盖现有表 我需要一个返回布尔值的函数 基于该函数我可以做出上述决定 是否创建新表或覆盖现有表 1
  • 将数组文字传递给 PostgreSQL 函数

    我有一个包含 select 语句的 Postgres 函数 我需要使用包含字符串值数组的传入变量添加条件 CREATE OR REPLACE FUNCTION get questions vcode text RETURN return v
  • 如何从spark管道逻辑模型中提取变量权重?

    我目前正在尝试学习 Spark Pipeline Spark 1 6 0 我将数据集 训练和测试 导入为 oas sql DataFrame 对象 执行以下代码后 生成的模型是oas ml tuning CrossValidatorMode
  • 在 Oracle 中创建数据库链接时出错

    我有两个数据库 需要编写跨数据库查询 所以我试图创建一个数据库链接 CREATE PUBLIC DATABASE LINK DBLink CONNECT TO SchemaName IDENTIFIED BY 123 using DBNam
  • Oracle Many OR 与 IN () 的 SQL 性能调优 [重复]

    这个问题在这里已经有答案了 我手头没有 解释计划 您能帮忙判断以下哪一个更有效吗 选项1 select from VIEW ABC where STRING COL AA OR STRING COL BB OR STRING COL BB
  • ';'预期但发现“导入” - Scala 和 Spark

    我正在尝试使用 Spark 和 Scala 来编译一个独立的应用程序 我不知道为什么会收到此错误 topicModel scala 2 expected but import found error import org apache sp
  • 显示多个表的账户余额

    我有以下两个表 其中存储有关贷记和借记记录的信息 couponCr 表包含 voucherType voucherPrefix voucherNo crparty cramount SALES S 1 1 43000 SALES S 2 1
  • 如何将模型从 ML Pipeline 保存到 S3 或 HDFS?

    我正在尝试保存 ML Pipeline 生成的数千个模型 正如答案中所示here https stackoverflow com questions 32121046 run 3000 random forest models by gro
  • 单向关系和双向关系的区别

    我想知道这两个词是什么意思 我遇到他们是在教义的文档 http www doctrine project org documentation manual 2 0 en association mapping 但我不明白他们的意思 这与常见
  • 如何重命名 SQL Server 中名称中带有方括号的内容?

    我的一张桌子上有一列 周围有方括号 Book Category 我想重命名为Book Category 我尝试了以下查询 sp rename BookPublisher Book Category Book Category COLUMN
  • 如何从字符串列中提取数字?

    我的要求是从列中的评论列中检索订单号comment并且总是开始于R 订单号应作为新列添加到表中 输入数据 code id mode location status comment AS SD 101 Airways hyderabad D
  • 规范“毒”方式真的值得吗? (3NF)

    我正处于数据库设计的早期阶段 所以还没有最终的结果 并且我正在为具有可选标签的线程使用 TOXI 3表设计 但我忍不住觉得加入是并不是真的必要 也许我只需要依赖我的简单标签列posts我可以在其中存储类似 varchar 的表
  • 删除重复的 SQL 记录以允许唯一键

    我在 MYSQL 数据库中有一个表 销售 该表理应强制执行唯一约束以防止重复 事实证明 首先删除欺骗并设置约束有点棘手 表结构 简化 id 唯一 autoinc 产品编号 目标是强制product id 的唯一性 我想要应用的重复数据删除策
  • 使用子查询与 LEFT JOIN 一起选择 MAX 值

    我有一个获取搜索结果的查询 效果很好 查询成功示例 SELECT individuals individual id individuals unique id TIMESTAMPDIFF YEAR individuals day of b
  • 如何在数据库中对 (Java) 枚举进行建模(使用 SQL92)

    您好 我正在使用名为 性别 的列对实体进行建模 在应用程序代码中 性别应该是一个 Java 枚举类型 有 2 个值 男性和女性 知道作为数据类型的枚举不是通用 SQL 语言 92 的一部分 您将如何建模它 数据模型必须是可移植的 以便由多个
  • 安全转义表名/列名

    我在 php 中使用 PDO 因此无法使用准备好的语句转义表名或列名 以下是我自己实现它的万无一失的方法 tn str replace REQUEST tn column str replace REQUEST column sql SEL
  • Oracle 查询向上或向下舍入到最近的 15 分钟间隔

    08 SEP 20 08 55 05 08 SEP 20 15 36 13 下面的查询对于 15 36 13 可以正常工作 因为它四舍五入到 15 30 但 8 55 05 向下舍入到 08 45 而它应该四舍五入到 09 00 selec
  • 如何将 LEFT JOIN 限制为 SQL Server 中的第一个结果?

    我有一些 SQL 几乎可以做我想做的事情 我正在使用三个表 Users UserPhoneNumbers 和 UserPhoneNumberTypes 我正在尝试获取用户列表及其电话号码以供导出 数据库本身很旧并且存在一些完整性问题 我的问
  • 无法将方法组“Read”转换为非委托类型“bool”

    我正在尝试使用SqlDataReader检查条目是否存在 如果存在则返回ID 否则返回false 当我尝试编译时 出现错误 无法将方法组 Read 转换为非委托类型 bool 我一直在遵循在 VB 中找到的示例 但似乎翻译可能不正确 pri
  • 在单个查询中设置和选择?

    我想知道是否可以在单个查询中设置和选择 像这样的事情 SET LOCAL search path TO 1 SET LOCAL ROLE user SELECT from posts 你可以这样做 with some set as sele

随机推荐

  • 图表中的最小损坏成本

    给定一个图 G V E 其中有 N 个节点 编号从 0 到 N 1 并且恰好为 N 1 双向边缘 图中的每条边都有一个正成本 C u v 边缘权重 整个图是这样的任何一对节点之间都有唯一的路径 我认为改进的 Kruskal 是正确的选择 取
  • DateTimePicker 永远不会更新!

    我有一些DateTimePicker是一种永不更新的形式 我试过了Value and Text Invalidate 进而Update 并且Refresh 从当前日期开始 他们的价值观似乎没有任何改变 无论我设置什么 当前日期都是 相对 今
  • 自动登录其他网站的php脚本

    我想要一个脚本 通过它我可以登录其他网站而无需打开他们的登录页面 场景是这样的 我不想将登录 ID 和密码提供给他人 他们只需单击链接或按钮 脚本就会完成其工作 它将传递所需的登录 ID 和密码 并提交登录表单并登录 并且客户端拥有所有凭据
  • 如何使用单个bat文件传递动态参数来运行一个exe

    我需要运行一个 EXE 它将采用 7 个参数 其中一个参数是动态的 有人可以帮助我如何通过使用bat文件传递动态参数来运行EXE吗 谢谢 柴塔尼亚 如果您需要执行带有动态数量参数的命令 您可以使用 命令示例 foo exe options
  • C 变量名定义中使用下划线的原因是什么?

    我试图了解开发人员何时需要定义C前面带有 的变量 其原因何在 例如 uint32 t xyz 0 也许这有帮助 来自 C99 7 1 3 保留标识符 所有以下划线和大写字母或其他字母开头的标识符 下划线始终保留用于任何用途 所有以下划线开头
  • 如何解释 Google perf 工具 CPU 分析器中的地址

    我的 C 程序消耗大量 CPU 并且在运行时消耗更多 我使用 Google 性能工具来分析 CPU 使用情况 这就是我得到的结果 pprof top Total 1343 samples 1330 99 0 99 0 1330 99 0 0
  • 是否可以让ScrollView滚动到底部?

    对于一个类似聊天的应用程序 我想保留一个ScrollView组件滚动到底部 因为最新消息出现在旧消息下方 我们可以调整a的滚动位置吗ScrollView For React Native 0 41 及更高版本 您可以使用内置的scrollT
  • 如何获取 Spring Boot 应用程序建立的活动数据库连接数

    我已经使用 Oracle 数据源构建了一个 Spring Boot 应用程序 我需要在日志语句中打印总活动连接 如何获取活动连接 注意 不是最大活动连接数 它应该是 特定时间 实例的活动连接数 DataSource dataSource D
  • iOS 金属线宽

    我想设置我在金属中绘制的线条的宽度 我可以设置一个点的大小point size正如所解释的here https developer apple com library prerelease ios documentation Metal R
  • node js 从 URL 获取 Zip 并上传到 Google 云端硬盘

    我正在尝试从网址获取 zip 文件 以便在下一步中将其上传到 Google 云端硬盘 但我的代码不起作用 The method to get the zip File from the url function getFile var fi
  • python Tkinter() 如何隐藏 UI

    嗨 我正在使用 Tkinter 在 python 中开发单个登录 我只希望当用户正确登录时 登录 UI 将被隐藏 内容 UI 将显示 所以我认为 ui 可以隐藏或者可见性将被隐藏 例如我有这个代码 def httpGETCheck user
  • Magento - 重新索引过程存在问题 - 目录产品 [关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 我刚刚重新索引了运行 v1 6 的 Magenot 安装上的数据 现在我收到一条消息 指出 There was a pr
  • 图像/视频序列中的模糊检测

    我在 photo stackexchange 上问过这个问题 但认为它也可能在这里相关 因为我想在我的实现中以编程方式实现这个问题 我正在尝试为我的成像 管道实现模糊检测算法 我想要检测的模糊是 1 相机抖动 当快门速度较低时 用手移动 抖
  • Oracle 12c - 删除表和所有关联的分区

    我在 Oracle 12c 中创建了表 t1 表有数据 它在列表分区上分区 并且还有子分区 现在我想删除整个表和所有关联的分区 和子分区 这是删除全部的正确命令吗 DROP TABLE t1 PURGE 语法是正确的 但不是优选的 只需删除
  • 将字符串标记为 HTML 安全

    我正在尝试构建我的第一个 Rails 应用程序 并且正在使用瑞恩 希思 Ryan Heath 的 navigation helper https github com rpheath navigation helper插件为我提供导航中的当
  • VSCode python 扩展:如何禁用插入导入语句的自动完成功能?

    在 VS Code 的 Python 扩展中 我有时发现自动完成功能可以包含尚未导入到我正在编辑的文件中的选项 当选择这些选项之一时 导入有时会在没有通知的情况下插入到模块的顶部 虽然我可以看到此功能中的实用程序 但我不太喜欢这种行为 因为
  • x86 区分指令和数据的方法

    是否有一种或多或少可靠的方法来判断内存中某个位置的数据是处理器指令的开头还是其他数据 例如 E8 3F BD 6A 00 may be call操作说明 E8 相对偏移量为0x6ABD3F 或者它可能是属于其他指令的三个字节的数据 后跟pu
  • 如何在 Http 响应完成之前读取响应流

    当使用 HttpWebRequest 对象发出请求时 我需要调用方法 GetResponse 来发送请求并获取响应 此方法的问题是 在收到所有数据之前 它不会返回响应对象 假设我正在下载一个 100 MB 的文件 在响应完成并且所有 100
  • Java - 嵌套内嵌套的Gson解析

    我必须与 API 进行交互 并且响应格式 根据我所读到的 似乎结构很差 我发现谷歌小组回复了一个有点类似的问题here http groups google com group google gson browse thread threa
  • 如何在 SQL/Spark/GraphFrames 中进行此转换

    我有一个包含以下两列的表 Device Id Account Id d1 a1 d2 a1 d1 a2 d2 a3 d3 a4 d3 a5 d4 a6 d1 a4 Device Id 是安装我的应用程序的设备的唯一 ID Account I