Spark / Scala:用最后的良好观察填充 nan

2023-12-15

我正在使用 Spark 2.0.1,并希望用列中最后一个已知的值填充 nan 值。

我能找到的唯一关于火花的参考Spark / Scala:使用最后一次观察进行前向填充 or 用 pyspark 之前已知的良好值填充 null似乎使用RDD。

我宁愿留在数据框/数据集世界中并可能处理多个 nan 值。 这可能吗?

我的假设是,数据(最初从例如 CSV 文件加载)按时间排序,并且此顺序保留在分布式设置中,例如按关闭/最后一个已知值填充是正确的。也许对于大多数人来说,用先前的值填充就足够了记录中连续不存在 2 个或更多 nan 记录。这实际上成立吗? 重点是,一个

myDf.sort("foo").show

会破坏任何订单,例如全部null价值观将是第一位的。

一个小例子:

import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
         .toDF("foo","bar")
         .withColumn("foo", 'foo.cast("Date"))
         .as[FooBar]

结果是

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

我想用最后一个已知的值来修复该值。我怎样才能实现这个目标?

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-02|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

edit

就我而言,填充上面行中的值就足够了,因为只有非常有限的错误值。

edit2

我尝试添加索引列

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
    .withColumn("rowId", monotonically_increasing_id())

然后填写最后一个值。

myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show

但上面写着以下警告:没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。我怎样才能引入有意义的分区?

+----------+--------------------+-----+----------+
|       foo|                 bar|rowId|    fooLag|
+----------+--------------------+-----+----------+
|2016-01-01|               first|    0|      null|
|2016-01-02|              second|    1|2016-01-01|
|      null|       noValidFormat|    2|2016-01-02|
|2016-01-04|lastAssumingSameDate|    3|      null|
+----------+--------------------+-----+----------+

//用最后一个未知的空值填充空字段我尝试过,这确实有效!

val dftxt1 = spark.read.option("header","true").option("sep","\t").csv("/sdata/ph/com/r/ph_com_r_ita_javelin/inbound/abc.txt").toDF("line_name", "merge_key", "line_id")
dftxt2.select("line_name","merge_key","line_id").write.mode("overwrite").insertInto("dbname.tablename")

val df = spark.sql("select * from dbname.tablename")

val Df1 = df.withColumn("rowId", monotonically_increasing_id())

import org.apache.spark.sql.expressions.Window

val partitionWindow = Window.orderBy("rowId")

val Df2 = Df1.withColumn("line_id", last("line_id", true) over (partitionWindow))

Df2.show
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark / Scala:用最后的良好观察填充 nan 的相关文章

随机推荐

  • Subversion E160004 X的根节点的前身是Y但应该是Z

    我继承了一个大型 Subversion 存储库 74010 修订版 并且我正在尝试执行转储 加载以将存储库升级到 1 8 版本 以利用节省空间的功能 在尝试这个过程之前我跑了svnadmin verify对有问题的存储库进行检查 以确保该存
  • 在 Google 商店中将多个 Chrome 扩展程序作为单个项目发布

    Chrome 扩展程序和 Chrome 应用程序具有我需要实现某些功能的 API 但我无法仅使用扩展程序或仅使用应用程序或使用本机代码来实现此目的 所以我制作了一个扩展程序和一个应用程序 并使它们通过消息相互通信 一切正常 但现在我必须发布
  • 将表单提交到操作 php 文件

    我有一个表单 当用户单击 提交 时 我需要运行一个 php 文件 下面是表单和 php 文件
  • Spirit X3,如何让属性类型匹配规则类型?

    对于 Spirit X3 解析器的开发 我想使用语义操作 脚注 1 对我来说 控制如何将属性存储到 STL 容器中非常重要 这个问题是关于如何控制解析器属性 attr ctx 与规则类型 val ctx 匹配 以便可以正确分配它 也许这个问
  • 如何构建电影数据库和用户选择?

    我想创建电影数据库 用户可以在其中标记他 她观看和喜欢的电影 class Movies ndb Model watched ndb UserProperty liked ndb UserProperty 那行得通吗 我使用谷歌帐户 以后我应
  • PySpark - RDD 中对象的时间重叠

    我的目标是根据时间重叠对对象进行分组 我的每个对象rdd包含一个start time and end time 我可能效率很低 但我计划做的是根据每个对象是否与任何其他对象有任何时间重叠来为每个对象分配一个重叠 id 我有时间重叠的逻辑 然
  • 为什么Spring Boot时找不到bean?

    我以更方便的方式重新配置了 DAO 通过使用 JpaRepository 而不是手动执行所有样板代码 但现在每次我启动 Spring 应用程序时都会出现以下错误 APPLICATION FAILED TO START Description
  • React-Redux 和 Connect - 为什么我的状态在点击时没有更新?

    我是 redux 的新手 正在编写一个简单的投票前端 允许用户对他们最喜欢的框架 Angular React Vue 进行投票 当用户点击他们想要投票的框架时 我打算将商店中的投票增加一票 我在用着combineReducers and c
  • 多个条件Where子句

    我目前有一个查询 它将根据任何内容从我的数据库中提取大量信息where我想使用的条件 declare CaseNum char 7 ImportId char FormatId char SessionId char 5 set CaseN
  • 从文件共享运行 .NET 应用程序,无需代码签名

    每当从文件共享运行 NET exe 时 NET 安全模型都会引发安全错误 从本地驱动器运行时不会出现该错误 有谁知道无需签署代码即可解决此问题的方法吗 使用 CasPol 完全信任共享 更多答案在这里 为什么我的 NET 应用程序在从网络驱
  • Windows Azure 中的持续集成服务器

    我想在我的项目中使用持续集成系统 但是 我不想使用办公室中的服务器 而是希望我的持续集成服务器在 Windows Azure 上运行 有人设置过这个吗 是否有在 Azure 中托管 Hudson 或 CruiseControl Net 或任
  • 传递给函数时指向结构的指针根本不改变

    stack h define MAX STACK 10 typedef int STACK ITEM typedef struct Stack STACK stack c include stack h struct Stack STACK
  • Firebase 根视图控制器未找到警告

    我今天收到来自 Firebase Analytics 的奇怪警告 这是
  • mysql 具有多个变量的表结构

    我正在编写一个脚本 它将根据不同的条件计算票数并将计数存储在汇总表中 我无法理解如何构建我的表 因为会有超过 1 个变量 该脚本将每周运行一次 每次执行都会添加新的一周 一旦新的月份开始并且脚本执行 就会添加新的月份 一旦新年开始 就会添加
  • 将 r 中的列转换为行[重复]

    这个问题在这里已经有答案了 我使用代码形成了以下数据 test lt data frame dis c 10 20 30 40 dur c 30 40 60 90 method c car car Bicycle Bicycle to lo
  • 我可以为多个 mysql 命令行调用输入一次密码,而查询事先并不知道吗?

    您可以通过将查询放入文件来避免重新输入 mysql 命令行密码 就我而言 直到第一个查询完成后才确定后面的查询 这种情况发生在非交互式脚本中 因此运行 mysql 控制台不是一个选项 mysql 命令行交互有会话的概念吗 或者我可以将其设置
  • 如何按月分组并在特定月份没有值时返回零?

    这是我的mysql收入表 id title description date amount 1 Vehicle sales up From new sale up 2016 09 09 9999 99 2 Jem 2 Sales From
  • to_sql pyodbc count 字段不正确或语法错误

    我正在从 api 网站下载 Json 数据 并使用 sqlalchemy pyodbc 和 pandas 的 to sql 函数将该数据插入到 MSSQL 服务器中 我最多可以下载 10000 行 但是我必须将块大小限制为 10 否则会出现
  • Xcode:如何构建仅横向的 iPhone 程序

    在 Xcode 中 我尝试设计一个仅横向的用户界面 我将 UIViewController 和 UIView 控件都设置为横向模型 但是当我将控件 如按钮 图像 放置在 UIView 上时 当程序运行时 只有放置在左上角区域的控件会响应 看
  • Spark / Scala:用最后的良好观察填充 nan

    我正在使用 Spark 2 0 1 并希望用列中最后一个已知的值填充 nan 值 我能找到的唯一关于火花的参考Spark Scala 使用最后一次观察进行前向填充 or 用 pyspark 之前已知的良好值填充 null似乎使用RDD 我宁