对 Parquet 批量格式使用压缩

2024-05-08

从 Apache Flink 1.15 版本开始,您可以使用压缩功能将多个文件合并为一个。https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#compaction

我们如何使用批量 Parquet 格式进行压缩? RecordWiseFileCompactor.Reader 的现有实现(DecoderBasedReader 和 ImputFormatBasedReader)似乎不适合 Parquet。

此外,我们找不到任何压缩 Parquet 或其他批量格式的示例。


flink 的文档中提到了两种类型的文件压缩器。

OutputStreamBasedFileCompactor :用户可以将压缩结果写入输出流。当用户不想或无法从输入文件中读取记录时,这非常有用。

RecordWiseFileCompactor :压缩器可以像 FileWriter 一样从输入文件中逐条读取记录并写入结果文件。

如果我没记错的话,Parquet 将元信息保存在文件末尾。显然我们需要使用RecordWiseFileCompactor。因为我们需要读取整个 Parquet 文件,以便我们可以获取文件末尾的元信息。然后我们可以使用元信息(行组数、模式)来解析文件。

来自java api https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java//org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html#RecordWiseFileCompactor-org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor.Reader.Factory-,要构造一个 RecordWiseFileCompactor,我们需要一个 RecordWiseFileCompactor.Reader.Factory 的实例。

RecordWiseFileCompactor.Reader.Factory接口有两个实现,分别是DecoderBasedReader.Factory和InputFormatBasedReader.Factory。

DecoderBasedReader.Factory 创建一个 DecoderBasedReader 实例,它从 InputStream 读取整个文件内容。我们可以将字节加载到缓冲区中并从字节缓冲区中解析文件,这显然是痛苦的。所以我们不使用这个实现。

InputFormatBasedReader.Factory 创建一个 InputFormatBasedReader,它使用以下方法读取整个文件内容文件输入格式 https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java//org/apache/flink/api/common/io/FileInputFormat.html我们传递给 InputFormatBasedReader.Factory 构造函数的供应商。

InputFormatBasedReader 实例使用 FileInputFormat 来逐条读取记录 https://github.com/apache/flink/blob/aec2d38710a67d90bd819bfdce66b5a5a646a882/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/InputFormatBasedReader.java#L50,并将记录传递给我们传递给 forBulkFormat 调用的 writer,直到文件末尾。

作者收到所有记录并将记录压缩到一个文件中 https://github.com/apache/flink/blob/8488368b86a99a064446ca74e775b67ffff0b94a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java#L40.

那么问题就变成了什么是FileInputFormat以及如何实现它。

尽管FileInputFormat类有很多方法和字段,但从上述InputFormatBasedReader源代码中我们知道,只有四个方法是从InputFormatBasedReader中调用的。

  • open(FileInputSplit fileSplit),打开文件
  • reachEnd(),它检查我们是否到达文件末尾
  • nextRecord(),从打开的文件中读取下一条记录
  • close(),清理站点

幸运的是,我们可以使用 org.apache.parquet.avro 包中的 AvroParquetReader。它已经实现了打开/读取/关闭。因此,我们可以将读取器包装在 FileInputFormat 中,并使用 AvroParquetReader 来完成所有脏活。

这是一个示例代码片段

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

import java.io.IOException;

public class ExampleFileInputFormat extends FileInputFormat<GenericRecord> {

    private ParquetReader<GenericRecord> parquetReader;
    private GenericRecord readRecord;


    @Override
    public void open(FileInputSplit split) throws IOException {
        Configuration config = new Configuration();
        // set hadoop config here
        // for example, if you are using gcs, set fs.gs.impl here
        // i haven't tried to use core-site.xml but i believe this is feasible
        InputFile inputFile = HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), config);
        parquetReader = AvroParquetReader.<GenericRecord>builder(inputFile).build();
        readRecord = parquetReader.read();
    }

    @Override
    public void close() throws IOException {
        parquetReader.close();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return readRecord == null;
    }

    @Override
    public GenericRecord nextRecord(GenericRecord genericRecord) throws IOException {
        GenericRecord r = readRecord;
        readRecord = parquetReader.read();
        return r;
    }
}

