flink - 使用匕首注入 - 不可序列化?

2024-04-30

我使用 Flink(最新通过 git)从 kafka 流式传输到 cassandra。为了简化单元测试,我通过 Dagger 添加依赖注入。

ObjectGraph 似乎已正确设置自身,但“内部对象”被 Flink 标记为“不可序列化”。如果我直接包含这些对象,它们就可以工作 - 那么有什么区别呢?

有问题的类实现地图功能 and @Inject一个用于 cassandra 的模块和一个用于读取配置文件的模块。

有没有办法构建这个,以便我可以使用后期绑定,或者 Flink 是否使这变得不可能?


Edit:

fwiw - 依赖注入(通过 dagger)和丰富的地图功能不能共存。 Dagger 不会让你包含任何具有extends在他们的定义中。

Further:

通过 Dagger Lazy 实例化的对象也不会序列化。

线程“main”中的异常 org.apache.flink.api.common.InvalidProgramException:对象 com.someapp.SaveMap@2e029d61 不可序列化
...
引起原因:java.io.NotSerializedException:dagger.internal.LazyBinding$1


在深入讨论问题的具体细节之前,先了解一下 Apache Flink 中函数的可序列化性的一些背景知识:

可串行化

Apache Flink 使用 Java 序列化 (java.io.Serialized) 来传送函数对象(此处为MapFunction)到并行执行它们的工作人员。因此,函数需要可序列化:函数不能包含任何不可序列化字段,即非原始类型(int、long、double...)且未实现java.io.Serializable.

使用不可序列化构造的典型方法是延迟初始化它们。

延迟初始化

在 Flink 函数中使用不可序列化类型的一种方法是延迟初始化它们。保存这些类型的字段仍然是null当函数被序列化以进行交付时,并且仅在函数被工作人员反序列化后才设置。

  • 在 Scala 中,您可以简单地使用惰性字段,例如lazy val x = new NonSerializableType(). The NonSerializableType类型实际上仅在第一次访问变量时创建x,通常在工人身上。因此,该类型可能是不可序列化的,因为x当函数被序列化以运送给工作人员时,为 null。

  • 在Java中,您可以初始化不可序列化的字段open()函数的方法,如果你把它做成功能丰富。丰富的功能(如RichMapFunction)是基本函数的扩展版本(这里MapFunction)并让您可以访问生命周期方法,例如open() and close().

惰性依赖注入

我对依赖注入不太熟悉,但 dagger 似乎也提供了类似于惰性依赖的东西,这可能有助于作为一种解决方法,就像 Scala 中的惰性变量一样:

