如何在流式管道中按小捆绑的 N 个元素进行批处理?

2024-05-10

我已经按照此答案中的描述实现了 N 个元素的批处理:谷歌数据流管道中的数据存储输入可以一次处理一批 N 个条目吗? https://stackoverflow.com/questions/35065109/can-datastore-input-in-google-dataflow-pipeline-be-processed-in-a-batch-of-n-ent?answertab=active#tab-top



    package com.example.dataflow.transform;

    import com.example.dataflow.event.ClickEvent;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.joda.time.Instant;

    import java.util.ArrayList;
    import java.util.List;

    public class ClickToClicksPack extends DoFn> {
        public static final int BATCH_SIZE = 10;

        private List accumulator;

        @StartBundle
        public void startBundle() {
            accumulator = new ArrayList(BATCH_SIZE);
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            ClickEvent clickEvent = c.element();
            accumulator.add(clickEvent);
            if (accumulator.size() >= BATCH_SIZE) {
                c.output(accumulator);
                accumulator = new ArrayList(BATCH_SIZE);
            }
        }

        @FinishBundle
        public void finishBundle(FinishBundleContext c) {
            if (accumulator.size() > 0) {
                ClickEvent clickEvent = accumulator.get(0);
                long time = clickEvent.getClickTimestamp().getTime();

                c.output(accumulator, new Instant(time), GlobalWindow.INSTANCE);
            }
        }
    }


但是当我在流模式下运行管道时,有很多批次只有 1 或 2 个元素。据我了解,这是因为捆绑包尺寸较小。运行一天后,批处理中的平均元素数量约为 4。我确实需要它接近 10,以便后续步骤获得更好的性能。

有没有办法控制包的大小? 或者我应该使用“GroupIntoBatches”转换来实现此目的。在这种情况下,我不清楚应该选择什么作为键。

更新: 使用 java 线程 id 或 VM 主机名作为应用“GroupIntoBatches”转换的键是一个好主意吗?


我最终用内部的“GroupIntoBatches”进行了复合转换。 以下答案包含有关密钥选择的建议:https://stackoverflow.com/a/44956702/4888849 https://stackoverflow.com/a/44956702/4888849

在我当前的实现中,我使用随机键来实现并行性,并且我对事件进行窗口化以便定期发出结果,即使一个键的事件数少于 BATCH_SIZE 也是如此。



    package com.example.dataflow.transform;

    import com.example.dataflow.event.ClickEvent;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    import java.util.Random;

    /**
     * Batch clicks into packs of BATCH_SIZE size
     */
    public class ClickToClicksPack extends PTransform, PCollection>> {
        public static final int BATCH_SIZE = 10;
        // Define window duration.
        // After window's end - elements are emitted even if there are less then BATCH_SIZE elements
        public static final int WINDOW_DURATION_SECONDS = 1;
        private static final int DEFAULT_SHARDS_NUMBER = 20;
        // Determine possible parallelism level
        private int shardsNumber = DEFAULT_SHARDS_NUMBER;

        public ClickToClicksPack() {
            super();
        }

        public ClickToClicksPack(int shardsNumber) {
            super();
            this.shardsNumber = shardsNumber;
        }

        @Override
        public PCollection> expand(PCollection input) {
            return input
                    // assign keys, as "GroupIntoBatches" works only with key-value pairs
                    .apply(ParDo.of(new AssignRandomKeys(shardsNumber)))
                    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_DURATION_SECONDS))))
                    .apply(GroupIntoBatches.ofSize(BATCH_SIZE))
                    .apply(ParDo.of(new ExtractValues()));
        }

        /**
         * Assigns to clicks random integer between zero and shardsNumber
         */
        private static class AssignRandomKeys extends DoFn> {
            private int shardsNumber;
            private Random random;

            AssignRandomKeys(int shardsNumber) {
                super();
                this.shardsNumber = shardsNumber;
            }

            @Setup
            public void setup() {
                random = new Random();
            }

            @ProcessElement
            public void processElement(ProcessContext c) {
                ClickEvent clickEvent = c.element();
                KV kv = KV.of(random.nextInt(shardsNumber), clickEvent);
                c.output(kv);
            }
        }

        /**
         * Extract values from KV
         */
        private static class ExtractValues extends DoFn>, Iterable> {
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV> kv = c.element();
                c.output(kv.getValue());
            }
        }
    }


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在流式管道中按小捆绑的 N 个元素进行批处理? 的相关文章

