Spark / Scala:使用最后一次观察进行前向填充

2023-12-09

使用 Spark 1.4.0、Scala 2.10

我一直在试图找出一种方法来用最后一个已知的观察结果转发填充空值,但我没有看到一个简单的方法。我认为这是一件很常见的事情,但找不到显示如何执行此操作的示例。

我看到用值向前填充 NaN 的函数,或用偏移量填充或移动数据的滞后/超前函数,但没有任何东西可以获取最后一个已知值。

在网上查看,我在 R 中看到了很多关于相同问题的问答,但在 Spark / Scala 中却没有。

我正在考虑映射日期范围,从结果中过滤出 NaN 并选择最后一个元素,但我想我对语法感到困惑。

使用 DataFrames 我尝试类似的方法

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)

但这对我没有任何帮助。

过滤部分不起作用;映射函数返回spark.sql.Columns的序列,但过滤器函数期望返回布尔值,因此我需要从列中获取一个值进行测试,但似乎只有返回列的列方法。

有没有办法在 Spark 上更“简单”地做到这一点?

感谢您的输入

Edit:

简单示例示例输入:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

预期输出:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

Note:

  1. 我有很多列,其中许多列都有这种缺失的数据模式,但不是在同一日期/时间。如果需要,我将一次转换一列。

EDIT:

按照 @zero323 的回答,我尝试了以下方法:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
   case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}

广播变量最终作为不带空值的值列表。这是进步,但我仍然无法让映射工作。 但我什么也没得到,因为索引i不映射到原始数据,它映射到不带 null 的子集。

我在这里缺少什么?

编辑和解决方案(从 @zero323 的答案推断):

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))

如果您使用 RDD 而不是 DataFrame,请参阅下面 Zero323 的答案以获取更多选项。上面的解决方案可能不是最有效的,但对我有用。如果您正在寻求优化,请查看 RDD 解决方案。


初始答案(单个时间序列假设):

首先,如果您无法提供,请尝试避免窗口函数PARTITION BY条款。它将数据移动到单个分区,因此大多数时候这是不可行的。

你能做的就是填补空白RDD using mapPartitionsWithIndex。由于您没有提供示例数据或预期输出,因此请认为这是伪代码而不是真正的 Scala 程序:

  • 首先让我们订购DataFrame按日期并转换为RDD

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val rows: RDD[Row] = df.orderBy($"Date").rdd
    
  • 接下来让我们找到每个分区的最后一个非空观察

    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    
  • 并转换这个Map广播

    val toCarryBd = sc.broadcast(toCarry)
    
  • 最后再次映射分区来填补空白:

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    
  • 最后转换回DataFrame

编辑(每组数据的分区/时间序列):

魔鬼在于细节。如果您的数据毕竟是分区的,那么整个问题可以使用以下方法解决groupBy。假设您只是按类型的“v”列进行分区T and Date是一个整数时间戳:

def fill(iter: List[Row]): List[Row] = {
  // Just go row by row and fill with last non-empty value
  ???
}

val groupedAndSorted = df.rdd
  .groupBy(_.getAs[T]("k"))
  .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))

val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)

val dfFilled = sqlContext.createDataFrame(rows, df.schema)

这样您就可以同时填充所有列。

可以使用 DataFrame 来完成此操作,而不是来回转换为 RDD 吗?

这取决于情况,尽管它不太可能有效。如果最大间隙相对较小,您可以执行以下操作:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column

val maxGap: Int = ???  // Maximum gap between observations
val columnsToFill: List[String] = ???  // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed 

// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
  // Generate lag values between 1 and maxGap
  val lags = (1 to maxGap).map(lag(col(c), _)over(w))
  // Add current, coalesce and set alias
  coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}


// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))


// Finally select
val dfImputed = df.select($"*" :: lags: _*)

可以轻松调整以使用每列不同的最大间隙。

在最新的 Spark 版本中实现类似结果的更简单方法是使用last with ignoreNulls:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"k").orderBy($"Date")
  .rowsBetween(Window.unboundedPreceding, -1)

df.withColumn("value", coalesce($"value", last($"value", true).over(w)))

