当文档提到 NonInferrableReturnType 时,意思是我们可以使用类型变量 <T>(或您喜欢的任何其他字母)。因此,您可以创建一个返回 T 的 MapFunction。但接下来,如果您的目标是返回 String,就必须调用(例如).returns(TypeInformation.of(String.class))。
/**
 * Maps an {@code AbstractDataModel} to its value, exposed as a caller-chosen type {@code T}.
 *
 * <p>The output type is a free type variable of this class, so it is erased at runtime.
 * A pipeline using this function therefore has to state the produced type itself, for
 * example with {@code .returns(TypeInformation.of(String.class))} on the result of the
 * {@code map(...)} call.
 *
 * @param <T> the type the mapped value is cast to
 */
public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {

    /**
     * Returns {@code value.getValue()} cast to {@code T}.
     *
     * @param value the input element
     * @return the element's value, viewed as {@code T}
     * @throws Exception declared by the {@code MapFunction} contract
     */
    @Override
    public T map(AbstractDataModel value) throws Exception {
        // T is erased, so this cast cannot be verified here; a mismatch between T and the
        // actual value type would only surface later, where the result is used as T.
        // The suppression is scoped to this one local on purpose.
        @SuppressWarnings("unchecked")
        T mapped = (T) value.getValue();
        return mapped;
    }
}
这里我使用了您上一个问题《使用超类型创建 MapFunction 时编译失败》(https://stackoverflow.com/q/68174020/2096986)中的类。同样的代码如果去掉 .returns(TypeInformation.of(String.class)),可以编译通过,但会抛出运行时异常:
(函数的返回类型)由于类型擦除而无法自动确定。您可以在转换调用的结果上
使用 returns(...) 方法来提供类型信息提示,
或者让您的函数实现 “ResultTypeQueryable” 接口。
/**
 * Streaming job that maps a collection of {@code AbstractDataModel} elements through
 * {@code MyMapFunctionNonInferrableReturnType} and sends the mapped values to a
 * {@code ValenciaSinkFunction}.
 */
public class NonInferrableReturnTypeStreamJob {

    private final List<AbstractDataModel> abstractDataModelList;
    private final ValenciaSinkFunction sink;

    /**
     * Creates a job over two sample {@code ConcreteModel} elements and a new sink.
     */
    public NonInferrableReturnTypeStreamJob() {
        this(sampleModels(), new ValenciaSinkFunction());
    }

    /**
     * Creates a job over the given elements and sink.
     *
     * @param abstractDataModelList the elements used as the job's source collection
     * @param sink the sink that receives the mapped values
     */
    public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
        this.abstractDataModelList = abstractDataModelList;
        this.sink = sink;
    }

    /**
     * Runs the job with the sample data.
     *
     * @param args unused
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        new NonInferrableReturnTypeStreamJob().execute();
    }

    /**
     * Builds the pipeline (collection source, map, sink) and executes it.
     *
     * @throws Exception if the job execution fails
     */
    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(this.abstractDataModelList)
                // Parameterized instead of raw so the rest of the chain is type-checked.
                // NOTE(review): assumes ValenciaSinkFunction accepts String elements - confirm.
                .map(new MyMapFunctionNonInferrableReturnType<String>())
                // The explicit hint is still required: the type argument above is erased
                // at runtime, so the output type cannot be read from the function object.
                .returns(TypeInformation.of(String.class))
                .addSink(sink);
        env.execute();
    }

    /** Builds the sample input used by the no-argument constructor. */
    private static List<AbstractDataModel> sampleModels() {
        List<AbstractDataModel> models = new ArrayList<AbstractDataModel>();
        models.add(new ConcreteModel("a", "1"));
        models.add(new ConcreteModel("a", "2"));
        return models;
    }
}
如果您愿意,这里是此示例的集成测试:
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Integration test for {@code NonInferrableReturnTypeStreamJob}: runs the job over two
 * models and checks the values collected by {@code ValenciaSinkFunction}.
 */
public class NonInferrableReturnTypeStreamJobTest {

    /** Minimum processor count required to run on the mini cluster; also its slot count. */
    private static final int MIN_AVAILABLE_PROCESSORS = 4;

    /** Whether this machine has enough processors to use the mini cluster. */
    private static final boolean RUN_IN_PARALLEL =
            Runtime.getRuntime().availableProcessors() >= MIN_AVAILABLE_PROCESSORS;

    /**
     * Mini cluster shared by the tests of this class, or {@code null} when the machine has
     * too few processors.
     *
     * <p>The rule is created during class initialization rather than in the constructor:
     * class rules are collected before any test instance is constructed, so a value
     * assigned in the constructor is never applied as a rule. A {@code null} value is
     * expected to be skipped by JUnit 4, leaving the job on its default environment.
     */
    @ClassRule
    public static final MiniClusterWithClientResource flinkCluster = RUN_IN_PARALLEL
            ? new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(MIN_AVAILABLE_PROCESSORS)
                            .setNumberTaskManagers(1)
                            .build())
            : null;

    /**
     * Executes the job over two models and expects both mapped values in the sink.
     *
     * @throws Exception if the job execution fails
     */
    @Test
    public void execute() throws Exception {
        List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
        abstractDataModelList.add(new ConcreteModel("a", "1"));
        abstractDataModelList.add(new ConcreteModel("a", "2"));

        // The sink collects into a static list; clear it so earlier runs do not leak in.
        ValenciaSinkFunction.values.clear();

        NonInferrableReturnTypeStreamJob streamJob =
                new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
        streamJob.execute();

        List<String> results = ValenciaSinkFunction.values;
        assertEquals(2, results.size());
        // Order-independent check.
        assertTrue(results.containsAll(Arrays.asList("1", "2")));
    }
}