读取多个.gz文件并识别哪一行属于哪个文件

2024-01-31

我正在读取多个 .gz 文件以使用谷歌数据流进行处理。数据的最终目的地是BigQuery。 BigQuery 表对于 .gz 文件内的 csv 文件中的每一列都有专用列。 BQ 表中还有一个附加列 file_name,它给出了该记录所属的文件名。我正在使用 TextIO.Read 读取文件并对其进行 ParDo 转换。在 DoFn 中,有一种方法可以识别传入字符串所属的文件名。

我的代码如下所示:

PCollection<String> logs = pipeline.apply(TextIO.Read.named("ReadLines")
                .from("gcs path").withCompressionType(TextIO.CompressionType.AUTO));

PCollection<TableRow> formattedResults = logs.apply(ParDo.named("Format").of(new DoFn<String, TableRow>() {}

更新1:

我现在正在尝试如下:

        PCollection<String> fileNamesCollection // this is collection of file names
        GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(options.as(GcsOptions.class));
        PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Format").of(new DoFn<String, KV<String,String>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void processElement(ProcessContext c) throws Exception {
                    ReadableByteChannel readChannel = channelFactory.open(c.element());
                    GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
                    BufferedReader br = new BufferedReader(new InputStreamReader(gzip));

                    String line = null;
                    while ((line = br.readLine()) != null) {
                        c.output(KV.of(c.element(), line));
                    }
                }
        }));

但是当我运行这个程序时,我发现 ChannelFactory 不可序列化,我有任何通道工厂正在实现可序列化接口并且可以在此处使用。

更新2:我终于能够执行程序并成功提交作业。感谢 jkff 的帮助。 下面是我的最终代码,我将其粘贴在这里,以便对其他人也有帮助。

        ProcessLogFilesOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(ProcessLogFilesOptions.class); // ProcessLogFilesOptions is a custom class
        DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
        loggingOptions.setDefaultWorkerLogLevel(Level.WARN);

        String jobName = "unique_job_name";
        options.as(BlockingDataflowPipelineOptions.class).setJobName(jobName);

        Pipeline pipeline = Pipeline.create(options);

        List<String> filesToProcess = new ArrayList<String>();
        for(String fileName : fileNameWithoutHrAndSuffix) { // fileNameWithoutHrAndSuffix has elements like Log_20160921,Log_20160922 etc
            filesToProcess.addAll((new GcsIOChannelFactory(options.as(GcsOptions.class))).match(LogDestinationStoragePath+fileName));
        }
        // at this time filesToProcess will have all logs files name as Log_2016092101.gz,Log_2016092102.gz,.........,Log_2016092201.gz,Log_2016092223.gz
        PCollection<String> fileNamesCollection = pipeline.apply(Create.of(filesToProcess));

        PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Parsing_Files").of(new DoFn<String, KV<String,String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void processElement(ProcessContext c) throws Exception {
                    // I have to create _options here because Options, GcsIOChannelFactory are non serializable
                    ProcessLogFilesOptions _options = PipelineOptionsFactory.as(ProcessLogFilesOptions.class);
                    GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(_options.as(GcsOptions.class));
                    ReadableByteChannel readChannel = channelFactory.open(c.element());
                    GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
                    BufferedReader br = new BufferedReader(new InputStreamReader(gzip));

                    String line = null;
                    while ((line = br.readLine()) != null) {
                        c.output(KV.of(c.element(), line));
                    }

                    br.close();
                    gzip.close();
                    readChannel.close();
                }
        }));

        // Performing reshuffling here as suggested
        PCollection <KV<String,String>> withFileName = kv.apply(Reshuffle.<String, String>of());

        PCollection<TableRow> formattedResults = withFileName
                .apply(ParDo.named("Generating_TableRow").of(new DoFn<KV<String,String>, TableRow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                    KV<String,String> kv = c.element();
                    String logLine = kv.getValue();
                    String logFileName = kv.getKey();

                    // do further processing as you want here
        }));

        // Finally insert in BQ table the formattedResults

目前来看,答案是否定的。不幸的是,如果您需要访问文件名,在这种情况下您最好的选择是自己实现文件模式扩展和文件解析(作为ParDo)。您需要记住以下几点:

  • 确保插入一个重新分配 https://github.com/apache/incubator-beam/pull/1036就在解析之前ParDo, to 防止过度融合 https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion.
  • 您可以使用GcsIoChannelFactory扩展文件模式(参见示例这个问题 https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow)并打开一个ReadableByteChannel. Use Channels.newInputStream https://docs.oracle.com/javase/7/docs/api/java/nio/channels/Channels.html#newInputStream(java.nio.channels.ReadableByteChannel)创建一个InputStream,然后将其包装成Java的标准GZipInputStream并逐行阅读 - 请参阅这个问题 https://stackoverflow.com/questions/1080381/gzipinputstream-reading-line-by-line举些例子。记得关闭流 https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html.

或者,您可以考虑编写自己的基于文件的源 https://cloud.google.com/dataflow/model/custom-io-java#convenience-source-and-reader-base-classes。但是,在这种特殊情况(.gz 文件)中,我建议不要使用它,因为该 API 主要用于可以从任何偏移量随机访问的文件。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

读取多个.gz文件并识别哪一行属于哪个文件 的相关文章

