我想使用 Flink 来使用来自 Kinesis 的 POJO。
是否有关于如何正确发送和反序列化消息的标准?
Thanks
我用以下方法解决了它:
DataStream<SamplePojo> kinesis = see.addSource(new FlinkKinesisConsumer<>(
"my-stream",
new POJODeserializationSchema(),
kinesisConsumerConfig));
and
public class POJODeserializationSchema extends AbstractDeserializationSchema<SamplePojo> {
private ObjectMapper mapper;
@Override
public SamplePojo deserialize(byte[] message) throws IOException {
if (mapper == null) {
mapper = new ObjectMapper();
}
SamplePojo retVal = mapper.readValue(message, SamplePojo.class);
return retVal;
}
@Override
public boolean isEndOfStream(SamplePojo nextElement) {
return false;
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)