Spark 按列字数统计

2024-01-30

我们正在尝试在 Spark 中生成数据集的按列统计数据。除了使用统计库中的汇总功能之外。我们正在使用以下程序:

  1. 我们确定具有字符串值的列

  2. 生成整个数据集的键值对,使用列号作为键,列的值作为值

  3. 生成新的格式映射

    (K,V) ->((K,V),1)

然后我们使用reduceByKey来查找所有列中所有唯一值的总和。我们缓存此输出以减少进一步的计算时间。

在下一步中,我们使用 for 循环循环遍历列以查找所有列的统计信息。

我们正在尝试再次利用map reduce方式来减少for循环,但我们无法找到某种方法来实现它。这样做将使我们能够在一次执行中生成所有列的列统计信息。 for 循环方法是按顺序运行的,因此速度非常慢。

Code:

//drops the header

    def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) {
             lines.drop(1)
           }
           lines
         })
       }

    def retAtrTuple(x: String) = {
       val newX = x.split(",")
       for (h <- 0 until newX.length) 
          yield (h,newX(h))
    }



    val line = sc.textFile("hdfs://.../myfile.csv")

    val withoutHeader: RDD[String] = dropHeader(line)

    val kvPairs = withoutHeader.flatMap(retAtrTuple) //generates a key-value pair where key is the column number and value is column's value


    var bool_numeric_col = kvPairs.map{case (x,y) => (x,isNumeric(y))}.reduceByKey(_&&_).sortByKey()    //this contains column indexes as key and boolean as value (true for numeric and false for string type)

    var str_cols = bool_numeric_col.filter{case (x,y) => y == false}.map{case (x,y) => x}
    var num_cols = bool_numeric_col.filter{case (x,y) => y == true}.map{case (x,y) => x}

    var str_col = str_cols.toArray   //array consisting the string col
    var num_col = num_cols.toArray   //array consisting numeric col


    val colCount = kvPairs.map((_,1)).reduceByKey(_+_)
    val e1 = colCount.map{case ((x,y),z) => (x,(y,z))}
    var numPairs = e1.filter{case (x,(y,z)) => str_col.contains(x) }

    //running for loops which needs to be parallelized/optimized as it sequentially operates on each column. Idea is to find the top10, bottom10 and number of distinct elements column wise
    for(i <- str_col){
       var total = numPairs.filter{case (x,(y,z)) => x==i}.sortBy(_._2._2)
       var leastOnes = total.take(10)
       println("leastOnes for Col" + i)
       leastOnes.foreach(println)
       var maxOnes = total.sortBy(-_._2._2).take(10)
       println("maxOnes for Col" + i)
       maxOnes.foreach(println)
       println("distinct for Col" + i + " is " + total.count)
    }

让我稍微简化一下你的问题。 (实际上很多。)我们有一个RDD[(Int, String)]我们想要找到最常见的前 10 个Strings 为每个Int(均在 0–100 范围内)。

与您的示例中所示的排序不同,使用 Spark 内置的效率更高RDD.top(n)方法。它的运行时间与数据大小成线性关系,并且需要移动的数据比排序少得多。

考虑实施top in RDD.scala https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1113。您想要执行相同的操作,但每个优先级队列(堆)Int钥匙。代码变得相当复杂:

import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]
  // Get the word counts.
  val counts: RDD[[(Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)
  // In each partition create a column -> heap map.
  val perPartition: RDD[Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for (((k, v), count) <- items) {
        heaps(k) += count -> v
      }
      Iterator.single(heaps)
    }
  // Merge the per-partition heap maps into one.
  val merged: Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }
  // Discard counts, return just the top strings.
  merged.mapValues(_.map { case(count, value) => value })
}

这很有效,但也很痛苦,因为我们需要同时处理多个列。拥有一个会更容易RDD每列只需调用rdd.top(10)在各个。

不幸的是,将 RDD 分割成 N 个更小的 RDD 的简单方法需要执行 N 遍:

def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache // We will make N passes over this RDD.
  (0 until columns).map {
    i => together.filter { case (key, value) => key == i }.values
  }
}

更有效的解决方案可能是按键将数据写出到单独的文件中,然后将其加载回单独的 RDD 中。这在中讨论按 Spark 键写入多个输出 - 一个 Spark 作业 https://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 按列字数统计 的相关文章

