Flink 中复杂拓扑(多输入)的集成测试

2024-02-05

我需要为 flink 流拓扑编写单元测试。这基本上是一个CoFlatMapFunction,并且它有 2 个输入。

我尝试从这个页面中获得一些灵感:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

输入的顺序对我的拓扑很重要,所以当我测试时,我不能使用StreamExecutionEnvironment#fromCollection对于每个输入,因为我不会控制在每个输入中注入数据点的顺序。

我尝试使用创建单个输入StreamExecutionEnvironment#fromCollection并将每个元素分派到我的实际输入CoFlatMapFunction基于它们的类型,但元素的顺序在此操作中丢失。

还有其他方法来编写这个测试吗?


flink 训练练习中有一个使用 TwoInputStreamOperatorTestHarness 的示例,您可以参考:

https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java

您将需要这些依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-all</artifactId>
  <version>1.10.19</version>
  <type>jar</type>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

您应该记住,这不是一个公共的、受支持的接口,因此它可能会以意想不到的方式发展。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 中复杂拓扑(多输入)的集成测试 的相关文章

随机推荐

  • Visual Studio 2013.3 中新的 MVC5 项目出现错误

    想知道其他人是否经历过这种情况以及他们的解决方案是什么 在 Visual Studio 2013 中 我创建一个新的 ASP NET Web 应用程序 保留所有默认值 在下一个屏幕中 我选择 MVC 添加 MVC 的文件夹和核心引用 但不添
  • Scrum - 您何时估计产品待办事项的工作量? [关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 您的团队在 Scrum 流程的哪一部分对完成给定产品积压项目所需的工作量进行了有根据的估计 例如 假设您有一个产品待办事项列表项 内容为 Sprint
  • xCode 中存在重复符号,但不存在重复符号

    问题Xcode 版本 4 6 1 4H512 抱怨重复符号问题 重复符号 OBJC METACLASS PacksStoreHelper 位于 Users shannoga Library Developer Xcode DerivedDa
  • 如何在 flutter 中使用展示案例视图?

    I use 展示案例查看 https github com simformsolutions flutter showcaseview打包在我的应用程序中 并想展示一次 就在第一次启动之后 我怎样才能只执行一次而不在下次启动时显示它 ove
  • MapViewOfFile 返回什么? [复制]

    这个问题在这里已经有答案了 可能的重复 为什么加载程序无法加载到所需位置 https stackoverflow com questions 9698496 why wont the loader load at the desired l
  • 为什么当列包含空字符串时bcp输出null,当列包含空字符串时bcp输出空字符串?

    这让我觉得非常奇怪的行为 在发现这个之前我花了一段时间检查代码中的错误 out 从数据库表或视图复制到文件 如果指定现有文件 则该文件将被覆盖 提取数据时 请注意 bcp 实用程序将空字符串表示为 null 将 null 字符串表示为空字符
  • Excel 搜索包含子字符串的多行

    我有一个包含 2 张纸的 Excel 文件 第一个获得了列中的关键字列表 第二个在一个列上有句子 在另一列上有一个 id 因此 这两张纸看起来像这样 Sheet 1 Sheet 2 A A B the 15587 The cat is wa
  • Gomock无法使用类型map[string]*mockFoo作为map[string]foo

    我正在使用 gomock 并且我有一段希望测试的示例代码 type StructA struct client map string Foo type Foo interface foo methodFoo string func a st
  • 如何从 S3 存储桶中递归删除文件

    我在 S3 中有以下文件夹结构 有没有办法递归删除某个文件夹下的所有文件 比如foo bar1 or foo or foo bar2 1 foo bar1 1 foo bar1 2 foo bar1 3 foo bar2 1 foo bar
  • OnsenUi Angular 和登录

    我正在尝试使用 onsen cordova 开发移动应用程序 我需要的是 当应用程序启动时 它会加载login html页面 如果应用程序检测到用户已登录 则会重定向到 home html 对于每个 受保护的页面 我想调用一个检测用户是否已
  • Mac 版 Java 中的文件输入

    我们现在正在编程课上学习文件输入和输出 但我有一台 Macbook Pro 我了解如何执行此操作 并且可以在 Windows 上执行此操作 但我无法找到如何在 Mac 上执行此操作 我只是似乎不知道在 File Fred new File
  • 如何从 Java 调用 .NET 代码?

    我并不是在寻找像网络服务这样的常见答案 我正在寻找在同一台机器上运行的轻型解决方案 编辑 我正在寻找 Java 中调用 NET 方法的方法 我是作者jni4net http jni4net sf net JVM和CLR之间的开源进程间桥梁
  • 将 AppCompat 主题应用于 PreferenceFragment 中的个人首选项

    我一直在努力争取我的PreferenceFragment与我的应用程序的其余部分具有相同的基于材质的主题和样式 通过 AppCompat 这PreferenceFragment我用来管理所有应用程序设置的工具如下所示 正如您从上面的屏幕截图
  • 如何在布局中包含布局?

    如何在Android布局中包含布局 我正在创建通用布局 我想将该布局包含在另一个页面中 Edit 正如评论中正确要求的更多信息 使用include tag
  • JavaScript 中的匿名函数命名有何不同?

    我正在分析 John Resig 网站上的以下两个 url 但我不明白为匿名函数命名有何不同 我的理解是 赋予匿名函数的名称只能在函数定义内部使用 而不能在函数定义之外使用 但在以下链接中它会产生巨大的差异 http ejohn org a
  • 核心数据:迁移后,额外的迁移代码

    我希望从版本 1 数据模型迁移到版本 2 但迁移完成后我希望执行一些自定义迁移代码 我如何知道迁移是否 何时发生 是否有migrationHasCompleed委托方法或通知 为了兴趣 我希望执行调整数据库中 png 大小的自定义迁移代码
  • 包括函数已知参数的不确定性,以在 python 中使用 curve_fit 进行拟合

    我在Python中使用不确定性包 我有一个像 A x N 这样的函数 其中 N 是一个具有不确定性的固定参数 例如 N 1 0 0 1 我想通过将方程拟合到数据集 y 来获得参数 A 但有错误 yerr 我尝试使用 uncertaintie
  • 两个java库互相导入?

    我正在开发一个遗留框架 显然有两个相互依赖的库 我的意思是libA进口于libB and libB进口于libA 首先我认为这是一个糟糕的设计 但为什么有人会这样做呢 相反 哪些条件可以导致某人写下此内容 edit 每个库都依赖于另一个库中
  • ListTile 的 SelectedTileColor 不起作用

    我有一个ListView其中包含ListTile被选中时会改变颜色 然而 在下面的代码中 当有中间时Container or SizedBox在 之间ListView和Scaffold SelectedTileColor不起作用 我跟着这个
  • Flink 中复杂拓扑(多输入)的集成测试

    我需要为 flink 流拓扑编写单元测试 这基本上是一个CoFlatMapFunction 并且它有 2 个输入 我尝试从这个页面中获得一些灵感 https ci apache org projects flink flink docs s