我需要为 flink 流拓扑编写单元测试。这基本上是一个CoFlatMapFunction
,并且它有 2 个输入。
我尝试从这个页面中获得一些灵感:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
输入的顺序对我的拓扑很重要,所以当我测试时,我不能使用StreamExecutionEnvironment#fromCollection
对于每个输入,因为我不会控制在每个输入中注入数据点的顺序。
我尝试使用创建单个输入StreamExecutionEnvironment#fromCollection
并将每个元素分派到我的实际输入CoFlatMapFunction
基于它们的类型,但元素的顺序在此操作中丢失。
还有其他方法来编写这个测试吗?
flink 训练练习中有一个使用 TwoInputStreamOperatorTestHarness 的示例,您可以参考:
https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java
您将需要这些依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
您应该记住,这不是一个公共的、受支持的接口,因此它可能会以意想不到的方式发展。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)