new MapFunction<Long, Long>() {

  @Inject Lazy<MyDependency> dep;

  public Long map(Long value) {
    return dep.get().doSomething(value);
  }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink - 使用匕首注入 - 不可序列化? 的相关文章

随机推荐

  • 在 OSX 10.6 中以编程方式与连接的 iOS/iPod 进行交互

    我正在尝试开发一个简单的菜单栏应用程序 让我可以直观地看到所连接的 iPhone iPad 的充电状态 而无需启动 iTunes 经过长时间的搜索 我无法断定 Snow Leopard API 是否可以实现这一点 我的搜索引导我找到了一个在
  • 如何在 RHEL 上更新 git 版本?

    我刚刚在 GCP 上创建了一个新的 RHEL 虚拟机来在上面运行一些 Kubernetes 它上面没有安装任何 git I used yum包管理器在其上安装git 但它没有安装最新版本的git 当前版本 2 38 0 2022 年 10
  • MATLAB:涉及大数的计算

    如何在 MATLAB 中执行涉及大量数字的计算 举一个简单的例子 任意精度计算器将显示 1 120 132 370 260 约为 1 56 但 MATLAB 无法执行此类计算 power 120 132 factorial 370 fact
  • .NET 中的嵌套事务

    我怎样才能执行与此等效的操作 我的理解是 这对于 TransactionScopes 来说是不可能的 https stackoverflow com questions 2741988 nested child transactionsco
  • 什么是 ADO.NET?

    我编写了一些 Access 数据库并使用了一些轻型 VBA 并且有一个 OO 类 现在我正在编写一个 C 数据库应用程序 我已经安装并连接了 VS 和 System Data SQLite 并输入了我的表和列 但这就是我陷入困境的地方 我正
  • 如何在 $match 内的 mongodb 聚合查询中使用 $regex

    我正在尝试使用 regex http docs mongodb org manual reference operator regex within match http docs mongodb org manual reference
  • 具有动态参数的 Oracle Lag 函数

    我有一个具体的问题 我有一个包含无效值的表 我需要替换无效值 此处0 与之前的值大于0 困难是 使用更新或插入对我来说是不合适的 游标和更新就可以了 我唯一的方法是使用 Select 语句 当我使用lag col1 1 当情况发生时 我只得
  • 从应用程序中删除死代码的最佳方法是什么? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我经常觉得 在多次迭代我的代码之后 我留下了一些函数 类或其他代码行 这些代码在以前的版本中有意义 但对于新版本来说并不是很有用 我知道探查器可
  • 如何使 scalatest 与 Spraytestkit 和 HttpServiceActor 一起工作

    我在看Spray 1 3 1 测试套件文档 http spray io documentation 1 2 1 spray testkit 但找不到下面我需要的正确示例 我有这个样本spray 1 3 1 service trait MyS
  • 如何在java中创建保留方法参数注释的动态代理?

    我目前正在尝试代理一些现有的 JAX RS 资源 以便允许我使用 Hibernate Validator 的方法验证支持 但是 当我代理我的类 当前使用 cglib 2 2 时 代理类中的参数上不存在 FormParam 注释 因此 JAX
  • firebase:“无法解析身份验证令牌。”

    你好 我正在使用 firebase 和 php 并使用这个library https github com ktamas77 firebase php supported commands 我收到此错误 无法解析身份验证令牌 我的身份验证令
  • Visual Studio Professional 2013 Update 5 中的“在线服务不可用”问题

    我一直在 vscode 2013 工作直到今天 但是 当我今天早上尝试打开 IDE 时 它收到 您的许可证已过时 必须更新 错误消息 如下所示 在我尝试更新许可证和登录操作之后 当我尝试这两个时 它得到 在线服务不可用 请稍后重试 错误消息
  • 将文本环绕在 div 两侧

    这是我试图实现的目标 与以下HTML div p some text p div Awesome content div div 有这个 text text text text text text text text text text t
  • data.table .SD 的 LHS 为 :=

    这是参考一个不同的问题 https stackoverflow com questions 26804362 iteratively create columns based on grouped variables 26805158 26
  • 从 IntelliJ IDEA 中的版本控制中删除文件

    我正在使用 IntelliJ IDEA 社区版 2016 1 我已将我的项目放在 Git 下 并将其托管在 GitHub 上 When I first hit that Share Project on GitHub button ever
  • 转储 $mft 文件的内容

    对于一些商业的我正在做的项目我需要能够读取 mft 文件中存储的实际数据 我找到了一个gpl lib http www codeproject com KB files NTFSParseLib aspx artkw ntfs这可能会有所帮
  • 如何在 Xcode 中保持标题注释最新

    Xcode 习惯于将各种 冗余 信息放在它创建的每个代码文件的顶部 其中包含版权声明 类名 项目名和客户端名称 不管你喜欢与否 一旦你创建了一个新的类 A 然后将其重构为 B 信息就已经是错误的 评论会一直说这是 A h 或 A m 此外
  • 对所有列实施搜索过滤器

    我在 PostgreSQL 中找到了这个搜索示例http www postgresql org docs current interactive textsearch tables html TEXTSEARCH TABLES SEARCH
  • 使用接口将泛型委托转换为另一种类型

    使用 NET 4 0 好的 所以我有 private Dictionary
  • flink - 使用匕首注入 - 不可序列化?

    我使用 Flink 最新通过 git 从 kafka 流式传输到 cassandra 为了简化单元测试 我通过 Dagger 添加依赖注入 ObjectGraph 似乎已正确设置自身 但 内部对象 被 Flink 标记为 不可序列化 如果我