组合 Spark Dataframe 中链接在一起的行

2024-01-22

我有一个数据框,其行通过各种合并相互连接。 到目前为止,我已经将 DF 转换为下面的格式,我在其中执行了 groupBy“Merge_To”并将它们收集到一个数组中,然后将其连接回我的原始 DF。看起来像这样:

df1
+---+--------+---------+
|Ref|Merge_To|   Merges|
+---+--------+---------+
|  1|      \N|[3, 2, 3]|
|  2|       1|[5, 4, 6]|
|  5|       2|   [8, 7]|
| 10|      \N|   [9, 9]|
| 12|      \N|     [13]|
| 14|      \N|     [15]|
| 16|      18|     [17]|
| 17|      16|     [19]|
| 18|      \N|     [16]|
| 19|      17|     [20]|
+---+--------+---------+ 

对于参考文献 1、2、5 和 18、16、17、19、20,它们通过一条链合并在一起。这不是通过我之前完成的 groupBy 捕获的。 最终我希望我的 DF 看起来像这样,它解释了合并链:

+---+--------+------------------------+
|Ref|Merge_To|                  Merges|
+---+--------+------------------------+
|  1|      \N|[3, 2, 3, 5, 4, 6, 8, 7]|
| 10|      \N|                  [9, 9]|
| 12|      \N|                    [13]|
| 14|      \N|                    [15]|
| 18|      \N|        [16, 17, 19, 20]|
+---+--------+------------------------+

当“Merge_To”不是 \N 时,我尝试将 df1 连接到自身过滤

val arrayCombineUDF = udf((a:Seq[String], b:Seq[String]) => a ++ b )

val df1Filter = df1.filter($"Merge_To" !== "\\N").
select("Merge_To", "Merges").withColumnRenamed("Merge_To", "Chain_Ref").
withColumnRenamed("Merges", "Chain_Merges")

val df2 = df1.join(df1Filter, $"Ref" === $"Chain_Ref", "left").
withColumn("Merges", when($"Chain_Merges".isNotNull, arrayCombineUDF($"Merges", $"Chain_Merges")).
otherwise($"Merges")).
select("Ref", "Merge_To", "Merges")

df2
+---+--------+----------+------------------+
|Ref|Merge_To|Merge_From|            Merges|
+---+--------+----------+------------------+
|  1|      \N|         3|[3, 2, 3, 5, 4, 6]|
|  2|       1|        \N|   [5, 4, 6, 8, 7]|
|  5|       2|        \N|            [8, 7]|
| 10|      \N|         9|            [9, 9]|
| 12|      \N|        13|              [13]|
| 14|      \N|        \N|              [15]|
| 16|      18|        \N|          [17, 19]|
| 17|      16|        \N|          [19, 20]|
| 18|      \N|        \N|          [16, 17]|
| 19|      17|        \N|              [20]|
+---+--------+----------+------------------+

这种给出了我正在寻找的结果,但只真正说明了合并链的一层。

我还尝试将与上面相同的连接过程放入 while 循环中,试图让它重复连接。 我还尝试将 UDF 与 If 语句一起使用,希望我可以将每一行整理为合并类型,并使用它组合作为链的行。

注意:我知道数组并不不同,但我不介意,并且可以在最后对其进行排序。

EDIT这是原来的DF

+---+--------+----------+
|Ref|Merge_To|Merge_From|
+---+--------+----------+
|  1|      \N|         3|
|  2|       1|        \N|
|  3|       1|        \N|
|  4|       2|        \N|
|  5|       2|        \N|
|  6|       2|        \N|
|  7|       5|        \N|
|  8|       5|        \N|
|  9|      10|        \N|
| 10|      \N|         9|
| 11|      \N|        \N|
| 12|      \N|        13|
| 13|      \N|        \N|
| 14|      \N|        \N|
| 15|      14|        \N|
| 16|      18|        \N|
| 17|      16|        \N|
| 18|      \N|        \N|
| 19|      17|        \N|
| 20|      19|        \N|
+---+--------+----------+

11 号条目似乎擅离职守了。反正。

查看您的基础数据,这是一个分层查询,可以 在传统的 RDBMS 中解决,具有良好的功能,例如连接方式 大多数 RDBMS 中的子句或递归 WITH 视图。