随机推荐

  • Docker 命令不再响应

    大多数 docker 命令永远不会结束 我必须用 CTRL C 手动中断它们 即使是简单的命令 例如docker ps or docker info不回应 然而 docker help and docker version还在工作 我认为特
  • 标准输入到 powershell 脚本

    我正在运行一个服务 它可以调用外部进程来修改文本流 然后再将其返回到服务 文本流从服务传递到 stdout 上的外部进程 并从 stdin 上的服务读取修改后的结果 换句话说 外部过程 命令 可以用作文本 过滤器 我想使用 powershe
  • AdapterView 不支持 addView(View)

    我开始进行 Android 开发 并且在标题上遇到了该错误 这是我的 Contacts java package us inevent toot import android os Bundle import android support
  • AWS 无法删除网络接口

    在AWS中 我有网络接口 但我无法删除它们 因为它们应该正在使用中 我尝试过通过强制选项将它们分开 但这不起作用 我如何识别网络接口使用的对象 查找网络接口所连接的资源的最佳方法是检查 ENI 的 描述 字段 根据资源类型 此描述字段可能会
  • 避免空指针

    我正在用 C 11 实现我自己的编程语言 我设计的数据类型之一是Token班级 它旨在存储从源文件读取的标记 以及标记的内容 类型以及遇到的行 令牌可以是单字符符号 长字符串 数字或名称 因此它需要能够存储不同的数据类型 可以是用于符号的字
  • Apache 重写规则类似于 Nginx try_files

    在 Nginx 中 我使用了 try files 它基本上接受对域上文件的任何请求 并将其通过名为 file parse php 的自定义 php 脚本传递 在 Nginx 中 它看起来像这样 try files url file pars
  • 对于具有 Float 原始值的 Swift 枚举,“枚举案例的原始值不是唯一的”

    根据Swift 编程语言 https developer apple com library mac documentation Swift Conceptual Swift Programming Language Enumeration
  • return new RedirectResult() 与 return Redirect()

    以下两个控制器 ActionResult 返回语句有什么区别 return new RedirectResult http www google com false and return Redirect http www google c
  • 如何检查“sudo 0.4.1 jurko 5”及更新版本中正在生成/接收哪些 SUD?

    这个问题与这个问题类似 如何输出正在生成 接收的肥皂水 https stackoverflow com questions 4426204 how can i output what suds is generating receiving
  • RecyclerView LinearLayout 管理器在横向模式下始终返回 -1 - findLastCompletelyVisibleItemPosition()

    我在用着findLastCompletelyVisibleItemPosition 确定 RecyclerView 中的最后一个可见项目 这是我如何设置布局的代码片段 mRecyclerView setHasFixedSize true L
  • 如何从结构中提取索引最高的专业化?

    我正在尝试进行一些模板元编程 并且发现需要 提取 某种类型的某种结构的专门化的最高索引 例如 如果我有一些类型 struct A template
  • 区分 Jersey、Jackson 和 JaxB API

    嗨 我一直在内部使用 Jackson 进行 JSON 处理 我想将这些对象作为 Json 提供给外部 API REST 现在 它们在内部存储为 java 对象 明显的实现是编写某种查询引擎来读取请求 从底层数据存储中检索对象 然后使用 Ja
  • 从 Mercurial 中的单个本地存储库推送到多个远程存储库

    我正在考虑使用 AppHarbor 托管一个轻量级网站 并正在研究他们的 Mercurial 集成 目前我使用 Kiln 作为远程存储库 但目前 AppHarbor 仅支持 BitBucket 集成 一个本地存储库可以有 2 个远程存储库吗
  • ES6 类变量替代方案

    目前 在 ES5 中 我们许多人在框架中使用以下模式来创建类和类变量 这很舒服 ES 5 FrameWork Class variable string variable2 true init function addItem functi
  • 如何删除 Firefox 按钮和链接上的虚线轮廓?

    我可以让 Firefox 不显示丑陋的虚线焦点轮廓links有了这个 a focus outline none 但我怎样才能做到这一点
  • 为什么设置 USER 环境变量需要 12 秒?

    使用以下代码 我经历了可怕的运行时 Option Explicit Dim ShellEnvironment Set ShellEnvironment CreateObject WScript Shell Environment USER
  • UIPopoverController:为什么我的弹出窗口没有出现在我想要的地方?

    简单的 一个视图 我使用presentPopoverFromRect在CGRect中呈现一个UIPopoverController 并且箭头或弹出框都没有出现在我传入的矩形中要求的坐标附近 有什么线索吗 我一直试图自己解决这个问题 但我放弃
  • 从 Java 中的其他类访问私有变量

    如果我想创建一个将人员添加到列表中的表单 我如何从另一个类访问该列表 我应该在哪里定义该列表 以便其他类可以访问成员 大小等 例如 如果我有 Foo 类 它具有适用于我的表单的 GUI 以及用于向列表中添加和删除人员的按钮 那么将列表声明为
  • crosstable() 导出到 csv

    你好 所以我需要制作一个交叉表 我发现有多种方法 但有一个函数可以使表格就像 Excel 中的数据透视表一样 它工作完美 但我无法将其导出到 csv 或 excel 因为它是 Crosstable 类 因此不能强制它 我如何设法将其导出为
  • Spark 按列字数统计

    我们正在尝试在 Spark 中生成数据集的按列统计数据 除了使用统计库中的汇总功能之外 我们正在使用以下程序 我们确定具有字符串值的列 生成整个数据集的键值对 使用列号作为键 列的值作为值 生成新的格式映射 K V gt K V 1 然后我