然后您可以使用示例 FileInputFormat 如下

FileSink<GenericRecord> sink = FileSink.forBulkFormat(
                new Path(path),
                AvroParquetWriters.forGenericRecord(schema))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .enableCompact(
                FileCompactStrategy.Builder.newBuilder()
                        .enableCompactionOnCheckpoint(10)
                        .build(),
                new RecordWiseFileCompactor<>(
                        new InputFormatBasedReader.Factory<>(new SerializableSupplierWithException<FileInputFormat<GenericRecord>, IOException>() {
                            @Override
                            public FileInputFormat<GenericRecord> get() throws IOException {
                                FileInputFormat<GenericRecord> format = new ExampleFileInputFormat();
                                return format;
                            }
                        })
                ))
        .build();

我已成功将其部署到 k8s 上的 flink 和 gcs 上的压缩文件。有一些部署注意事项。

  • 您需要从以下位置下载 flink 着色的 hadoop jarhttps://flink.apache.org/downloads.html https://flink.apache.org/downloads.html(在网页中搜索 Pre-bundled Hadoop)并将 jar 放入 $FLINK_HOME/lib/
  • 如果要将文件写入某些对象存储(例如 gcs),则需要遵循插件说明 https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/。请记住将插件 jar 放入插件文件夹中,而不是放入 lib 文件夹中。
  • 如果您要将文件写入某些对象存储,则需要从云服务供应商下载连接器jar。例如,我正在使用 gcs 并下载 gcs-connector jar 以下GCP指令 https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage。将 jar 放入 $FLINK_HOME/lib 或 $FLINK_HOME/plugins 之外的某个文件夹中。我将连接器 jar 放入新建的文件夹 $FLINK_HOME/hadoop-lib 中
  • 设置环境 HADOOP_CLASSPATH=$FLINK_HOME/lib/YOUR_SHADED_HADOOP_JAR:$FLINK_HOME/hadoop-lib/YOUR_CONNECTOR_JAR

完成所有这些步骤后,您就可以开始工作了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