您所做的这次尝试在级别 1 停止,这是问题的症结所在 问题。另外这个问题也不好解决 Spark 的并行化分区方法。分区什么 在?任何分区都可能包含与您正在查找的集合相关的数据 为了。

  1. 更好的建议是在那里进行处理并 sqooping 到 Hive 表中,或者使用 JDBC 通过 Spark 进行读取。

  2. 您可以按照这种记录不足的方法进行模拟https://sqlandhadoop.com/how-to-implement-recursive-queries-in-spark/ https://sqlandhadoop.com/how-to-implement-recursive-queries-in-spark/这里,这就是我有时用于 BI 后台处理的方法。

  3. 如果您必须在 Spark 领域中执行此操作,那么如何使用 graphFrames 方法,但运行速度相当慢,如下所示,使用数据子集并稍微改变您的方法 - 看看您的想法:

导入 org.apache.spark.rdd.RDD

 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.graphframes._  
 sc.setCheckpointDir("/checkpoints")

// Subset of your data
val rdd = sc.parallelize( Array(("A", 1, None), ("B", 2, Some(1)), ("C", 3, Some(1)), ("D", 4, Some(2)), ("E", 5, Some(2)), ("F", 6, Some(2)),
                                ("G", 7, Some(5)), ("H", 8, Some(5)), ("X", 9, Some(10)), ("Y", 10, None), ("X2", 12, Some(13)), ("Y3", 13, None)  
                               ))
val df = rdd.toDF("v", "c", "p")

val dfV = df.select($"c".as("id"))
val dfE = df
  .withColumnRenamed("c", "src")
  .withColumnRenamed("p", "dst")

val nGraph = GraphFrame(dfV, dfE)
dfE.cache()
dfV.cache()
val res = nGraph.connectedComponents.run()
val res2 = res.join(df, res("id") === df("c"), "inner")
val res3 = res2.filter("p is not null").groupBy("component").agg(collect_list("id") as "group")
val res4 = res3.join(res2, res3("component") === res2("component") && res2("p").isNull , "inner")
res4.select($"id", $"group").show(false)

returns:

+---+---------------------+
|id |group                |
+---+---------------------+
|1  |[2, 3, 4, 5, 6, 7, 8]|
|10 |[9]                  |
|13 |[12]                 |
+---+---------------------+

我的建议:在 RDBMS 中这样做。通过这种方法使用 graphFrames 需要很长时间。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

