我有一个流,我想使用某个键对其进行分区,然后运行多个转换,每个转换使用一个状态。当我打电话时keyBy()
我得到一个KeyedStream
下一个转换可以正确访问分区状态,但之后链接的另一个转换在尝试访问分区状态时会出现异常。例外的是:
状态密钥序列化器尚未在配置中配置。该操作不能使用分区状态
看来关键信息只传递到第一个转换,而不是进一步传递到链下。
我尝试运行的代码与此代码类似(但实际上做了一些事情):
DataStream<Long> newStream = eventsStream
.keyBy("username")
.filter(new RichFilterFunction<Event>() {
private ValueState<Boolean> stateStore;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE1", Boolean.class, Boolean.TRUE));
}
@Override
public boolean filter(Event value) throws Exception {
return stateStore.value();
}
})
.map(new RichMapFunction<Event, Long>() {
private ValueState<Long> stateStore;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE2", Long.class, 0L));
}
@Override
public Long map(Event value) throws Exception {
return Long.parseLong(value.data) + stateStore.value();
}
});
这段代码将在第二次抛出异常getState()
call.
我可以打电话keyBy()
再次,但随后我删除了链接操作的能力。我可以手动操作流图的对象以便传递关键信息,还是不支持这种链接?
你不能。
即使你会打电话keyBy()
第二次(或以某种方式将“键控”信息传递给下游),您将获得一个新状态,因为状态仅与单个操作员关联。
作为解决方法,您需要将两个运算符合并为一个。
如果您认为此功能可能有帮助,请随时在以下位置提出建议:[email protected] /cdn-cgi/l/email-protection
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)