虽然有可能掉落partitionBy条款并在全球范围内应用此方法,对于大型数据集来说,其成本将非常昂贵。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark / Scala:使用最后一次观察进行前向填充 的相关文章

  • R和spark:比较不同地理点之间的距离

    我正在处理纽约市出租车数据集 该数据集的列包括日期时间 接送纬度 经度 下车纬度 经度等 现在我想对纬度 经度进行反向地理编码以找到行政区 社区 我有两个数据框 1 第一个数据框包含我想要用最近的纽约社区名称进行分类的所有点 2 第二个数据
  • 作为单例集合的选项 - 现实生活中的用例

    标题几乎概括了这一点 Option作为单例集合有时会令人困惑 但有时它允许一个有趣的应用程序 我脑子里有一个例子 并且想了解更多这样的例子 我唯一的例子是运行for对的理解Option List T 我们可以执行以下操作 val v Som
  • 从 Monoids 的 HList 类型派生 0 的 HList

    我正在学习 Shapeless 目前我正在尝试创建一个执行以下操作的函数 给定一个类型HList它返回HList of Nones 与Option对应于给定的类型HList type 例如 create String Int HNil re
  • scala 元组拆包

    我知道这个问题已经以不同的方式出现过很多次 但我仍然不清楚 有没有办法达到以下目的 def foo a Int b Int foo a b right way to invoke foo foo getParams is there a w
  • 通过Listener获取Spark thrift服务器查询中读取的行数

    我正在尝试为我们的 ST 服务器构建一个监控系统 到目前为止 诸如记录查询 检索的行 红色和花费的时间之类的事情都很好 我已经实现了一个自定义侦听器 我能够毫无问题地检索查询和时间 侦听SparkListenerSQLExecutionSt
  • 为什么 Apache Spark 会读取嵌套结构中不必要的 Parquet 列?

    我的团队正在构建一个 ETL 流程 以使用 Spark 将原始分隔文本文件加载到基于 Parquet 的 数据湖 中 Parquet 列存储的承诺之一是查询将仅读取必要的 列条带 但我们看到意外的列被读取以获取嵌套模式结构 为了进行演示 下
  • 抽象类型与类型参数

    在什么情况下抽象类型应该优先于类型参数 添加到我的之前关于抽象类型与参数的回答 https stackoverflow com questions 1154571 scala abstract types vs generics 11547
  • Spark中RDD转换的结果是什么?

    谁能解释一下 结果是什么RDD 转换 它是新的数据集 数据副本 还是只是新的指针集 用于过滤旧数据块 RDD 转换允许您在 RDD 之间创建依赖关系 依赖关系只是产生结果 程序 的步骤 谱系链 依赖字符串 中的每个 RDD 都有一个计算其数
  • Scala Spark:将数据框中的双列转换为日期时间列

    我正在尝试编写代码来将日期时间列 date 和 last updated date 转换为 mm dd yyyy 格式以进行显示 它们实际上是 unix 时间转换为双精度数 我该怎么做呢 import org joda time impor
  • Spark UDF 错误 - 不支持 Any 类型的架构

    我正在尝试创建一个 udf 它将列中的负值替换为 0 我的数据框名为 df 包含一列名为 avg x 这是我创建 udf 的代码 val noNegative udf avg acc x Double gt if avg acc x lt
  • Spark sql 每组前 n 个

    我怎样才能获得每组的前n名 比如说前10名或前3名 spark sql http www xaprb com blog 2006 12 07 how to select the firstleastmax row per group in
  • Pyspark 应用程序仅部分利用 dataproc 集群资源

    我的 pyspark 应用程序在 106 36 MB 数据集 817 270 条记录 上运行 UDF 使用常规 python lambda 函数大约需要 100 小时 我创建了一个 Google Dataproc 集群 其中包含 20 个工
  • mssql 的 UUID 疯狂

    我的数据库条目有一个 UUID 及其值 使用 Microsoft SQL Server Management Studio 提取 CDF86F27 AFF4 2E47 BABB 2F46B079E98B 将其加载到我的 Scala 应用程序
  • 特征/类类型参数优先于方法类型参数的规则是什么

    我已经使用 scala 一段时间了 我认为我真的开始理解一切 好吧 大多数事情 但我发现自己对 Map 类中的许多方法定义感到困惑 我知道 FoldLeft 等如何工作 但我感到困惑的是 Map 函数中使用的类型参数 我们以 FoldLef
  • JavaFX 控制器如何访问其他服务?

    我将 JavaFX 2 与 Scala 一起使用 我有class Application extends javafx application Application它执行诸如读取应用程序配置等操作 然后它会启动主窗口 该主窗口需要连接到一
  • Spark 写入 S3 V4 SignatureDoesNotMatch 错误

    我遇到S3SignatureDoesNotMatch尝试使用 Spark 将 Dataframe 写入 S3 时 症状 尝试过的事情 代码失败有时但有效有时 代码可以read从 S3 没有任何问题 并且能够不时写入 S3 这排除了错误的配置
  • 如何在 Scala 中跳过可选参数?

    给定以下带有可选参数的函数 def foo a Int 1 b Int 2 c Int 3 我想保留默认值a但将新值传递给b and c仅通过位置赋值 而不是通过命名赋值 即以下任何语法都可以 foo 5 7 foo 5 7 Scala 可
  • Zeppelin:如何在 zeppelin 中重新启动 SparkContext

    我正在使用 zeppelins Spark 解释器的隔离模式 在这种模式下 它将为 Spark 集群中的每个笔记本启动一项新工作 我想在笔记本执行完成后通过 zeppelin 终止该作业 为此我做了sc stop这停止了 sparkCont
  • 使用新的反射API,如何找到类的主构造函数?

    您可以像这样获取类的所有构造函数 import scala reflect runtime universe val ctor typeOf SomeClass declaration nme CONSTRUCTOR asTerm alte
  • 使用 Scala 在 Apache Spark 中拆分字符串

    我有一个数据集 其中包含以下格式的行 制表符分隔 Title lt t gt Text 现在对于每个单词Text 我想创建一个 Word Title 一对 例如 ABC Hello World gives me Hello ABC Worl