组合 Spark Dataframe 中链接在一起的行 的相关文章

  • 方法返回类型的类型推断

    当存在显式方法时 为什么 Scala 无法推断方法的返回类型return方法中使用的语句 例如 为什么下面的代码可以编译 object Main def who 5 def main args Array String println wh
  • 如何跟踪通过elastic4s客户端发送到Elasticsearch的json请求?

    假设我使用这样的代码 ElasticClient client client execute search in places gt cities query paris start 5 limit 10 如何查看发送到 Elasticse
  • 通过Listener获取Spark thrift服务器查询中读取的行数

    我正在尝试为我们的 ST 服务器构建一个监控系统 到目前为止 诸如记录查询 检索的行 红色和花费的时间之类的事情都很好 我已经实现了一个自定义侦听器 我能够毫无问题地检索查询和时间 侦听SparkListenerSQLExecutionSt
  • 通过相邻行的差异过滤 pandas 数据框

    我有一个按日期时间索引的数据框 我想根据行的索引与前一行的索引之间的差异来过滤行 因此 如果我的标准是 删除比前一行晚一小时以上的所有行 则应删除下面示例中的第二行 2005 07 15 17 00 00 2005 07 17 18 00
  • Scala 修饰符和类型参数化

    我正在创建一个记忆类 每个类都会记忆一个函数类型并具有以下定义 class MemoizedFunction1 T1 R f T1 gt R private this val cache mutable Map T1 R def apply
  • 在 Scala 中定义具有多个隐式参数的函数

    如何定义具有多个隐式参数的函数 def myfun arg String implicit p1 String implicit p2 Int doesn t work 它们必须全部放入一个参数列表中 并且该列表必须是最后一个 def my
  • 根据连续行值差异拆分数据框

    我有一个这样的数据框 df col1 col2 col3 1 2 3 2 5 6 7 8 9 10 11 12 11 12 13 13 14 15 14 15 16 现在我想当两个连续行的 col1 差异大于 1 时从上面创建多个数据框 所
  • Scala 中简单表达式的非法开始

    我刚刚开始学习scala 在尝试实现递归函数时 我在 Eclipse 中收到错误 简单表达式的非法开始 def foo total Int nums List Int if total nums sorted head 0 0 else r
  • Scala Spark:将数据框中的双列转换为日期时间列

    我正在尝试编写代码来将日期时间列 date 和 last updated date 转换为 mm dd yyyy 格式以进行显示 它们实际上是 unix 时间转换为双精度数 我该怎么做呢 import org joda time impor
  • 在使用 Phoenix 4.5 的 CDH 5.4 上运行 Spark 作业时未找到 PhoenixOutputFormat

    我通过重新编译源代码设法在 Cloudera CDH 5 4 上配置 Phoenix 4 5 sqlline py效果很好 但火花有问题 spark submit class my JobRunner master yarn deploy
  • pySpark 映射多列

    我需要能够使用多列比较两个数据帧 pySpark尝试 get PrimaryLookupAttributeValue values from reference table in a dictionary to compare them t
  • Python:按组计算数据框中的特定出现次数

    假设我有一个 df df pd DataFrame id 12 35 37 67 99 78 product banana apple banana pear banana apple reordered 1 0 0 1 1 1 id pr
  • Spark中分布式读取CSV文件

    我正在开发一个 Spark 处理框架 它读取大型 CSV 文件 将它们加载到 RDD 中 执行一些转换 最后保存一些统计数据 相关 CSV 文件平均大小约为 50GB 我正在使用 Spark 2 0 我的问题是 当我使用sparkConte
  • 使用具有多个元素的字典过滤数据框

    我已经尝试了几个小时来在这里找到答案 但我无法在我的特定情况下找到任何答案 我能找到的最接近的是 使用字典将多个字符串包含过滤器应用于 pandas 数据框 https stackoverflow com questions 4338916
  • 仅使用 Spark ML Pipelines 进行转换

    我正在开发一个项目 其中可配置的管道和 Spark DataFrame 更改的沿袭跟踪都是必不可少的 该管道的端点通常只是修改后的 DataFrame 将其视为 ETL 任务 对我来说最有意义的是利用现有的 Spark ML Pipelin
  • mssql 的 UUID 疯狂

    我的数据库条目有一个 UUID 及其值 使用 Microsoft SQL Server Management Studio 提取 CDF86F27 AFF4 2E47 BABB 2F46B079E98B 将其加载到我的 Scala 应用程序
  • 我需要比较两个数据帧以进行类型验证并发送非零值作为输出

    我正在比较两个数据帧 基本上 这些是两个不同数据源的模式 一个来自 hive 另一个来自 SAS9 2 我需要验证两个数据源的结构 因此我将模式转换为两个数据帧 它们是 SAS 架构将采用以下格式 scala gt metadata sho
  • JavaFX 控制器如何访问其他服务?

    我将 JavaFX 2 与 Scala 一起使用 我有class Application extends javafx application Application它执行诸如读取应用程序配置等操作 然后它会启动主窗口 该主窗口需要连接到一
  • SQL 类似于 PySpark 数据帧的 NOT IN 子句

    例如 在 SQL 中 我们可以这样做select from table where col1 not in A B 我想知道是否有一个与此等效的 PySpark 我能够找到isin类似于 SQL 的函数IN条款 但没有任何内容NOT IN
  • 什么是 data.frame 可以做而 data.table 不能做的事情?

    我刚刚开始使用 R 并遇到了 data table 我发现它很棒 一个非常天真的问题 我可以忽略 data frame 来使用 data table 以避免两个包之间的语法混淆吗 来自数据表常见问题解答 http datatable r f

