Spark:强制读取模式时 Parquet DataFrame 操作失败

2024-03-17

(火花2.0.2)

当您拥有具有不同架构的镶木地板文件并在读取期间强制使用该架构时,就会出现此问题。即使您可以打印架构并运行show()好的,您无法对缺失的列应用任何过滤逻辑。

以下是两个示例架构:

// assuming you are running this code in a spark REPL
import spark.implicits._

case class Foo(i: Int)
case class Bar(i: Int, j: Int) 

So Bar包括所有领域Foo并添加一个(j)。在现实生活中,当您从模式开始时就会出现这种情况Foo后来决定您需要更多字段并最终得到模式Bar.

让我们模拟两个不同的镶木地板文件。

// assuming you are on a Mac or Linux OS
spark.createDataFrame(Foo(1)::Nil).write.parquet("/tmp/foo")
spark.createDataFrame(Bar(1,2)::Nil).write.parquet("/tmp/bar")

我们想要的是始终使用更通用的模式读取数据Bar。也就是说,在模式上写入的行Foo应该有j为空。

案例 1:我们混合读取两种模式

spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").show()
+---+----+
|  i|   j|
+---+----+
|  1|   2|
|  1|null|
+---+----+


spark.read.option("mergeSchema", "true").parquet("/tmp/foo", "/tmp/bar").filter($"j".isNotNull).show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

情况 2:我们只有 Bar 数据

spark.read.parquet("/tmp/bar").show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

情况 3:我们只有 Foo 数据

scala> spark.read.parquet("/tmp/foo").show()
+---+
|  i|
+---+
|  1|
+---+

有问题的情况是 3,我们生成的模式是类型Foo而不是Bar。由于我们迁移到架构Bar,我们希望始终获得模式Bar根据我们的数据(旧的和新的)。

建议的解决方案是以编程方式定义模式以始终Bar。让我们看看如何做到这一点:

val barSchema = org.apache.spark.sql.Encoders.product[Bar].schema
//barSchema: org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false), StructField(j,IntegerType,false)) 

运行 show() 效果很好:

scala> spark.read.schema(barSchema).parquet("/tmp/foo").show()
+---+----+
|  i|   j|
+---+----+
|  1|null|
+---+----+

但是,如果您尝试过滤缺失的列 j,则会失败。

scala> spark.read.schema(barSchema).parquet("/tmp/foo").filter($"j".isNotNull).show()
17/09/07 18:13:50 ERROR Executor: Exception in task 0.0 in stage 230.0 (TID 481)
java.lang.IllegalArgumentException: Column [j] was not found in schema!
    at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:181)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:169)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:151)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:91)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:58)
    at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:63)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
    at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:381)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:355)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:168)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

问题是由于 parquet 过滤器下推造成的,在 parquet-mr 版本

你可以检查https://issues.apache.org/jira/browse/PARQUET-389 https://issues.apache.org/jira/browse/PARQUET-389更多细节。

您可以升级 parquet-mr 版本或添加新列并基于新列进行过滤。

For eg.

dfNew = df.withColumn("new_j", when($"j".isNotNull, $"j").otherwise(lit(null))) dfNew.filter($"new_j".isNotNull)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:强制读取模式时 Parquet DataFrame 操作失败 的相关文章

