Flink - 多源集成测试

2023-12-26

我有一份 Flink 工作,正在使用此处描述的方法进行集成测试:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing

该作业从两个来源获取输入,这两个来源组合在一个CoFlatMapFuntion。在测试环境中,我当前使用两个简单的 SourceFunction 来发出值,但这并不能提供对事件发出顺序的任何控制。为了正确测试作业的功能,这是必要的。

如何修改我的测试以确保一个源函数在第二个源函数之前发出其所有数据?

我已经看到了建议的方法Flink 中复杂拓扑(多输入)的集成测试 https://stackoverflow.com/questions/52153863/integration-test-for-complex-topology-multiple-inputs-in-flink,这对于单元测试来说很好,但我正在寻找一个允许我集成测试整个工作的解决方案。


我建议添加控制代码到你的两个SourceFunctions并使用MiniClusterWithClientResource。它可能看起来像下面这样:

public class JobITCase {

    private static final int NUM_TMS = 2;
    private static final int NUM_SLOTS = 2;
    private static final int PARALLELISM = NUM_SLOTS * NUM_TMS;

    @ClassRule
    public final static MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberSlotsPerTaskManager(NUM_SLOTS)
                .setNumberTaskManagers(NUM_TMS)
                .build());

    @Test
    public void testJob() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        final MyControllableSourceFunction source1 = new MyControllableSourceFunction("source1");
        final MyControllableSourceFunction source2 = new MyControllableSourceFunction("source2");

        final DataStreamSource<Integer> input1 = env.addSource(source1);
        final DataStreamSource<Integer> input2 = env.addSource(source2);

        input1.connect(input2).map(new CoMapFunction<Integer, Integer, Integer>() {
            @Override
            public Integer map1(Integer integer) {
                System.out.println("Input 1: " + integer);
                return integer;
            }

            @Override
            public Integer map2(Integer integer) {
                System.out.println("Input 2: " + integer);
                return integer;
            }
        }).print();

        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().submitJob(jobGraph).get();

        final CompletableFuture<JobResult> jobResultFuture = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().requestJobResult(jobGraph.getJobID());

        final ArrayList<CompletableFuture<Void>> finishedFutures = new ArrayList<>(PARALLELISM);

        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source1, i);
            finishedFutures.add(MyControllableSourceFunction.getFinishedFuture(source1, i));
        }

        FutureUtils.waitForAll(finishedFutures).join();

        for (int i = 0; i < PARALLELISM; i++) {
            MyControllableSourceFunction.startExecution(source2, i);
        }

        jobResultFuture.join();
    }

    private static class MyControllableSourceFunction extends RichParallelSourceFunction<Integer> {

        private static final ConcurrentMap<String, CountDownLatch> startLatches = new ConcurrentHashMap<>();
        private static final ConcurrentMap<String, CompletableFuture<Void>> finishedFutures = new ConcurrentHashMap<>();

        private final String name;

        private boolean running = true;

        private MyControllableSourceFunction(String name) {
            this.name = name;
        }

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            final int index = getRuntimeContext().getIndexOfThisSubtask();

            final CountDownLatch startLatch = startLatches.computeIfAbsent(getId(index), ignored -> new CountDownLatch(1));
            final CompletableFuture<Void> finishedFuture = finishedFutures.computeIfAbsent(getId(index), ignored -> new CompletableFuture<>());

            startLatch.await();
            int counter = 0;

            while (running && counter < 10) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(counter++);
                }
            }

            finishedFuture.complete(null);
        }

        @Override
        public void cancel() {
            running = false;
        }

        private String getId(int index) {
            return name + '_' + index;
        }

        static void startExecution(MyControllableSourceFunction source, int index) {
            final CountDownLatch startLatch = startLatches.computeIfAbsent(source.getId(index), ignored -> new CountDownLatch(1));
            startLatch.countDown();
        }

        static CompletableFuture<Void> getFinishedFuture(MyControllableSourceFunction source, int index) {
            return finishedFutures.computeIfAbsent(source.getId(index), ignored -> new CompletableFuture<>());
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink - 多源集成测试 的相关文章

随机推荐

  • 覆盖paint或paintComponent时需要恢复图形原始状态

    我意识到大多数Java代码都覆盖了paint或paintComponent 它们中的大多数在改变图形对象的状态后并没有恢复图形对象的旧状态 例如 setStroke setRenderingHint 我想知道在从方法返回之前恢复图形对象的旧
  • 我们可以关闭终结器吗?

    由于无法保证终结器何时运行以及是否会运行 而且如今终结器几乎被认为是一种气味 有什么方法可以说服 JVM 完全跳过所有终结过程吗 我问这个问题是因为我们有一个庞大的应用程序 当它迁移到较新的 JVM 现阶段不确定是哪个 时 它会因为看起来非
  • twig - 在 for 循环中构建数组

    是否可以迭代地用值填充树枝数组 for question in questions set multipleChoiceArray for multipleChoice in question multipleChoiceAnswers s
  • 如何构建正确的 SPARQL 查询

    我需要使用 SPARQL 查询和 dbpedia org 获取曾经为足球队效力过的所有球员 我可以让当前的团队成员使用http dbpedia org sparql http dbpedia org sparql和这个查询 PREFIX g
  • Shiny/R 错误:路径应该是项目目录中的文件

    我的 Shiny 应用程序将在本地运行 但当我尝试部署到shinyapps io 时 它不会运行 我通过删除路径中的 点 暂时解决了该问题 csv file data lt read csv Users JMJC Desktop bbtea
  • MongoDB:更新/更新插入与插入

    最近我注意到多次更新插入之间存在巨大的性能差异 通过批量操作 https docs mongodb org manual core bulk write operations 与插入 多个文档 我想知道我的说法是否正确 更新插入 更新就像f
  • 图像方向 - Android

    在过去一个月左右的时间里 我一直断断续续地与这个错误作斗争 每当我认为我已经解决了它 它似乎就会以某种形式回来 这是旧的 Android 图像旋转 90 度 错误 我在这里阅读了无数的帖子 StackOverFlow 并尝试了多种方法 但似
  • 将 [String: AnyObject] 转换为 [String: Any] [关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 我有一个 String AnyObject 类型的 Swift 变量 但是我尝试调用的函数需要一个 String Any
  • shebang 标志与 set 内置标志之间的区别

    在 shebang 行上传递给脚本的标志与使用set内置 例如 bin bash e do stuff vs bin bash set e do stuff 这个问题并不具体针对 e标志 但一般来说对于任何此类标志 显然set flags
  • WCF 服务默认值

    我的 WCF 服务有以下数据协定类 DataContract Name MyClassDTO public class MyClass private string name Default Name DataMember public s
  • 如何强制仅匿名访问控制器操作?

    我可以使用 AllowAnonymous 属性允许用户访问控制器操作 但是是否有属性只允许匿名用户访问操作 例如 AllowAnonymousOnly 不 它不存在 但是 您可以通过创建自己的属性来创建它 该属性继承自授权属性 https
  • 使用原始类型进行模乘的方法

    有没有办法构建例如 853467 21660421200929 100000000000007没有 BigInteger 库 请注意 每个数字都适合 64 位整数 但乘法结果不适合 这个解决方案似乎效率低下 int64 t mulmod i
  • Scala 集合上的高效分组聚合

    我经常需要做类似的事情 coll groupBy f mapValues foldLeft x g 达到相同效果但避免显式构造中间集合的最佳方法是什么groupBy 您可以将初始集合折叠在保存中间结果的地图上 def groupFold A
  • 如何在 Action 运行的 Bash 脚本中访问 GitHub Action 环境变量?

    我无法从操作运行的脚本中访问在 GitHub 操作配置文件顶层定义的环境变量 例如 给定以下配置文件 name x pull request on pull request env FOO bar jobs test runs on ubu
  • 为什么我仍然看到发布者未知并出现 UAC 提示?

    我制作了自己的 CA 然后制作了 pfx 文件 我正在使用 Wix 工具集来构建安装程序 在 wix 项目文件中 我使用以下内容对其进行了编辑
  • 将 Func 委托转换为字符串

    有没有办法将现有的 Func 委托转换为这样的字符串 Func
  • 错误:Kotlin:不支持的插件选项:org.jetbrains.kotlin.android:enabled=true

    今天我收到此错误 而一小时前完全相同的代码正在运行 错误 Kotlin 不支持的插件选项 org jetbrains kotlin android enabled true 并且这个项目不运行 原因 重复条目 更新 从用户文件夹中删除 An
  • Java 优化字符串与字符数组

    在我正在编写的程序中 我正在进行大量的字符串操作 我正在尝试提高性能 并且想知道使用 char 数组是否会显示出不错的性能提升 有什么建议么 你在做什么操纵 您可以发布代码示例吗 您可能想看一下字符串生成器 http java sun co
  • 用于切换功能和启用/禁用的变量

    正如我之前的问题一样 我正在制作一个巨魔功能 现在我正在尝试弄清楚如何让它切换以使其工作 这样我的朋友就不必时不时地禁止它 切换命令可以工作 但它实际上在内部不起作用 注意 我有两个不和谐帐户 因此我可以在另一个帐户上进行测试 使用切换开关
  • Flink - 多源集成测试

    我有一份 Flink 工作 正在使用此处描述的方法进行集成测试 https ci apache org projects flink flink docs stable dev stream testing html integration