我想分享一个HashMap
跨 Flink 中的每个节点并允许节点更新该 HashMap。到目前为止我有这个代码:
object ParallelStreams {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//Is there a way to attach a HashMap to this config variable?
val config = new Configuration()
config.setClass("HashMap", Class[CustomGlobal])
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
class CustomGlobal extends ExecutionConfig.GlobalJobParameters {
override def toMap: util.Map[String, String] = {
new HashMap[String, String]()
}
}
class MyCoMap extends RichCoMapFunction[String, String, String] {
var users: HashMap[String, String] = null
//How do I get access the HashMap I attach to the global config here?
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
val globalConf = globalParams[Configuration]
val hashMap = globalConf.getClass
}
//Other functions to override here
}
}
我想知道您是否可以将自定义对象附加到config
此处创建的变量val config = new Configuration()
? (请参阅上面代码中的注释)。
我注意到你只能附加原始值。我创建了一个扩展的自定义类ExecutionConfig.GlobalJobParameters
并通过做附加该类config.setClass("HashMap", Class[CustomGlobal])
但我不确定你是否应该这样做?
将参数分配给运算符的常见方法是将它们作为函数类中的常规成员变量。在计划构建期间创建和分配的功能对象被序列化并发送给所有工作人员。因此您不必通过配置传递参数。
这看起来如下
class MyMapper(map: HashMap) extends MapFunction[String, String] {
// class definition
}
val inStream: DataStream[String] = ???
val myHashMap: HashMap = ???
val myMapper: MyMapper = new MyMapper(myHashMap)
val mappedStream: DataStream[String] = inStream.map(myMapper)
The myMapper
对象被序列化(使用 Java 序列化)并传送以供执行。所以类型map
必须实现JavaSerializable
界面。
EDIT:我错过了您希望地图可以从所有并行任务中更新的部分。 Flink 不可能做到这一点。您必须完全复制地图并全部更新(通过广播)或使用外部系统(键值存储)。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)