我使用 Flink(最新通过 git)从 kafka 流式传输到 cassandra。为了简化单元测试,我通过 Dagger 添加依赖注入。
ObjectGraph 似乎已正确设置自身,但“内部对象”被 Flink 标记为“不可序列化”。如果我直接包含这些对象,它们就可以工作 - 那么有什么区别呢?
有问题的类实现地图功能 and @Inject一个用于 cassandra 的模块和一个用于读取配置文件的模块。
有没有办法构建这个,以便我可以使用后期绑定,或者 Flink 是否使这变得不可能?
Edit:
fwiw - 依赖注入(通过 dagger)和丰富的地图功能不能共存。 Dagger 不会让你包含任何具有extends在他们的定义中。
Further:
通过 Dagger Lazy 实例化的对象也不会序列化。
线程“main”中的异常 org.apache.flink.api.common.InvalidProgramException:对象 com.someapp.SaveMap@2e029d61 不可序列化
...
引起原因:java.io.NotSerializedException:dagger.internal.LazyBinding$1
在深入讨论问题的具体细节之前,先了解一下 Apache Flink 中函数的可序列化性的一些背景知识:
可串行化
Apache Flink 使用 Java 序列化 (java.io.Serialized) 来传送函数对象(此处为MapFunction
)到并行执行它们的工作人员。因此,函数需要可序列化:函数不能包含任何不可序列化字段,即非原始类型(int、long、double...)且未实现java.io.Serializable
.
使用不可序列化构造的典型方法是延迟初始化它们。
延迟初始化
在 Flink 函数中使用不可序列化类型的一种方法是延迟初始化它们。保存这些类型的字段仍然是null
当函数被序列化以进行交付时,并且仅在函数被工作人员反序列化后才设置。
在 Scala 中,您可以简单地使用惰性字段,例如lazy val x = new NonSerializableType()
. The NonSerializableType
类型实际上仅在第一次访问变量时创建x
,通常在工人身上。因此,该类型可能是不可序列化的,因为x
当函数被序列化以运送给工作人员时,为 null。
在Java中,您可以初始化不可序列化的字段open()
函数的方法,如果你把它做成功能丰富。丰富的功能(如RichMapFunction
)是基本函数的扩展版本(这里MapFunction
)并让您可以访问生命周期方法,例如open()
and close()
.
惰性依赖注入
我对依赖注入不太熟悉,但 dagger 似乎也提供了类似于惰性依赖的东西,这可能有助于作为一种解决方法,就像 Scala 中的惰性变量一样:
new MapFunction<Long, Long>() {
@Inject Lazy<MyDependency> dep;
public Long map(Long value) {
return dep.get().doSomething(value);
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)