我已经实施了MapFunction
对于我的 Apache Flink 流程。它正在解析传入元素并将其转换为其他格式,但有时会出现错误(即传入数据无效)。
我看到两种可能的处理方法:
- 忽略无效元素,但似乎我无法忽略错误,因为对于任何传入元素,我必须提供传出元素。
- 将传入元素拆分为有效和无效,但似乎我应该为此使用其他函数。
所以,我有两个问题:
- 如何正确处理我的错误
MapFunction
?
- 如何正确实现这样的转换函数呢?
你可以使用FlatMapFunction
代替MapFunction
。这将允许您仅发出有效的元素。下面展示了一个示例实现:
input.flatMap(new FlatMapFunction<String, Long>() {
@Override
public void flatMap(String input, Collector<Long> collector) throws Exception {
try {
Long value = Long.parseLong(input);
collector.collect(value);
} catch (NumberFormatException e) {
// ignore invalid data
}
}
});
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)