flink 的文档中提到了两种类型的文件压缩器。
OutputStreamBasedFileCompactor :用户可以将压缩结果写入输出流。当用户不想或无法从输入文件中读取记录时,这非常有用。
RecordWiseFileCompactor :压缩器可以像 FileWriter 一样从输入文件中逐条读取记录并写入结果文件。
如果我没记错的话,Parquet 将元信息保存在文件末尾。显然我们需要使用RecordWiseFileCompactor。因为我们需要读取整个 Parquet 文件,以便我们可以获取文件末尾的元信息。然后我们可以使用元信息(行组数、模式)来解析文件。
来自java api https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java//org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html#RecordWiseFileCompactor-org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor.Reader.Factory-,要构造一个 RecordWiseFileCompactor,我们需要一个 RecordWiseFileCompactor.Reader.Factory 的实例。
RecordWiseFileCompactor.Reader.Factory接口有两个实现,分别是DecoderBasedReader.Factory和InputFormatBasedReader.Factory。
DecoderBasedReader.Factory 创建一个 DecoderBasedReader 实例,它从 InputStream 读取整个文件内容。我们可以将字节加载到缓冲区中并从字节缓冲区中解析文件,这显然是痛苦的。所以我们不使用这个实现。
InputFormatBasedReader.Factory 创建一个 InputFormatBasedReader,它使用以下方法读取整个文件内容文件输入格式 https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java//org/apache/flink/api/common/io/FileInputFormat.html我们传递给 InputFormatBasedReader.Factory 构造函数的供应商。
InputFormatBasedReader 实例使用 FileInputFormat 来逐条读取记录 https://github.com/apache/flink/blob/aec2d38710a67d90bd819bfdce66b5a5a646a882/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/InputFormatBasedReader.java#L50,并将记录传递给我们传递给 forBulkFormat 调用的 writer,直到文件末尾。
作者收到所有记录并将记录压缩到一个文件中 https://github.com/apache/flink/blob/8488368b86a99a064446ca74e775b67ffff0b94a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java#L40.
那么问题就变成了什么是FileInputFormat以及如何实现它。
尽管FileInputFormat类有很多方法和字段,但从上述InputFormatBasedReader源代码中我们知道,只有四个方法是从InputFormatBasedReader中调用的。
- open(FileInputSplit fileSplit),打开文件
- reachEnd(),它检查我们是否到达文件末尾
- nextRecord(),从打开的文件中读取下一条记录
- close(),清理站点
幸运的是,我们可以使用 org.apache.parquet.avro 包中的 AvroParquetReader。它已经实现了打开/读取/关闭。因此,我们可以将读取器包装在 FileInputFormat 中,并使用 AvroParquetReader 来完成所有脏活。
这是一个示例代码片段
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import java.io.IOException;
public class ExampleFileInputFormat extends FileInputFormat<GenericRecord> {
private ParquetReader<GenericRecord> parquetReader;
private GenericRecord readRecord;
@Override
public void open(FileInputSplit split) throws IOException {
Configuration config = new Configuration();
// set hadoop config here
// for example, if you are using gcs, set fs.gs.impl here
// i haven't tried to use core-site.xml but i believe this is feasible
InputFile inputFile = HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), config);
parquetReader = AvroParquetReader.<GenericRecord>builder(inputFile).build();
readRecord = parquetReader.read();
}
@Override
public void close() throws IOException {
parquetReader.close();
}
@Override
public boolean reachedEnd() throws IOException {
return readRecord == null;
}
@Override
public GenericRecord nextRecord(GenericRecord genericRecord) throws IOException {
GenericRecord r = readRecord;
readRecord = parquetReader.read();
return r;
}
}
然后您可以使用示例 FileInputFormat 如下
FileSink<GenericRecord> sink = FileSink.forBulkFormat(
new Path(path),
AvroParquetWriters.forGenericRecord(schema))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.enableCompact(
FileCompactStrategy.Builder.newBuilder()
.enableCompactionOnCheckpoint(10)
.build(),
new RecordWiseFileCompactor<>(
new InputFormatBasedReader.Factory<>(new SerializableSupplierWithException<FileInputFormat<GenericRecord>, IOException>() {
@Override
public FileInputFormat<GenericRecord> get() throws IOException {
FileInputFormat<GenericRecord> format = new ExampleFileInputFormat();
return format;
}
})
))
.build();
我已成功将其部署到 k8s 上的 flink 和 gcs 上的压缩文件。有一些部署注意事项。
- 您需要从以下位置下载 flink 着色的 hadoop jarhttps://flink.apache.org/downloads.html https://flink.apache.org/downloads.html(在网页中搜索 Pre-bundled Hadoop)并将 jar 放入 $FLINK_HOME/lib/
- 如果要将文件写入某些对象存储(例如 gcs),则需要遵循插件说明 https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/。请记住将插件 jar 放入插件文件夹中,而不是放入 lib 文件夹中。
- 如果您要将文件写入某些对象存储,则需要从云服务供应商下载连接器jar。例如,我正在使用 gcs 并下载 gcs-connector jar 以下GCP指令 https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage。将 jar 放入 $FLINK_HOME/lib 或 $FLINK_HOME/plugins 之外的某个文件夹中。我将连接器 jar 放入新建的文件夹 $FLINK_HOME/hadoop-lib 中
- 设置环境 HADOOP_CLASSPATH=$FLINK_HOME/lib/YOUR_SHADED_HADOOP_JAR:$FLINK_HOME/hadoop-lib/YOUR_CONNECTOR_JAR
完成所有这些步骤后,您就可以开始工作了。