随机推荐

  • 替代 mongoDB 3.0[之前版本]中的 $strLenCP 字段

    我目前使用的是 mongo 3 0v 我需要找到聚合命令结果中每个字符串的长度 例如 db getCollection temp find key value1 key value2 key valuee2 此查询给出关键字段的长度 db
  • Python 错误 - TypeError:输入最多需要 1 个参数,得到 3 个 [重复]

    这个问题在这里已经有答案了 有人可以解释为什么我不能在目标变量中使用 your name 吗 my name Bryson my age 29 your name input What is your name your age input
  • mySQL - 使用 mysqli 应用行级锁

    使用 PHP 的 mysqli 如何应用行级锁 行级锁会阻止任何人编辑当前存在的符合您条件的行 对吗 但是他们会阻止用户插入符合您条件的行吗 Thanks 如果您想锁定特定行以防止编辑 请使用FOR UPDATE在 SELECT 查询的末尾
  • 如何用一个数组实现3个栈?

    有时 我会遇到以下面试问题 如何用一个数组实现3个堆栈 当然 任何静态分配都不是解决方案 空间 而非时间 高效 你可以 1 定义两个堆栈 从数组端点开始并沿相反方向增长 2 将第三个堆栈定义为从中间开始并向您想要的任何方向增长 3 重新定义
  • Kotlin:我们可以在Kotlin中使用EventBus(GreenRobot)的@Subscribe吗?

    我的 onEvent 在如下片段中 在我的 Kotlin 函数中捕获活动的身份验证 但是 我无法触发 onEvent Subscribe fun onEvent event AuthenticationEvent if event isAu
  • Object.GetHashCode() 的实现

    我正在阅读有效的 C https rads stackoverflow com amzn click com 0321658701并且有一条评论关于Object GetHashCode 我不明白 Object GetHashCode 使用内
  • 帆升起时出现咕噜声错误

    我在帆升降机上遇到此错误 风帆版本 v0 10 0 rc11 error Grunt module js 340 throw err Error Cannot find module home mandeep freelance hello
  • Android 联系信息更新是否有意?

    我可以通过使用 putExtra 将信息作为额外数据传递来使用 Intent 创建新联系人 是否可以使用信息创建 Intent 如果联系人已在电话簿中 它将使用新信息进行更新 实际上 您可以使用意图通过 ContactsContract 创
  • MVC 3 客户端比较验证

    这里发生了一些奇怪的事情 我有一个基本形式 br
  • WPF 自动调整元素大小

    当应用程序窗口调整大小时 我希望其中的元素也按比例调整大小 那可能吗 我尝试谷歌搜索 但找不到任何与此相关的内容 我的 XAML 代码
  • 使用 NodeJS 解包 PKCS#7 数据有效负载?

    我正在开发适用于 iOS 的 MDM NodeJS 服务器 在 Apple 文档中 给出了以下 ruby 代码 p7sign OpenSSL PKCS7 PKCS7 new req body store OpenSSL X509 Store
  • 国际象棋编程(无人工智能)——动作验证

    我正在尝试编写自己的国际象棋引擎 没有人工智能 我知道有国际象棋游戏入门套件 http www chessbin com page Chess Game Starer Kit aspx我观看它是为了获得灵感 但我没有注意到的是经过验证的动作
  • 如何在 docker-compose 中将主机网络与默认网络结合起来[重复]

    这个问题在这里已经有答案了 我正在构建包含两个容器的 docker compose 服务 这些容器之一 node 旨在支持自动发现机制 并且需要成为主机 LAN 的一部分 因为我需要由 LAN 路由器而不是内置的 docker 路由器处理多
  • 部署 Angular 项目错误类型 MIME (text/html)

    我尝试部署 Angular CLI 6 12 0 项目 当我将 dist 文件夹内容放在服务器上时 出现控制台错误 键入 MIME Le chargement du module l adresse http www sylvainalla
  • C# 中的多个 HTTP 请求

    我需要向不同的服务器并行发送大约 200 个 HTTP 请求并获得响应 我在 C 中使用 HttpWebRequest 类 但是 当并行处理请求时 我没有看到很好的时间改进 例如 如果一个请求需要 3 秒才能获得响应 则并行 2 个请求 6
  • 在 GithubActions CI 中安装用于 UWP 开发的 SDK?

    我正在尝试构建一些 UWP 库 但收到此错误 D a ZXing Net Xamarin ZXing Net Xamarin Source ZXing Net Mobile WindowsUniversal ZXing Net Mobile
  • matplotlib:图例标题的对齐

    在matplotlib中 如何调整图例标题 它始终居中 但我需要它与图例框左对齐 我尝试更改标题的文本艺术家的对齐方式 但没有效果 详细信息请参见以下示例 from pylab import x linspace 0 1 plot x x
  • Flutter:如何检查字符串中的字母是否相同[重复]

    这个问题在这里已经有答案了 我正在制作一个表单 供用户通过输入卖家的帐户名称和帐户类型来添加新卖家 但在将新卖家添加到firestore之前 它会检查该卖家是否存在 现在我的代码仅检查输入的名称是否与数据库中的名称非常相似 例如 数据库中有
  • Javafx Tile Pane,设置最大列数

    首先 我将解释我的目标 我想渲染一个像这样的表格 每个单元格的内容在执行时间中确定 但其大小固定为 13x13 因此 我的方法是创建一个平铺窗格 将列数设置为 13 并创建单元格 pane new TilePane pane setPadd
  • 读取多个.gz文件并识别哪一行属于哪个文件

    我正在读取多个 gz 文件以使用谷歌数据流进行处理 数据的最终目的地是BigQuery BigQuery 表对于 gz 文件内的 csv 文件中的每一列都有专用列 BQ 表中还有一个附加列 file name 它给出了该记录所属的文件名 我