如何使用我的相等比较器对 Spark DataFrame 进行 GroupBy?

2024-03-21

我想在 DataFrame 上使用 GroupBy 运算符和我自己的相等比较器。

假设我想执行类似的操作:

df.groupBy("Year","Month").sum("Counter")

在此数据框中:

Year    | Month      | Counter  
---------------------------
2012    | Jan        | 100          
12      | January    | 200       
12      | Janu       | 300       
2012    | Feb        | 400       
13      | Febr       | 500

我必须实现两个比较器:

1) 对于“年份”栏:p.e. “2012”==“12”

2) 对于月份列:p.e. “一月”==“一月”==“一月”

假设我已经实现了这两个比较器。我怎样才能调用它们?如在this https://stackoverflow.com/questions/55128213/how-to-sort-dataframe-with-my-comparator-using-scala例如,我已经知道我必须将 DataFrame 转换为 RDD 才能使用我的比较器。

我想过使用RDD分组依据 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@groupBy%5BK%5D(f:T=%3EK,p:org.apache.spark.Partitioner)(implicitkt:scala.reflect.ClassTag%5BK%5D,implicitord:Ordering%5BK%5D):org.apache.spark.rdd.RDD%5B(K,Iterable%5BT%5D)%5D.

注意我真的需要使用比较器来做到这一点。我无法使用 UDF、更改数据或创建新列。未来的想法是拥有密文列,其中我有函数可以让我比较两个密文是否相同。我想在我的比较器中使用它们。

Edit:

此时此刻,我尝试仅用一列来完成此操作,例如:

df.groupBy("Year").sum("Counter")

我有一个包装类:

class ExampleWrapperYear (val year: Any) extends Serializable {
      // override hashCode and Equals methods
}

然后,我正在这样做:

val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()

我的问题是如何进行“求和”,以及如何将 keyBy 与多个列一起使用以使用 ExampleWrapperYear 和 ExampleWrapperMonth。


这个解决方案应该可行。这里是实现 hashCode 和 equals 的案例类(我们可以将它们称为比较器)。

可以根据不同的密文修改/更新hashCode和equals

  case class Year(var year:Int){

    override def hashCode(): Int = {
      this.year = this.year match {
        case 2012 => 2012
        case 12 => 2012
        case 13 => 2013
        case _ => this.year
      }
      this.year.hashCode()
    }

    override def equals(that: Any): Boolean ={
      val year1 = 2000 + that.asInstanceOf[Year].year % 100
      val year2 = 2000 + this.year % 100
      if (year1 == year2)
        true
      else
        false
    }
  }

  case class Month(var month:String){

    override def hashCode(): Int = {
      this.month = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      this.month.hashCode
    }

    override def equals(that: Any): Boolean ={
      val month1 = this.month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => this.month
      }
      val month2 = that.asInstanceOf[Month].month match {
        case "January" => "Jan"
        case "Janu" => "Jan"
        case "February" => "Feb"
        case "Febr" => "Feb"
        case _ => that.asInstanceOf[Month].month
      }
      if (month1.equals(month2))
        true
      else
        false
    }
  }

这是分组键的重要比较器,它仅使用单独的 col 比较器

  case class Key(var year:Year, var month:Month){

    override def hashCode(): Int ={
      this.year.hashCode() + this.month.hashCode()
    }

    override def equals(that: Any): Boolean ={
      if ( this.year.equals(that.asInstanceOf[Key].year) && this.month.equals(that.asInstanceOf[Key].month))
        true
      else
        false
    }
  }

  case class Record(year:Int,month:String,counter:Int)

  val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("data.csv").as[Record]

  df.rdd.groupBy[Key](
      (record:Record)=>Key(Year(record.year), Month(record.month)))
      .map(x=> Record(x._1.year.year, x._1.month.month, x._2.toList.map(_.counter).sum))
      .toDS().show()

这使

+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012|  Feb|    800|
|2013|  Feb|    500|
|2012|  Jan|    700|
+----+-----+-------+

for this input in data.csv

Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100

请注意,对于案例类“年”和“月”,还将值更新为标准值(否则无法预测它选择哪个值)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用我的相等比较器对 Spark DataFrame 进行 GroupBy? 的相关文章

