如何在 Flink 中将 HashMap 附加到 Configuration 对象?

2023-12-11

我想分享一个HashMap跨 Flink 中的每个节点并允许节点更新该 HashMap。到目前为止我有这个代码:

object ParallelStreams {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Is there a way to attach a HashMap to this config variable?
    val config = new Configuration()
    config.setClass("HashMap", Class[CustomGlobal])
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    class CustomGlobal extends ExecutionConfig.GlobalJobParameters {
      override def toMap: util.Map[String, String] = {
        new HashMap[String, String]()
      }
    }

    class MyCoMap extends RichCoMapFunction[String, String, String] {
      var users: HashMap[String, String] = null
      //How do I get access the HashMap I attach to the global config here?
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
        val globalConf = globalParams[Configuration]
        val hashMap = globalConf.getClass

      }
      //Other functions to override here
    }
}

我想知道您是否可以将自定义对象附加到config此处创建的变量val config = new Configuration()? (请参阅上面代码中的注释)。

我注意到你只能附加原始值。我创建了一个扩展的自定义类ExecutionConfig.GlobalJobParameters并通过做附加该类config.setClass("HashMap", Class[CustomGlobal])但我不确定你是否应该这样做?


将参数分配给运算符的常见方法是将它们作为函数类中的常规成员变量。在计划构建期间创建和分配的功能对象被序列化并发送给所有工作人员。因此您不必通过配置传递参数。

这看起来如下

class MyMapper(map: HashMap) extends MapFunction[String, String] {
 // class definition
}


val inStream: DataStream[String] = ???

val myHashMap: HashMap = ???
val myMapper: MyMapper = new MyMapper(myHashMap)
val mappedStream: DataStream[String] = inStream.map(myMapper)

The myMapper对象被序列化(使用 Java 序列化)并传送以供执行。所以类型map必须实现JavaSerializable界面。

EDIT:我错过了您希望地图可以从所有并行任务中更新的部分。 Flink 不可能做到这一点。您必须完全复制地图并全部更新(通过广播)或使用外部系统(键值存储)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Flink 中将 HashMap 附加到 Configuration 对象? 的相关文章

  • 无法证明与路径相关类型的等价性

    为什么最后一个summon编译失败 我该怎么做才能让它编译 import java time LocalDateTime LocalTime trait Circular T type Parent given localTimeCircu
  • Scala:为什么 Actor 是轻量级的?

    是什么让演员如此轻盈 我什至不确定它们是如何工作的 它们不是单独的线程吗 当他们说轻量级时 他们的意思是每个参与者都没有映射到单个线程 JVM 提供共享内存线程 锁作为主要形式 并发抽象 但分享了 内存线程是相当重量级的 并招致严重的绩效处
  • Play 框架:异步与同步性能

    我有以下代码 def sync Action val t0 System nanoTime Thread sleep 100 val t1 System nanoTime Ok Elapsed time t1 t0 1000000 0 ms
  • Scala 如何使用我的所有核心?

    object PrefixScan sealed abstract class Tree A case class Leaf A a A extends Tree A case class Node A l Tree A r Tree A
  • 如何插入UUID的值?

    我在 Play Framework 2 3 支持的 postgresql 9 4 中使用 anorm 2 4 给出一个这样的模型 case class EmailQueue id UUID send from String send to
  • 如何将 csv 文件读取为键值对的映射

    我的 csv 文件中有数据 例如 value key A Name B Name C Name 24 Age 25 Age 20 Age M Gender F Gender 我想解析它以生成以下地图 Map Name gt List A B
  • 由于 UTFDataFormatException 导致 Spark 中的任务无法序列化:编码字符串太长

    我在 Yarn 上运行 Spark 应用程序时遇到一些问题 我有非常广泛的集成测试 运行时没有任何问题 但是当我在 YARN 上运行应用程序时 它将抛出以下错误 17 01 06 11 22 23 ERROR yarn Applicatio
  • 对 HList 进行协变过滤

    我打算以协变方式过滤 HList 我也想包含子类 所以协变滤波器Foo应捕获以下元素Foo也Bar 我已经构建了这个例子来尝试 lt lt 看看它是否做了我想做的事情 http scastie org 6465 http scastie o
  • Scala 中的超时未来

    假设我有一个函数 它调用一个阻塞可中断的手术 我想在超时的情况下异步运行它 也就是说 我想在超时到期时中断该功能 所以我正在尝试做这样的事情 import scala util Try import scala concurrent Fut
  • Spark SQL中如何按列降序排序?

    I tried df orderBy col1 show 10 但它是按升序排列的 df sort col1 show 10 也按升序排序 我查看了 stackoverflow 发现的答案都已过时或称为 RDD https stackove
  • 组合部分函数

    我有两个偏函数f and g 它们没有副作用并且执行速度快 将它们组合成另一个部分函数的最佳方法是什么h这样h isDefinedAt x iff f isDefinedAt x g isDefinedAt f x 如果h是一个返回一个函数
  • IntelliJ IDEA 13 给出有关不兼容类型的无效错误(Play 中的 Java-Scala-InterOp)

    我刚刚从 IDEA 12 升级到 13 社区版 从那时起 我在 IDEA 中收到关于我的游戏项目的类型不兼容的错误 Option
  • 如何识别远程参与者?

    我有一个远程参与者 客户端 它正在向另一个远程参与者 服务器 注册 然后注销 使用关闭挂钩 然而 虽然服务器接收到注销 但实际sender财产是一个不同的 Channel 对象 所以在我的服务器日志中我有 Registered new cl
  • Scala:如何定义带有变量参数列表的匿名函数?

    在 Scala 中 如何定义接受可变数量参数的匿名函数 scala gt def foo blah Int gt 3
  • scala 中的模拟案例类:Mockito

    在我的游戏应用程序中 我打算模拟一个案例类 我可以这样做 但它创建了一个所有成员变量都为空的对象 有没有办法创建案例类的模拟对象 以便该对象可以初始化一些成员 case class User name String address Stri
  • 在 scala 中保留推导的更高类型

    我有一个高阶类型 并致力于用它构建一些 DSL 我正在寻找一种方法来定义可以接受类型而无需显式指定此类型的函数 自我描述示例 class Wrap T val data T class DSL def doSomething T x Wra
  • 用 HashMap[Int, Vector[Int]] (Scala) 表示图(邻接列表)?

    我想知道如何 如果可能的话 我可以通过以下方式制作 可变 图的邻接列表表示HashMap Int Vector Int HashMap当然是可变的 目前我将其设置为HashMap Int ArrayBuffer Int 但我可以更改 Arr
  • 如何在 akka actor 中测试公共方法?

    我有一个 akka 演员 class MyActor extends Actor def recieve def getCount id String Int do a lot of stuff proccess id do more st
  • Spark Streaming 中是否需要检查点

    我注意到 Spark 流示例也有检查点代码 我的问题是检查点有多重要 如果是为了容错 那么在此类流应用程序中发生故障的频率是多少 这一切都取决于您的用例 假设您正在运行一个流作业 它仅从 Kafka 读取数据并计算记录数 如果您的应用程序在
  • Scala:将整个列表的 Either 与每个元素的 Either 组合

    我有一个 Either 列表 它代表错误 type ErrorType List String type FailFast A Either ErrorType A import cats syntax either val l List

随机推荐