窗口后 Apache-beam 挂在 groupbykey 上 - 不触发

2024-04-11

TLDR;

如何使用 python SDK 更正触发计数窗口?

Problem

我正在尝试创建一个用于转换和索引维基百科转储的管道。 目标是:

  1. 从压缩文件中读取 - 只需一个进程并以流式传输方式,因为该文件不适合 RAM
  2. 并行处理每个元素 (ParDo)
  3. 将这些元素分组到一个进程中的计数窗口中(GroupBy 仅在一个键中进行流式处理 -> 批处理),以将它们保存在数据库中。

发展

为此,我创建了一个简单的源类,它返回以下形式的元组(索引、数据、计数):

class CountingSource(beam.io.filebasedsource.FileBasedSource):
    def read_records(self, file_name, offset_range_tracker):
        # timestamp = datetime.now()
        k = 0
        with gzip.open(file_name, "rt", encoding="utf-8", errors="strict") as f:
            line = f.readline()
            while line:
                # Structure: index, page, index, page,...
                line = f.readline()
                yield line, f.readline(), k
                k += 1

我制作了管道:


_beam_pipeline_args = [
    "--runner=DirectRunner",
    "--streaming",
    # "--direct_num_workers=5",
    # "--direct_running_mode=multi_processing",
]


with beam.Pipeline(options=PipelineOptions(_beam_pipeline_args)) as pipeline:
    pipeline = (
        pipeline
        | "Read dump" >> beam.io.Read(CountingSource(dump_path))
        | "With timestamps" >> beam.Map(lambda data: beam.window.TimestampedValue(data, data[-1]))
        | "Drop timestamp" >> beam.Map(lambda data: (data[0], data[1]))
        | "Process element" >> beam.ParDo(ProcessPage())
        | "Filter nones" >> beam.Filter(lambda data: data != [])
        # * not working, keep stuck at group - not triggering the window
        | "window"
        >> beam.WindowInto(
            beam.window.GlobalWindows(),
            trigger=beam.transforms.trigger.Repeatedly(beam.transforms.trigger.AfterCount(10)),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
        )
        | "Map to tuple" >> beam.Map(lambda data: (None, data))
        # | "Print" >> beam.Map(lambda data: print(data))
        | "Group all per window" >> beam.GroupByKey()
        | "Discard key" >> beam.Values()
        | "Index data" >> beam.Map(index_data)
    )

如果我删除窗口并直接从“Filter nones”传递到“Index data”,管道会工作,但会单独索引元素。另外,如果取消注释打印步骤,我可以看到在“映射到元组”步骤之后我仍然有数据,但它挂在“每个窗口全部分组”上,没有任何日志。我也尝试过定时触发,将窗口更改为

        >> beam.WindowInto(
            beam.window.FixedWindows(10))

但这没有改变任何东西(这应该与我在数据提取上创建“计数时间戳”相同)。 我理解窗口有问题吗?目标是批量索引数据。

选择

我可以使用自定义 do.Fn 来“破解”最后一步,例如:

class BatchIndexing(beam.DoFn):
    def __init__(self, connection_string, batch_size=50000):
        self._connection_string = connection_string
        self._batch_size = batch_size
        self._total = 0

    def setup(self):
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker
        from scripts.wikipedia.wikipedia_articles.beam_module.documents import Base

        engine = create_engine(self._connection_string, echo=False)
        self.session = sessionmaker(bind=engine)(autocommit=False, autoflush=False)
        Base.metadata.create_all(engine)

    def start_bundle(self):
        # buffer for string of lines
        self._lines = []

    def process(self, element):
        # Input element is the processed pair
        self._lines.append(element)
        if len(self._lines) >= self._batch_size:
            self._total += len(self._lines)
            self._flush_batch()

    def finish_bundle(self):
        # takes care of the unflushed buffer before finishing
        if self._lines:
            self._flush_batch()

    def _flush_batch(self):
        self.index_data(self._lines)
        # Clear the buffer.
        self._lines = []

    def index_data(self, entries_to_index):
        """
        Index batch of data.
        """
        print(f"Indexed {self._total} entries")
        self.session.add_all(entries_to_index)
        self.session.commit()