随机推荐

  • Pandas 根据列中的值将数据帧拆分为多个 csv

    我有个问题与此类似但我需要采取进一步的措施 问题是我的文件包含 50k 多行 每行有 4 个值 Indicator Country Date 和 value 我想根据国家 地区拆分我的 CSV 我不知道有多少个国家 地区 因此所有名称相似的
  • 如何从 Pyspark 中的 Spark 数据帧创建边缘列表?

    我在用graphframes在 pyspark 中进行某些图形类型的分析 并想知道从顶点数据框架创建边列表数据框架的最佳方法是什么 例如 下面是我的顶点数据框 我有一个 id 列表 它们属于不同的组 id group a 1 b 2 c 1
  • 如何在 Python 中解释离散傅里叶变换 (FFT) 的结果

    关于这个主题有很多问题 我已经循环浏览了其中很多问题 获得了有关处理频率的概念性指导 here and here 有关 numpy 函数的文档 here 有关提取幅度和相位的操作信息 here 并走出站点 例如this or this 然而
  • 如何在 C# 中将行筛选的 DataGridView 设置为 DataTable

    我有 DataGridview 我过滤了其中的一些行 我需要将新数据源保存到新的 DataTable 由于某种原因我当前的代码不起作用 这里我如何尝试转换它 LogGridView DataSource as DataTable Defau
  • 通过子项无限嵌套 ngFor

    我发现了一些关于 Angular2 中嵌套 ngFor 循环的问题 但不是我正在寻找的问题 我想在列表中显示类别 我的 JSON 看起来像这样 Categories Title Categorie A Children Title Sub
  • 如何检索 LoaderExceptions 属性?

    我在更新服务参考时收到错误消息 自定义工具警告 无法加载一种或多种请求的类型 检索 LoaderExceptions 属性以获取更多信息 如何检索 LoaderExceptions 属性 Update 当我重新导入域对象项目时 我的错误消失
  • 在张量流中将 1 通道掩模应用于 3 通道张量

    我正在尝试将掩码 二进制 仅一个通道 应用于 RGB 图像 3 个通道 标准化为 0 1 我当前的解决方案是 我将 RGB 图像分割成它的通道 将其与蒙版相乘 然后再次连接这些通道 with tf variable scope apply
  • CodeIgniter 的 CAS 身份验证库

    我正在尝试在 CodeIgniter 应用程序中实现 CAS 身份验证 但我找不到当前是否有为其设置的库 我通过只包含类并添加一些肮脏的修复来进行管理 但如果有人知道合适的库 我认为这将是一个更干净的解决方案 我一直在浏览这里以及谷歌上的一
  • PHP:帮助处理此日期格式

    我正在使用 CodeIgniter 构建一个应用程序 我的 SQL Server 数据库中有包含日期 时间字段的记录 我正在从 m d Y 文本字段中输入的日期查询这些记录 这对应于数据库中的日期格式 不幸的是我在英国 所以我想输入日期 例
  • 如何在创建新计时器之前检查计时器是否处于活动状态

    我在另一个线程上遇到了这个计时器代码 当您按下RaisedButton同时进行多次 每次点击都会增加 1 秒 从而增加减少的速度 有关检查计时器是否已处于活动状态以及是否不让计时器处于活动状态的最简单方法的任何想法RaisedButton创
  • 如何从私有 Docker 注册表中删除镜像?

    我运行一个私人 docker 注册表 我想删除除latest来自存储库 我不想删除整个存储库 只想删除其中的一些图像 这API docs没有提到做到这一点的方法 但肯定有可能吗 目前您无法使用注册表 API 来执行该任务 它只允许您删除存储
  • wamp上安装magento的问题

    大家好 谁能帮我解决安装 magento 时遇到的问题 我的问题是我已经在 wamp 上下载了 magento 在安装过程中我收到了错误 它给出的消息是致命错误 超过了 60 秒的最大执行时间 c wamp www magento lib
  • signalr 我如何从服务器向呼叫者发布消息

    我正在使用 Signalr 1 1 4 因为我仍在使用 net4 所以无法升级到 signalr 2 基本上我想从服务器向调用者发布消息 以避免消息发送到任何未启动进程的客户端 我的集线器类看起来像这样 public class Updat
  • VBScript 中的文件名字符串空格问题

    当我运行此命令时出现错误 但我不确定原因 运行 VBScript 来执行 bat 文件 我想将任何错误消息输出到日志文件 为此 我有以下代码 Set WshShell CreateObject WScript Shell WshShell
  • 如何过滤 Quickblox 用户?

    我想根据应用程序用户的电话号码或电子邮件过滤他们 但我不希望完全匹配 而是用户应返回的部分电子邮件或部分号码作为响应 Quickblox iOS SDK 有办法吗 假设我有一些 Quickblox 用户 如下所示 ID NAME Email
  • Swift 类中的静态与类函数/变量?

    以下代码在 Swift 1 2 中编译 class myClass static func myMethod1 class func myMethod2 static var myVar1 func doSomething myClass
  • 素数生成器逻辑

    我应该去上课PrimeNumberGenerator其中有一个方法nextPrime这将打印出用户输入的数字之前的所有质数 Ex Enter a Number 20 2 3 5 7 11 13 17 19 我们的老师告诉我们应该使用嵌套fo
  • VBScript“类型不匹配”问题与“[in, out] BSTR *”参数

    我正在使用第三方 COM 对象 该对象的一些方法将值作为 BSTR 指针传回 由于 VBscript 仅支持 Variant 类型 尝试以类似 Object Method sMyString 的方式使用会合理地以 类型不匹配 错误结束 我怀
  • 如何使用切换添加和删除必需的属性

    我的用户可以访问表单 为了简化任务 我放置了一个可选择的列表 但如果答案不在列表中 他们可以手动添加原因 默认情况下需要选择列表 但如果用户访问文本字段 则该文本字段将成为必需的 并且不再需要该列表 反之亦然 HTML div class
  • Spark / Scala:使用最后一次观察进行前向填充

    使用 Spark 1 4 0 Scala 2 10 我一直在试图找出一种方法来用最后一个已知的观察结果转发填充空值 但我没有看到一个简单的方法 我认为这是一件很常见的事情 但找不到显示如何执行此操作的示例 我看到用值向前填充 NaN 的函数