随机推荐

  • 如何解决 服务器错误?

    我在 Visual Studio 中制作了一个 ASP NET 应用程序 当它准备好后 我做了 发布网站 我将创建的文件夹放在我的网站上 当我访问网站上的应用程序时 出现以下错误 应用程序中的服务器错误 配置错误 描述 处理服务此请求所需的
  • Matlab Solve():未给出所有解决方案

    我试图找到两条曲线的交点 syms x y g x 20 exp x 30 3 5 1 sol x sol y solve x 22 3097 2 y 16 2497 2 25 y g x x y Real true 它只提供一种解决方案
  • 如何使用 Selenium Webdriver .NET 绑定设置 Chrome 首选项?

    这是我正在使用的 用户代理可以成功设置 而下载首选项则不能 Windows 7 Chrome 26 Selenium dotnet 2 31 2 chromedriver win 26 0 1383 0 ChromeOptions chro
  • Keras 中的损失函数和度量有什么区别? [复制]

    这个问题在这里已经有答案了 我不清楚 Keras 中损失函数和指标之间的区别 该文档对我没有帮助 损失函数用于优化您的模型 这是优化器将最小化的函数 指标用于判断模型的性能 这仅供您查看 与优化过程无关
  • dplyr 总结小计

    Excel 中数据透视表的一大优点是它们会自动提供小计 首先 我想知道 dplyr 中是否已经创建了任何可以实现此目的的东西 如果没有 实现它的最简单方法是什么 在下面的示例中 我按气缸和化油器的数量显示了平均排量 对于每组气缸 4 6 8
  • 用 Java 创建迷宫求解算法

    我被分配了用 Java 创建迷宫求解器的任务 这是任务 Write an application that finds a path through a maze The maze should be read from a file A
  • 使用资源字符串的 DisplayFormat 数据注释

    我想使用 DisplayFormat 数据注释来格式化模型数据 但我想使用资源文件中存储的格式字符串 我已经能够将资源类型和名称传递给某些数据注释 例如指定错误消息时 如何告诉 DisplayFormat 从我的资源文件之一获取格式字符串
  • MS-Access 查询中的语法错误(缺少运算符)

    以下查询给了我 missing operator 语法错误 所需的输出是表中数据的组合 dbo tbl 和意见 vw 我用过的所有钥匙都存在 有任何想法吗 SELECT dbo tbl BOD fpartno AS PartNumber d
  • 将 mat3 转换为 mat4 的最简单方法

    我提取了 mat4 的左上角 3x3 旋转矩阵 glm mat4 model glm mat3 rot glm mat3 model 现在我想要单位矩阵 左上角是我的新 mat3 最简单的方法是什么 glm mat4 result resu
  • 如何在r中进行左连接[重复]

    这个问题在这里已经有答案了 我有两个数据集一和二 数据集一 a b c 111 a 1 112 b 2 113 c 3 114 d 4 115 e 5 数据集二 e d g 222 ss 11 111 ff 22 113 ww 33 114
  • 如何在一列中存储数组或多个值

    运行 Postgres 7 4 是的 我们正在升级 我需要将 1 到 100 个选定项目存储到数据库的一个字段中 98 的情况下 只会输入 1 个项目 而 2 的情况下 如果是这样的话 会输入多个项目 这些项目只不过是文本描述 截至目前 长
  • DeadSystemException启动服务Android 7

    在过去的几周里 我在我的事故报告中看到 Fatal Exception java lang RuntimeException Unable to start service com MyService ef705d8 with Intent
  • 空白约束和空约束之间的区别

    空白约束和空约束有什么区别 我有以下课程 class Task String title String notes TekUser assignedTo Date dueDate TekEvent event static constrai
  • Angular - 将焦点放在动态创建的输入字段上

    我如何将焦点添加到新创建的字段 参见到目前为止的示例 http jsfiddle net aERwc 165 http jsfiddle net aERwc 165 scope addField function console log h
  • Xaml 不知道哪些对象是在不同的程序集中定义的

    当我在一些非类中添加一个类时UI组装并在某些中使用它xaml in UI assembly出现以下错误 Provide value on System Windows StaticResourceExtension threw an exc
  • addAttr 在 jquery 中不起作用?

    我有一个示例代码
  • 将水平线添加到 html rmarkdown 文档隐藏文本部分

    我正在制作一个与 HTML 页面结合的 Rmarkdown 文档 我不想用标题或项目符号分隔某些文本部分 而是想在它们之间绘制水平线 根据http rmarkdown rstudio com authoring basics html ht
  • 我如何模拟 UserManager 和 RoleManager 进行单元测试

    我模拟了抽象类来测试类的具体方法 如下所示 var mock new Mock
  • 如何消除 jQuery Mobile 中的悬停延迟?

    我正在使用 jQuery Mobile 制作一个网站 当我将鼠标悬停在按钮上时 它会更改其类 并扩展其颜色 但感觉需要半秒左右才能完成 有没有办法减少这种延迟 您可以覆盖hoverDelay无需修改 jQuery Mobile js 库 要
  • 如何在流式管道中按小捆绑的 N 个元素进行批处理?

    我已经按照此答案中的描述实现了 N 个元素的批处理 谷歌数据流管道中的数据存储输入可以一次处理一批 N 个条目吗 https stackoverflow com questions 35065109 can datastore input