并将管道更改为:

with beam.Pipeline(options=PipelineOptions(_beam_pipeline_args)) as pipeline:
    pipeline = (
        pipeline
        | "Read dump" >> beam.io.Read(CountingSource(dump_path))
        | "Drop timestamp" >> beam.Map(lambda data: (data[0], data[1]))
        | "Process element" >> beam.ParDo(ProcessPage())
        | "Filter nones" >> beam.Filter(lambda data: data != [])
        | "Unroll" >> beam.FlatMap(lambda data: data)
        | "Index data" >> beam.ParDo(BatchIndexing(connection_string, batch_size=10000))
    )

哪个“有效”,但并行执行最后一步(因此,压倒数据库或使用 sqlite 生成锁定数据库问题),并且我希望只有一个接收器与数据库通信。


在 Beam 中触发并不是硬性要求。我的猜测是触发器在输入结束之前无法触发。 10 个元素的早期触发意味着跑步者可以在 10 个元素之后触发,但不是必须的(与 Beam 如何将输入拆分为bundles https://beam.apache.org/documentation/runtime/model/).

The FixedWindows(10)固定为 10 秒间隔,并且您的数据都将具有相同的时间戳,因此这也没有帮助。

如果您的目标是将数据分组为批次,则有一个非常方便的转换:分组 https://beam.apache.org/documentation/transforms/python/aggregation/groupintobatches/,它应该适用于该用例,并且具有其他功能,例如限制记录在处理之前可以在批处理中等待的时间。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

窗口后 Apache-beam 挂在 groupbykey 上 - 不触发 的相关文章

  • Pandas - 按每个可能的键组合聚合

    我有一个 DataFrame Pandas 我想通过 A B C 和 D 列的组合尽可能按数据进行分组 假设它具有以下形式 A B C D E F G 0 Y X Y Z 1 2 7 1 Y X Y Z 3 4 8 2 X Y U V 1
  • 默认可变参数的惯用方式

    在 python 中 如果直接将可变类型设置为默认参数 则会出现众所周知的边缘情况 def foo x return x y foo y append 1 print foo 通常的解决方法是将参数默认为None然后将其放入体内 然而 有
  • 使用 PyQt5 拖放 QLabels

    我正在尝试使用 PyQt5 将 Qlabel 拖放到另一个 Qlabel 上 from PyQt5 QtWidgets import QApplication QWidget QToolTip QPushButton QMessageBox
  • 如何导入Python文件?

    抱歉 这绝对是重复的 但我找不到答案 我正在使用 Python 3 这是我的应用程序的结构 home common py australia new south wales fetch py 我在home 目录 运行fetch py 我如何
  • 如何将one-hot向量转换为多标签?

    我有一项多分类任务 并且我得到了像这样的单热类型预测 0 1 1 0 1 0 1 0 1 我希望将这个单热向量转换为标签 例如 1 2 1 0 2 我已经尝试过 tf argmax 但它不起作用 那么我该如何处理呢 使用列表理解 oheLi
  • 如何使用Python高效地将CSV文件数据插入MYSQL?

    我有一个带有 aprox 的 CSV 输入文件 400 万条记录 插入已运行超过 2 小时 但仍未完成 数据库仍然是空的 关于如何实际插入值的任何建议 使用insert into 并且更快 比如将插入物分成块 我对 python 还很陌生
  • SQLite 在使用之间不保存数据

    我制作了一个包含以下内容的模块 import sqlite3 as sq connection sq connect test db cursor connection cursor cursor execute DROP TABLE IF
  • keras 中的增量学习

    我正在寻找 scikit learn 的 keras 等效项partial fit https scikit learn org 0 15 modules scaling strategies html incremental learni
  • 如何更改Python使用的SQLite版本?

    我在 Debian 9 12 上安装了 Python 3 8 和 SQLite 3 16 2 并且需要升级到较新版本的 SQLite 我已经下载并编译了 SQLite 网站上提供的合并 并将其放入 usr bin 所以当我这样做时 sqli
  • requests.iter_content() 认为文件已完成,但事实并非如此

    这个问题与我见过的其他问题不同requests iter content 在那里面requests似乎认为它已成功到达我正在迭代的文件末尾 实际上 该文件已被截断且不完整 我尝试处理的文件是一个 17gb gzip 需要丰富并存储在数据库中
  • Linux 上的 Python 3.6 tkinter 窗口图标错误

    我正在从 Python GUI 编程手册 学习 Python GUI 某项任务要求我通过将以下代码添加到我的配方中来更改窗口图标 Change the main windows icon win iconbitmap r C Python3
  • 如何使用 python aiohttp 连接到 .onion 网站?

    我正在尝试连接到 onion使用 python 的网站 我在端口 9050 上运行 Tor 但收到以下错误 Traceback most recent call last File Users jane code test test py
  • 如何检查discord.py中的所有者

    我试图让这个命令只有所有者才能运行它 是否有办法检查服务器的最高角色或创建者 我尝试了 commands is owner 但这仅检查某人是否是机器人的所有者 Guild owner https discordpy readthedocs
  • Java中的媒体播放器库[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在评估用于在 Java 中播放音频 视频的库 它不需要 100 Java Java 与本机库的绑定
  • 如何从 PyObject 获取指向字符串的 char*

    我怎样才能得到一个char from a PyObject它指向一个字符串 例如 这是 python 脚本 Test Connect 272 22 20 65 1234 这是 C 代码 static PyObject Connect PyO
  • 内置模块位于哪里?

    我尝试查找列出的所有目录sys path但我找不到任何builtins py文件 那么它在哪里呢 从字面上看 该模块内置于 python 解释器中 gt gt gt import builtins gt gt gt builtins
  • self.assertRaises 作为上下文管理器,但 msg 参数未按预期工作

    请检查以下代码 import unittest CORRECT MESSAGE Correct message WRONG MESSAGE Wrong message def fn raise KeyError CORRECT MESSAG
  • Keras ImageDataGenerator 相当于 csv 文件

    我在文件夹中排序了一堆数据 如下图所示 我需要构建一个 DataIterator 以便将数据放入神经网络模型中 当数据是图像时 我找到了很多例子来解决这个问题 使用 Keras 类图像数据生成器及其方法流自目录 但当数据是 csv 结构时则
  • Python range() 和 zip() 对象类型

    我了解功能如何range and zip 可以在 for 循环中使用 然而我期望range 输出一个列表 很像seq在 Unix shell 中 如果我运行以下代码 a range 10 print a 输出是range 10 表明它不是一
  • Python中矩阵元素的双重求和

    基于下面的简化示例 我想在我的代码中 from sympy import import numpy as np init printing x y symbols x y mat Matrix x 1 1 y X 1 2 3 Y 10 20

随机推荐

  • 在 TPL 中返回空静态任务是一种不好的做法吗?

    在某些情况下 我想有条件地运行任务 我使用某种扩展方法 如下所示 public static class MyTaskExtension private static Task theEmptyTask Task Factory Start
  • 如何在 Laravel 5.5 中获取验证消息

    大家好 我正在开发 Laravel 5 5 在这里我需要显示我的 API 的验证消息 到目前为止我已经这样做了 validator Validator make request gt all first name gt email requ
  • grep 时间命令输出

    Using time ls 我有以下输出 time ls l total 2 rwx 1 FRIENDS None 97 Jun 23 08 59 location txt rw r r 1 FRIENDS None 10 Jun 23 0
  • JavaScript OOP 原型在构造函数之外?

    我正在阅读developer mozilla org 的 OOP JS 指南 并发现了以下代码片段 function Person gender this gender gender Person prototype gender Pers
  • 标签 在 Google 脚本中意味着什么?

    是否是标签Google 脚本的特定语法或者它可以在纯 html javascript 页面中工作吗 有没有相关的描述 这类似于PHP 标签 但当我看到这个时我怀疑 正式地 这些代码在 GAS 文档中被称为 scriptlet 它们是用于服务
  • 缓存 auth_request 中的令牌

    我想缓存请求标头字段授权中的令牌 Authorization Bearer abcdefghijklmnopqrstuvwxyz 我的目标是 我不必验证验证服务器上的每个请求 如果授权令牌已缓存 且有效 则请求应调用 API 而无需验证 l
  • 如何有效地逐行迭代“Vec>”?

    我正在写一个图书馆 它采用了Vec
  • 为什么 PostgreSQL 在 Windows 上这么慢?

    我们有一个使用 MySql 运行的应用程序 在我们发现 MySql 不支持 PostGIS 所具有的某些 GIS 功能后 我们发现它不适合我们的应用程序 注意 mysql 仅支持最小边界矩形 GIS 搜索 所以我们将数据库更改为 Postg
  • 导入模块时内存使用差异

    我想知道以这些方式导入模块时内存使用情况有什么区别 import Mod1 from Mod1 import from Mod1 import a b c 主要介于前两者之间 第一个使用最少的内存 因为它只在模块范围中创建单个名称 第二个使
  • libgcc.a 和 libgcc_s.a 之间的区别?

    我们已经安装了 GCC 和libgcc在 AIX 6 1 上使用 RPM 文件 我想知道的是为什么 libgcc s a 没有在以下文件夹下创建 opt freeware lib gcc powerpc ibm aix6 1 0 0 4 2
  • ASP.NET 母版页 DefaultButton 覆盖

    我有一个带有表单元素的母版页 并将 defaultbutton 属性设置为服务器端 ImageButton 在我的其中一个页面上 我想通过在 Page Load 事件中设置 Forms DefaultButton 来 覆盖 母版页默认按钮属
  • UILabel 文本没有改变,但是 xx.title 正在工作

    我有两个视图控制器 在第一个视图控制器中 我有名称列表 当我单击它时 我希望在第二个视图控制器中显示相同的名称 我有下面的代码 void tableView UITableView tableView didSelectRowAtIndex
  • didChangeObject:未调用 NSFetchedResultsController

    我几乎尝试了所有方法 但无法找出问题所在 我有一个 NSFetchedResultsController 并从核心数据中获取一些帖子 然后我有一个方法 可以将新帖子插入到同一上下文中并保存上下文 通常 现在应该调用 didChangeObj
  • 将图像置于 div 中居中

    我已经在 a 中设置了图像的边框div成为无 我现在想将该图像放在其包含的 div 中居中 我尝试过使用margin 0 auto 但这没有用 我确信我忽略了一些愚蠢的事情 但我想寻求 stackoverflow 社区的帮助 这样我就不需要
  • Azure WebJobs SDK 基础知识

    我想了解 Azure WebJobs SDK 上的 JobHostConfiguration 我在哪里可以找到配置 它在 app config 上吗 JobHostConfiguration 如何识别这是不是 IsDevelopment 我
  • 自定义属性与非自定义属性?

    实现的东西ICustomAttributeProvider接口将允许您获取已通过以下方式应用到它的自定义属性GetCustomAttributes方法 据我了解 自定义属性基本上是一个特殊的类 以 Attribute 结尾并扩展Attrib
  • iCloud Drive 通过 NSMetadataQuery 列出目录和文件

    I have built an iCloud enabled app named rmc My app now can upload files to iCloud Drive and get metadata by NSMetadataQ
  • Asp.net MVC Web Api Http 放置和删除请求失败

    我正在使用 Asp net MVC 4Web Api项目 我的应用程序使用 mvc 来实现网站 它向 Web API 发出 http 请求以实现服务器功能 对控制器的常规页面请求工作正常 并且能够显示网页 该应用程序能够使get and p
  • Hibernate Annotations - 字段访问和属性访问哪个更好?

    这个问题有点相关Hibernate注解放置问题 https stackoverflow com questions 305880 hibernate annotation placement question 但我想知道哪个是better
  • 窗口后 Apache-beam 挂在 groupbykey 上 - 不触发

    TLDR 如何使用 python SDK 更正触发计数窗口 Problem 我正在尝试创建一个用于转换和索引维基百科转储的管道 目标是 从压缩文件中读取 只需一个进程并以流式传输方式 因为该文件不适合 RAM 并行处理每个元素 ParDo