直方图 - 以并行方式进行

2023-12-24

+----+----+--------+
| Id | M1 |  trx   |
+----+----+--------+
| 1  | M1 | 11.35  |
| 2  | M1 | 3.4    |
| 3  | M1 | 10.45  |
| 2  | M1 | 3.95   |
| 3  | M1 | 20.95  |
| 2  | M2 | 25.55  |
| 1  | M2 |  9.95  |
| 2  | M2 | 11.95  |
| 1  | M2 |  9.65  |
| 1  | M2 | 14.54  |
+----+----+--------+

通过上面的数据框,我应该能够使用下面的代码生成如下的直方图。类似的问题在这里 https://stackoverflow.com/questions/59214610/histogram-issue-in-spark-scala

val (Range,counts) = df
.select(col("trx"))
.rdd.map(r => r.getDouble(0))
.histogram(10)
// Range: Array[Double] = Array(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)
// counts: Array[Long] = Array(2, 0, 2, 3, 0, 1, 0, 1, 0, 1) 

但这里的问题是,如何根据“M1”列并行创建直方图?这意味着我需要为列值 M1 和 M2 提供两个直方图输出。


首先,你需要知道histogram生成两个独立的连续作业。一种用于检测数据的最小值和最大值,另一种用于计算实际的直方图。您可以使用 Spark UI 进行检查。

我们可以按照相同的方案在您希望的任意数量的列上构建直方图,只需两项工作。然而,我们不能使用histogram函数仅用于处理一组双精度数。需要我们自己去实现。第一份工作非常简单。

val Row(min_trx : Double, max_trx : Double) = df.select(min('trx), max('trx)).head

然后我们在本地计算直方图的范围。请注意,我对所有列使用相同的范围。它允许轻松比较列之间的结果(通过将它们绘制在同一图上)。不过,每列具有不同的范围只是对此代码的一个小修改。

val hist_size = 10
val hist_step = (max_trx - min_trx) / hist_size
val hist_ranges = (1 until hist_size)
    .scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
// I add max_trx manually to avoid rounding errors that would exclude the value

这是第一部分。然后,我们可以使用 UDF 来确定每个值的最终范围,并与 Spark 并行计算所有直方图。