对 Parquet 批量格式使用压缩 的相关文章

  • Flink 模式演化不适用于 POJO 类

    我有一个类满足被视为 POJO 的要求 这是我的流媒体工作中的主要传输类 它只包含原语和Map
  • 使用 Flink LocalEnvironment 进行生产

    我想了解本地执行环境的局限性以及它是否可以用于在生产中运行 感谢任何帮助 见解 谢谢 LocalExecutionEnvironment 启动一个 Flink MiniCluster 它在单个 JVM 中运行整个 Flink 系统 JobM
  • 处理时间窗口不适用于 Apache Flink 中的有限数据源

    我正在尝试将一个非常简单的窗口函数应用于 Apache Flink 中的有限数据流 本地 无集群 这是例子 val env StreamExecutionEnvironment getExecutionEnvironment env fro
  • 从 PySpark 中的 s3 子目录读取数据

    我想从 S3 存储桶中读取所有 parquet 文件 包括子目录中的所有文件 这些实际上是前缀 在 S3 URL 中使用通配符 仅适用于指定文件夹中的文件 例如 使用此代码将仅读取下面的镶木地板文件target folder df spar
  • Flink时间特性和AutoWatermarkInterval

    在 Apache Flink 中 setAutoWatermarkInterval interval 向下游操作员生成水印 以便他们提前事件时间 如果水印在指定的时间间隔内没有更改 没有事件到达 运行时将不会发出任何水印 另一方面 如果在下
  • 多个 Spark 作业通过分区将镶木地板数据附加到同一基本路径

    我有多个作业想要并行执行 这些作业使用分区将每日数据附加到同一路径中 e g dataFrame write partitionBy eventDate category mode Append parquet s3 bucket save
  • 从结构数组中选择 Spark DataFrames 中的特定列

    我有一个 Spark 数据框df具有以下架构 root k integer nullable false v array nullable true element struct containsNull true a integer nu
  • Flink 的简单 hello world 示例

    我正在寻找 Apache flink 的 hello world 体验的最简单的示例 假设我刚刚在一个干净的盒子上安装了 flink 那么为了 让它做某事 我需要做的最低限度是什么 我意识到这很模糊 这里有一些例子 来自终端的三个 pyth
  • Flink 检查点到 Google Cloud Storage

    我正在尝试为 GCS 中的 flink 作业配置检查点 如果我在本地运行测试作业 没有 docker 和任何集群设置 一切正常 但如果我使用 docker compose 或集群设置运行它并在 flink 仪表板中使用作业部署 fat ja
  • 使用复制命令和清单文件将 parquet 格式文件加载到 Amazon Redshift 时出错

    我正在尝试使用清单文件加载镶木地板文件并出现以下错误 查询 124138由于内部错误而失败 文件 https s3 amazonaws com sbredshift east data 000002 0 https s3 amazonaws
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • 如何将 500GB SQL 表转换为 Apache Parquet?

    也许这是有详细记录的 但我很困惑如何做到这一点 有很多 Apache 工具 当我创建 SQL 表时 我使用以下命令创建表 CREATE TABLE table name column1 datatype column2 datatype c
  • Flink Logging 获取作业名称或作业 ID

    我正在尝试设置 logback xml 以便它将包含与日志记录关联的 JobName 或 JobId 我还没有找到一种方法来做到这一点 是否可以 最终我想要实现的是能够将日志发送到 ElasticSearch 并用消息标记 JobName
  • Apache Flink 上的 zipWithIndex

    我想为我的输入的每一行分配一个id 这应该是一个数字0 to N 1 where N是输入中的行数 粗略地说 我希望能够执行以下操作 val data sc textFile textFilePath numPartitions val r
  • Flink 使用 Ceph 作为持久存储

    Flink 文档建议 Ceph 可以用作状态的持久存储 https ci apache org projects flink flink docs release 1 3 dev stream checkpointing html http
  • 创建具有通用返回类型的 FlinkSQL UDF

    我想定义函数MAX BY接受类型值T和类型的订购参数Number并根据排序从窗口返回最大元素 类型为T 我试过了 public class MaxBy
  • 如何正确处理自定义MapFunction中的错误?

    我已经实施了MapFunction对于我的 Apache Flink 流程 它正在解析传入元素并将其转换为其他格式 但有时会出现错误 即传入数据无效 我看到两种可能的处理方法 忽略无效元素 但似乎我无法忽略错误 因为对于任何传入元素 我必须
  • 我可以将 flink RocksDB 状态后端与本地文件系统一起使用吗?

    我正在探索使用 FlinkrocksDb 状态后端 文档似乎暗示我可以使用常规文件系统 例如 file data flink checkpoints 但代码 javadoc 仅在此处提到 hdfs 或 s3 选项 我想知道是否可以将本地文件
  • Apache Flink - “keyBy”中的异常处理

    由于代码错误或缺乏验证 进入 Flink 作业的数据可能会触发异常 我的目标是提供一致的异常处理方式 我们的团队可以在 Flink 作业中使用这种方式 而不会导致生产中出现任何停机 重启策略似乎不适用于此处 因为 简单的重启无法解决问题 我
  • Flink中为什么DataStream不支持聚合

    我是 Flink 的新手 有时 我想在 DataStream 上进行聚合 而不需要先执行 keyBy 为什么 Flink 不支持 DataStream 上的聚合 sum min max 等 谢谢你 艾哈迈德 Flink 支持非 keyed

随机推荐