随机推荐

  • RadGrid 底部的水平滚动空白

    我正在使用 RadGrid 从数据库检索数据 我的 RadGrid 中有更多列 因此我需要显示 RadGrid 水平滚动以防止页面扩展 但禁用垂直滚动 因此网格的高度应扩展以始终显示网格中的所有行 我得到了结果 但 RadGrid 底部有空
  • 使用 Chosen 链接选择

    我正在尝试将选择与Chosen https github com harvesthq chosen and Chained http www appelsiini net projects chained但我不确定我是否正确实现了 chos
  • 文件观察器创建事件

    我正在使用 net 文件监视程序监视文件夹中的某些类型的文件 mbxml 我正在使用 filewatcher 创建的事件 一旦创建的事件触发 我必须将此文件移动到另一个文件夹 这种方法的问题在于 一旦文件复制开始 就会触发创建的事件 因此
  • 存储用户时区的最佳实践 - TSQL/.Net

    我需要跟踪用户的时区 以便在他们指定的特定时间 在他们自己的时区 处理他们的信息 或不处理 显而易见的答案是将时区及其个人资料信息存储在用户数据库中 有点棘手的是夏令时 从下图中请注意 大多数北部和南部地区使用夏令时偏移 因此 存储时区偏移
  • 防止 PHP 脚本在运行时耗尽所有资源?

    我有一个每日 cron 作业 运行大约需要 5 分钟 它会收集一些数据 然后更新各种数据库 它工作正常 但问题是 在这 5 分钟内 该站点完全没有响应任何请求 无论是 HTTP 还是其他请求 看起来 cron 作业脚本在运行时会占用所有资源
  • 使用基数排序实现 std::sort 重载是否合法?

    对于适用的数据类型 良好的基数排序可以大幅击败比较排序 但是std sort通常作为 introsort 实现 有没有理由不使用基数排序来实现std sort 基数排序不足以实现std sort因为std sort仅要求类型具有可比性 但对
  • Flutter:固定高度容器内的可滚动列子项

    我有一些容器里面一个ListView这将导致可滚动内容在一个页面内 每个容器都有一个 Column 作为子容器在列中 我有一个标题和一个分隔线 然后是实际内容 我希望其中一个容器是这样的 Title divider Scrollable c
  • Windows8:设备标识符

    我目前正在尝试检索唯一的设备标识符 这是我的代码 var token Windows System Profile HardwareIdentification getPackageSpecificToken null var reader
  • 如何在新页面上显示 AJAX 响应

    我正在phonegap中开发移动应用程序并使用intel xdk 我想在新的html页面上显示ajax响应我在google上搜索并找到了这个解决方案window open 但这种方法对我不起作用并显示空白 白屏 我想显示我的数据search
  • pyqt中GUI的模型视图实现错误

    当我关闭应用程序时 以下示例代码因此错误而崩溃 QBasicTimer start QBasicTimer can only be used with threads started with QThread 这是我的代码 import s
  • 如何使用 Modelform 和 jquery 在 django 中获取相互依赖的下拉菜单?

    我是 django 和 jquery 的新手 我正在开发一个基于 django 的应用程序 其中表单中有 3 个下拉列表 1 校园 2 学校 3 中心 层次结构是校园有学校 学校有中心 我想将这些下拉菜单相互链接 例如 我有 3 个校区 即
  • 不同分区中的 COM+ 对象激活

    我创建了一个 COM 域分区 然后将其映射到 Windows 2008 服务器计算机 并将 COM 应用程序导入其中 我尝试使用以下 C 代码远程激活服务器上特定分区中的对象 partition guid Guid guidMyPartit
  • Spring Data JPA - 多对多查询

    我有两个实体 人物 和 电影 Entity public class Person some fields ManyToMany fetch FetchType LAZY mappedBy actors OrderBy id private
  • React Jest/Enzyme 测试:useHistory Hook 破坏测试

    我对 React 还很陌生 所以请原谅我的无知 我有一个组件 const Login FunctionComponent gt const history useHistory extra logic that probably not n
  • Pandas groupby 自定义组

    假设我有一个像这样的数据框 df pd DataFrame A 1 2 3 4 5 6 B a a b b c c print df A B 0 1 a 1 2 a 2 3 b 3 4 b 4 5 c 5 6 c 如何按列分组B使得这些组是
  • 未使用 c# 在 Windows 中设置环境。我哪里出错了?

    string path System Environment GetEnvironmentVariable Path Console WriteLine path if path Contains C ccstg if path EndsW
  • 通过配置管理器从 AppSettings 中获取 StringCollection

    我正在像这样访问程序集的配置 ExeConfigurationFileMap map new ExeConfigurationFileMap map ExeConfigFilename Assembly GetExecutingAssemb
  • 带有自定义函数的 window.opener 在 Safari 中不起作用

    我在使用 Safari 时遇到问题 特别是没有从父窗口中找到 window opener 函数 我调用的函数在 Chrome 和 Firefox 中运行良好 有人有什么建议吗 窗口 1 父窗口 打开窗口 2 其中包含以下内容 window
  • pod install [!] 错误:由于解析错误,解析无法继续:

    当尝试安装 pod 时 显示此错误 pod install ERROR Parsing unable to continue due to parsing error contained in the file located at Use
  • 如何使用我的相等比较器对 Spark DataFrame 进行 GroupBy?

    我想在 DataFrame 上使用 GroupBy 运算符和我自己的相等比较器 假设我想执行类似的操作 df groupBy Year Month sum Counter 在此数据框中 Year Month Counter 2012 Jan