随机推荐

  • 报告本地时间而不是 UTC 服务器时间

    我创建了一个页面 其中向用户显示服务器报告的天气数据 时间保存为 UTC 如何从 Blazor 服务器应用程序显示本地用户或浏览器的时间 我遇到了类似的问题并创建了一个名为的库布拉佐尔时间 https github com dustout
  • List 和 IEnumerable 的区别

    在实现这个通用的同时归并排序 http en wikipedia org wiki Merge sort 作为一种代码卡塔 http en wikipedia org wiki Kata 28programming 29 我偶然发现了 IE
  • Shopify:错误:[API] 此操作需要商家批准 write_themes 范围

    我是 Shopify 新手 我正在尝试在本地设置 Shopify 主题套件 我已经创建了一个私人应用程序并设置了我的商店 但是在尝试使用主题套件访问商店时出现此错误 Errors API This action requires merch
  • Silverlight:强制画布失效或重新绘制自身?

    我有一个 Silverlight 应用程序 上面有一个 Canvas 在该画布上 我动态地 绘制 了一堆东西 但向画布添加了控件 我在画布区域之外有一个按钮可以清除内容 对象已被删除 成功 然而 Canvas 区域不会立即刷新 目前 画布本
  • Memcached 最佳实践 - 小对象和大量键还是大对象和少量键?

    I use memcached http www danga com memcached 存储复杂计算的整数结果 我有数百个可以缓存的整数对象 我应该将它们缓存在更复杂的对象中的单个键下 还是应该为对象使用数百个不同的键 我正在缓存的对象不
  • 如何使用 Angular2 将服务注入动态组件

    我有一个使用 DynamicComponentLoader 动态加载另一个组件的组件 然而 动态组件需要注入服务 但是 我不确定如何解决这个问题 我从 Angular io 文档中看到 DynamicComponentLoader 接受 R
  • JavaScript 中变音符号的标题大小写(非 ASCII)

    是否可以创建一个 JavaScript 函数 可以将字符串转换为标题大小写 但可以处理非 ASCII Unicode 字符 例如 使用以下字符 etc 例如 如果字符串是 anders ngstr m 则应将其转换为 Anders ngst
  • 耳机上听不到 SuperCollider 的声音

    我刚刚开始使用超级对撞机学习音频编程 当我播放声音时 我可以在扬声器上听到它 但在耳机上听不到 我在启动服务器时收到以下消息 启动 57110本地主机JackDriver 客户端名称是 SuperCollider SC AudioDrive
  • C#:如何打开 Windows 资源管理器窗口并选择多个文件

    在 Windows Media Player 的库中 您可以选择一个或多个音乐文件 然后 您可以右键单击并在上下文菜单中选择打开文件所在位置 这将为文件所在的每个目录打开一个 Windows 资源管理器窗口 并且将为您选择文件 假设我们的库
  • 尝试使用屏蔽输入对 LSTM Seq2Seq 执行推理时出现 CUDNN_STATUS_BAD_PARAM

    我正在使用 keras 层tensorflow 2 0建立一个简单的基于 LSTM 的 Seq2Seq 文本生成模型 versions我正在使用 Python 3 6 9 Tensorflow 2 0 0 CUDA 10 0 CUDNN 7
  • 如何通过jmx在运行时修改ThreadPoolTask​​Executor

    我在通过 JConsole 修改 MBean 属性时遇到问题 我有一个 Threading bean 它通过以下方式调用 public static void main String args throws Exception JMX ne
  • 尝试使用 localnotification 呈现其视图不在窗口层次结构中的 UIAlertController [重复]

    这个问题在这里已经有答案了 我试图在用户单击本地通知后呈现 AlertView AlertView 有取消或确定选项 extension ViewController UNUserNotificationCenterDelegate fun
  • 我对 Haskell 的重击感到困惑

    The wikibook https en wikibooks org wiki Haskell Laziness说 在这个表达中 let z length 1 5 reverse olleh in z是一声重击 但是这个堆栈溢出 http
  • 如何知道 MySQLnd 是否是活动驱动程序?

    也许这是一个显而易见的问题 但我想确定一下 我如何知道 MySQLnd 是否是活动驱动程序 我正在运行 PHP 5 3 和 MySQL 5 1 37 在 phpinfo 中列出了 mysqlnd 但仅此我无法确定我使用的是 MySQLnd
  • 构造函数中带有 const 的 & 符号

    有人可以告诉我为什么我们通常将 const 和 与在构造函数中传递的某些对象一起使用吗 Book Book const Date date 我在这里遇到的困惑是 通常在 some 函数中使用 符号 因为该值是通过引用传递的 并且函数中该变量
  • 使用ViewPager时如何处理ActionBarActivity Fragments中的AsyncTask?

    我正在使用 ActionBarActivity 创建 5 个选项卡 我使用 ViewPager 在 5 个选项卡之间滑动 使用了扩展 FragmentPagerAdapter 的SectionsPagerAdapter 每个选项卡都有一个片
  • Python 中导入所花费的时间

    我想知道内置模块和用户定义模块的导入需要多长时间 从Python3 7版本开始 新增 X importtime选项可用 要测量导入时间 只需使用该选项执行脚本即可 例如python X importtime my script py 以供参
  • 在 Swift 3 中格式化数字

    我想将数字格式设置为 123234234234 中的 123 234 234 234 取决于用户在文本字段中输入的内容 我不想管理货币 这与货币无关 而是用户必须输入一个数字 并且该数字的格式应该正确以便于阅读 不是用逗号 而是用点 我在整
  • 学习一种 Lisp 有助于学习另一种 Lisp 吗?

    学习不同的 Lisp 语言之间有协同作用吗 我目前正在学习 Emacs Lisp 因为它在我的日常 Emacs 使用中立即有用 但是我对所有 Lisp 都很着迷 所以也许有一天我会学习和使用其他语言 当我开始深入研究 Common Lisp
  • 组合 Spark Dataframe 中链接在一起的行

    我有一个数据框 其行通过各种合并相互连接 到目前为止 我已经将 DF 转换为下面的格式 我在其中执行了 groupBy Merge To 并将它们收集到一个数组中 然后将其连接回我的原始 DF 看起来像这样 df1 Ref Merge To