val range_index = udf((x : Double) => hist_ranges.lastIndexWhere(x >= _))
val hist_df = df
    .withColumn("rangeIndex", range_index('trx))
    .groupBy("M1", "rangeIndex")
    .count()
// And voilà, all the data you need is there.
hist_df.show()
+---+----------+-----+
| M1|rangeIndex|count|
+---+----------+-----+
| M2|         2|    2|
| M1|         0|    2|
| M2|         5|    1|
| M1|         3|    2|
| M2|         3|    1|
| M1|         7|    1|
| M2|        10|    1|
+---+----------+-----+

作为奖励,您可以使用 RDD API 或通过收集数据帧并在 scala 中修改它来调整数据以在本地(在驱动程序内)使用它。

这是使用 Spark 的一种方法,因为这是一个关于 Spark 的问题;-)

val hist_map = hist_df.rdd
    .map(row => row.getAs[String]("M1") ->
             (row.getAs[Int]("rangeIndex"), row.getAs[Long]("count")))
    .groupByKey
    .mapValues( _.toMap)
    .mapValues( hists => (1 to hist_size)
                    .map(i => hists.getOrElse(i, 0L)).toArray )
    .collectAsMap

编辑:如何为每列值构建一个范围:

我们不是计算 M1 的最小值和最大值,而是为列的每个值计算它groupBy.

val min_max_map = df.groupBy("M1")
    .agg(min('trx), max('trx))
    .rdd.map(row => row.getAs[String]("M1") ->
      (row.getAs[Double]("min(trx)"), row.getAs[Double]("max(trx)")))
    .collectAsMap // maps each column value to a tuple (min, max)

然后我们调整 UDF 以便它使用这个映射,我们就完成了。

// for clarity, let's define a function that generates histogram ranges
def generate_ranges(min_trx : Double, max_trx : Double, hist_size : Int) = {
    val hist_step = (max_trx - min_trx) / hist_size
    (1 until hist_size).scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
}
// and use it to generate one range per column value
val range_map = min_max_map.keys
    .map(key => key ->
        generate_ranges(min_max_map(key)._1, min_max_map(key)._2, hist_size))
    .toMap

val range_index = udf((x : Double, m1 : String) =>
                       range_map(m1).lastIndexWhere(x >= _))

最后只需替换即可range_index('trx) by range_index('trx, 'M1)每列值都有一个范围。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

直方图 - 以并行方式进行 的相关文章

随机推荐

  • Laravel:在哪里存储全局数组数据和常量?

    我刚刚开始使用 Laravel 我需要重写几年前制作的整个系统 使用 Laravel 4 作为基础框架 在我的旧系统中 我曾经有一个constant php声明了一些常量的文件 以及globals php文件包含大量数组集 例如 类别状态
  • Babel:replaceWithSourceString 给出意外的标记 (1:1)

    我正在尝试替换动态 导入 语句 下面是一个检查导入是否以加号结尾的示例 module exports function babel return visitor ImportDeclaration function path state i
  • 如何获取Appium服务器日志

    有没有办法在测试脚本中获取 Appium 服务器日志 driver manage logs get appium server 或将 appium 服务器日志重定向到控制台 我的主要目的是单独获取仪器日志而不是所有日志 info debug
  • 有什么办法可以限制 Twitter 时间轴小部件中的推文吗?

    我正在使用 Twitter 的时间轴小部件 如下所示 并且希望将推文数量限制为 5 默认情况下为 20 该网站是为使用屏幕阅读器的视障人士而嵌入的 屏幕阅读器被困在小部件内 用户被迫通过 Tab 浏览所有 20 条推文才能退出 Twitte
  • 如何克隆 MemoryStream 对象?

    我有一个MemoryStream经过的对象Stream类型参数 Stream是 C 中的抽象类 我想克隆它以创建另一个单独的MemoryStream对象与原始对象的当前位置并创建一个新的XMLReader出来 所以我将能够阅读其内容 这就是
  • 取消引用 void 指针时的reinterpret_cast 行为

    在与某人争论他在评论中提出的建议时这个答案 https stackoverflow com a 21177728 241631 我遇到了一些 gcc4 8 和 VS2013 拒绝编译的代码 但 clang 很高兴地接受它并显示正确的结果 i
  • 如何强制 GHC 内联 FFI 调用?

    我制作了小型 C 模块来提高性能 但 GHC 不内联外部函数 并且调用成本消除了加速 例如 test h int inc int x test c include test h int inc int x return x 1 Test h
  • Haskell 树木地图

    我的树定义为 data Tree a Leaf a Node Tree a Tree a deriving Show 我还声明了一个测试树 myTree Node Node Leaf 1 Leaf 2 Leaf 3 我想要做的是创建一个函数
  • 接连显示 UIMenuController 的问题

    我正在使用 UIMenuController 的新自定义功能将 复制 以外的内容添加到菜单中 以便剪切并粘贴到 Web 视图中 我所做的就是获取对共享 UIMenuController 的引用 将 UIMenuItems 的 NSArray
  • 确定从 spacy 中提取的文本是否是一个完整的句子

    我们正在研究从 PDF 中提取的句子 问题是它包括标题 页脚 目录等 有没有办法确定我们将文档传递给spacy时得到的句子是否是一个完整的句子 有没有办法过滤句子的某些部分 例如标题 一个完整的句子至少包含一个主语 一个谓语 一个宾语 并以
  • 如何使用 Dagger 2 在 Activity 或 Fragment 范围内交换测试双精度?

    编辑 小心 我已经删除了这个问题中提到的旧存储库 请参阅我自己对问题的回答以获取可能的解决方案 并随时改进它 我指的是我的帖子here https stackoverflow com questions 40405839 dagger 2
  • 如何将文件夹上传到 Google Colab?

    我想运行一个使用目录中定义的许多头文件的笔记本 所以基本上我想将整个目录上传到 Google Colab 以便我可以运行笔记本 但我无法找到任何此类选项 只能上传文件而不是完整的文件夹 那么有人可以告诉我如何将整个目录上传到 google
  • 如何阻止 Maven 重命名已安装的 jar

    我正在使用 mvn install install 命令将我们的内部文件安装到 Maven 存储库 以这种方式安装的所有 Jars 都会自动添加版本名称作为后缀 由于我们有许多带有 Jars 名称的批处理脚本 这对我们来说非常不方便 如何关
  • Backbone 和 TypeScript,一段不幸的婚姻:构建类型安全的“get”?

    我正在尝试将 TypeScript 与 Backbone js 一起使用 它 有效 但是 Backbone 的 get 和 set 失去了大部分类型安全性 我正在尝试编写一个可以恢复类型安全的辅助方法 像这样的事情 我会将其放入我的模型中
  • Alsa全双工通信

    我想使用alsa实现全双工通信 我首先编写了捕获和回放程序 并使用 UDP 通信将数据从捕获的进程传输到回放进程 当我运行两个进程时工作正常 其中一个正在捕获 另一个正在播放 将其视为从 A 到 B 的半双工 当我尝试实现另一个半双工 从
  • TortoiseSVN:移动文件不保留历史记录

    我试图将文件移动到 TortoiseSVN 存储库内的另一个文件夹 但修订历史记录未保留 我尝试使用 Repo 浏览器移动文件 右键单击拖动文件并选择 将项目移动到此处 以及工作文件夹中的文件 右键单击拖动 选择 SVN 将版本化文件移动到
  • 我可以在 .net 3.5 中调用 .net 2.0 dll 吗?

    我正在迁移到 net 3 5 我想知道我是否可以从 3 5 应用程序调用旧的 net 2 0 dll 的方法 一些外部 API 位于 net 2 0 dll 中 所以我需要那些 请告诉我 谢谢 是的 你可以这么做
  • LaTeX 矩阵在 Github Markdown 上无法正确渲染

    我试图表示一个基本向量 以下代码适用于 Visual Studio Code 我正在使用以下行 begin bmatrix X Y end bmatrix 所有空格均已删除 它应该看起来像这样 https i stack imgur com
  • RabbitMQ 在 CreateModel() 方法上关闭了连接

    我正在尝试运行教程中的 Producer 示例 但 RabbitMQ 关闭了连接 但出现异常 抛出异常 RabbitMQ Client dll 中的 RabbitMQ Client Exceptions OperationInterrupt
  • 直方图 - 以并行方式进行

    Id M1 trx 1 M1 11 35 2 M1 3 4 3 M1 10 45 2 M1 3 95 3 M1 20 95 2 M2 25 55 1 M2 9 95 2 M2 11 95 1 M2 9 65 1 M2 14 54 通过上面的