随机推荐

  • 为什么这个简单的连接查询使用子查询会明显更快?

    我有两张桌子 order details这是 100 000 行 并且outbound即 10 000 行 我需要加入他们的专栏order number 两者都是 VARCHAR 50 order number 在出站表中不唯一 CREAT
  • 如何根据返回值执行不同的查询?

    我有一个足球比赛列表 定义如下 id datetime status gameweek round id home team id 1 2019 03 31 00 00 00 1 29 12696 1243 2 2019 03 31 00
  • 如何在Python中获取所有直接子目录

    我正在尝试编写一个简单的Python脚本 它将所有子目录中的index tpl复制到index html 有一些例外 我因尝试获取子目录列表而陷入困境 import os def get immediate subdirectories a
  • 在两个不同的头文件中的两个结构中包含循环依赖项是否错误?

    我有一个非常大的程序无法编译 我怀疑它与跨结构的循环依赖关系有关 当我像下面这样编码时 它不会编译 foo h ifndef FOO define FOO include bar h typedef struct foo Foo struc
  • 创建模型时用 laravel 返回模型

    我需要将保存为 json 的新模型发送到前面 但我看不到响应中的列organizationid 这是我的模型 class Organization extends Model protected table core organizatio
  • 强制纵向模式

    好吧 由于没有人回答我之前的问题 我开始相信可能没有简单的方法可以做到这一点 但我很乐观 这是我的问题 在我的应用程序中 我使用常规 UIButton 从 ViewControllerOne 切换到 ViewControllerTwo Vi
  • 用于动态创建元素的 jQuery CSS()

    我正在使用 jQuery CSS 函数来设置一些元素的样式 element css style 这是可行的 但部分元素是在页面加载后动态创建的 这应该是 element live created function this css styl
  • Code First 一对多关系的多个外键

    我在使用 Entity Framework 6 Code First Fluent API 时遇到了一些不符合惯例的问题 一个典型的例子是我有一个名为软件的实体 我不希望数据库表被称为 Softwares 它应该被称为软件 但也有一些其他的
  • 将 ${my.property} 计算为 @Value 注释中的 SpEL 表达式

    长话短说 有没有办法解释由以下结果产生的字符串 my property 作为一个 SpEL 表达式 Value不使用转换器的注释 例如就像是 Value my property 我有一个抽象工厂 简化的 可以让我构建一些公共对象 这些对象是
  • 如何在Android即时应用程序中存储数据并在已安装的应用程序中恢复它

    据谷歌称 开发即时应用程序受到迷药 最佳实践 is 安装应用程序后保留用户状态 https developer android com topic instant apps ux best practices html keep user
  • 在 PowerShell 中通过引用传递/更新哈希表和数组

    当我了解 Perl 和 PowerShell 的不同之处时 我试图通过引用确定传递列表和更新列表之间的差异 我think我现在明白了 PowerShell 方面的 通过引用传递哈希表 调用函数时 不需要在表名前添加 ref 在函数内 表的名
  • 有什么方法可以获取模块中定义的函数列表?

    是否有任何内省的魔法可以给我一个模块中定义的函数列表 module Foo function foo foo end function bar bar end end 一些神话般的功能 例如 functions in Foo 这将返回 fo
  • 仅查看当前用户对象的列表,Django REST

    我有 2 个视图 notes 和 notes 在注释模型中models py我有所有者变量 用于存储所有者的登录信息 因为我想要有很多用户 我不想让他们看到别人的笔记 所以我创建了权限 class IsOwner permissions B
  • 删除 python 列表中的重复项但记住索引

    如何删除列表中的重复项 保留项目的原始顺序并记住列表中任何项目的第一个索引 例如 删除重复项 1 1 2 3 yields 1 2 3 但我需要记住索引 0 2 3 我正在使用Python 2 7 我会以不同的方式解决这个问题并使用Orde
  • 获取调用C#方法的实例

    我正在寻找一种算法 可以在该方法中获取调用该方法的对象 例如 public class Class1 public void Method the question object a the object that called the m
  • SVN与外部通用代码

    目前 我正在尝试使用相同的代码为多个产品设置存储库 最好的解决方案是创建共享代码的真正库并以这种方式使用它们 然而 目前这需要很长时间 这个想法是拥有一个具有以下树的单个存储库 trunk Project1 Project2 Shared
  • 在android中比较两个日期是否在同一周内

    我有两个约会 他们是从 Calendar c Calendar getInstance year c get c YEAR month c get c MONTH month date c get c DATE 其他数据分为日期 月份 2
  • 在 Curl 请求中禁用 Javascript (PHP)

    有没有办法在 PHP 的 Curl 请求中禁用 Javascript 尝试模仿浏览器请求 来自禁用 Javascript 的浏览器 这可以通过标头 用户代理 cookie 来完成吗 Thanks 有没有办法在 PHP 的 Curl 请求中禁
  • 在php中将tiff转换为jpg?

    我有一台保存 TIFF 图像的服务器 大多数客户端都可以读取和显示 TIFF 图像 因此没有问题 但是 某些客户端无法处理此格式 但可以处理 JPG 我想到使用 PHP 的 GD 库为没有 TIFF 读取能力的客户端进行服务器端转换 但我注
  • Spark:强制读取模式时 Parquet DataFrame 操作失败

    火花2 0 2 当您拥有具有不同架构的镶木地板文件并在读取期间强制使用该架构时 就会出现此问题 即使您可以打印架构并运行show 好的 您无法对缺失的列应用任何过滤逻辑 以下是两个示例架构 assuming you are running