Flink 键控流密钥为空

2023-11-23

我正在尝试在 Flink 中的 KeyedStream 上执行映射操作:

stream.map(new JsonToMessageObjectMapper())
                    .keyBy("keyfield")
                    .map(new MessageProcessorStateful())

JsonToObjectMapper 运算符的输出是类的 POJO消息对象其中有一个字符串字段 'keyfield'。然后将流键入该字段。

MessageProcessorStateful 是一个 RichMapFunction,如下所示:

public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {

    private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;
    ...
    @Override
    public void open(Configuration config) throws Exception {
        MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =
                    new MapStateDescriptor<>(
                        "state",                                                                                     // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {}) ); // type information
                    state = getRuntimeContext().getMapState(descriptor);

        state.put(...); // Insert a key, value here. Exception here!

    }
}

代码抛出 NullPointer 异常:

Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)
    at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

尽管我已验证“keyfield”始终是有效字符串,但 KeyedStream 之一的 keyedState 中的键似乎为 null。根据 Flink 文档,其余似乎是正确的。知道发生了什么事吗?


问题是您尝试访问中的键控状态open() method.

键控状态为每个键维护一个状态实例。在你的例子中你正在使用MapState。所以你有一个MapState每个键的实例。访问状态时,您始终会获得与当前处理的记录的键对应的状态实例。在一个MapFunction(就像在您的示例中一样)这将是传递给的记录map()方法。

Since open()没有用记录调用,当前的键在open() is null并且无法访问密钥状态。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink 键控流密钥为空 的相关文章

  • Java Swing BoxLayout 忽略 AlignmentX

    在下面的代码中 通过调用setAlignmentX with Component LEFT ALIGNMENT我希望在居中的滑块上获得左对齐的标签 由于某种原因 标签也居中 似乎与传递给 setAlignmentX 的值无关 我必须向 se
  • Java,顺序流在哪个线程中执行?

    在阅读有关流的文档时 我遇到了以下句子 attempting to access mutable state from behavioral parameters presents you with a bad choice if you
  • Java Junit 测试 HTTP POST 请求

    我需要测试以下方法而不改变方法本身 该方法向服务器发出 POST 方法 但我需要制作一个独立于服务器的测试用例 在将其重定向到本地文件之前 我测试了类似的方法 但为此我将协议指定为文件 主机名指定为 localhost 端口指定为 1 我的
  • Maven WebApp META-INF context.xml

    我正在使用 Maven 3 并且尝试在 webapp 文件夹下添加 META INF 文件夹 所以我正在尝试执行以下操作 src main webapp META INF context xml WEB INF 下面是我的 POM 文件
  • 使用 kryo 注册课程的策略

    我最近发现了 kryonet 库 它非常棒并且非常适合我的需求 然而 我遇到的一个问题是制定一种好的策略来注册所有可以转移的类 我知道我可以在每个对象中编写一个静态方法 该方法将返回它使用的所有类的列表 但我真的不想这样做 为了我自己的时间
  • 如何自动转换十六进制代码以将其用作 Java 中的 byte[]?

    我这里有很多十六进制代码 我想将它们放入 Java 中 而不需要向每个实体附加 0x 喜欢 0102FFAB 和我必须执行以下操作 byte test 0x01 0x02 0xFF 0xAB 我有很多很长的十六进制代码 有什么办法可以自动做
  • for循环中更新JLabel的问题

    我的程序的想法是从之前在其他 JFrame 中保存的列表中选择一个名称 我想在标签中一个接一个地打印所有名称 它们之间有很小的延迟 然后停在其中一个名称上 问题是lbl setText String 如果有多个则不起作用setText co
  • JERSEY:错误跟踪:java.lang.IllegalStateException:实体输入流已关闭

    我正在使用 Jersey 2 x 以下是我的控制器 GET Path id Produces application json public Response getUser PathParam id int userId Context
  • RxJava android mvp 单元测试 NullPointerException

    我是 mvp 单元测试的新手 我想对演示者进行一个非常基本的测试 它负责登录 我只想断言 view onLoginSuccess 这是演示者代码 public LoginPresenter LoginViewContract loginVi
  • 错误膨胀类 android.support.design.widget.NavigationView [启动时崩溃]

    该应用程序应该有一个导航抽屉 可以从左侧拉出并显示各种活动 但是一旦将导航栏添加到 XML Activity homescreen 文档中 应用程序一启动就会崩溃 主屏幕 java package com t99sdevelopment c
  • 使用 Guava Ordering 对对象列表进行多条件排序

    我有一个类无法实现可比较 但需要根据 2 个字段进行排序 我怎样才能用番石榴实现这一目标 假设班级是 class X String stringValue java util Date dateValue 我有一个清单 List
  • 如何在 spring-data 中强制使用 CrudRepository 进行预加载?

    我有一个实体 其中包含List就是这样lazy默认加载 interface MyEntityRepository extends CrudRepository
  • Janusgraph 0.3.2 + HBase 1.4.9 - 无法设置 graph.timestamps

    我在 Docker 容器中运行 Janusgraph 0 3 2 并尝试使用运行 HBase 1 4 9 的 AWS EMR 集群作为存储后端 我可以运行 gremlin server sh 但如果我尝试保存某些内容 我会得到粘贴在下面的堆
  • 从字节数组设置 img src

    我需要设置img src我在对象中拥有的字节数组的属性 img
  • 开发者环境-如何调用/消费其他微服务

    背景 我的环境 Java Play2 MySql 我在 Play2 gt S1 S2 S3 上编写了 3 个无状态 Restful 微服务 S1 消耗来自 S2 和 S3 的数据 因此 当用户点击 S1 时 该服务会异步调用 S2 S3 合
  • 在java中执行匿名pl/sql块并获取结果集

    我想执行匿名 PL SQL 并需要获取结果集对象 我得到了可以通过在 PL SQL 块内使用游标来完成的代码 但 PL SQL 块本身将以文本形式来自数据库 所以我无法编辑该 PL SQL 块 并且它只会返回两个值 其列名始终相同 它将返回
  • 检查按钮是否可用?如果没有,请等待 5 秒钟,然后再次检查?

    基本上我想看看此刻是否可以单击按钮 如果没有我想再试一次 所以我需要某种 goto 函数来返回到代码的前一行 尽管我怀疑我写得非常糟糕 但它本来可以做得更容易 try driver findElement By xpath button i
  • 从 InputStream 中删除换行符

    我喜欢从一个文件中删除所有换行符 对于 n 和 r n java io InputStream 在读取文件时 相应的方法如下所示 param target linkplain File return linkplain InputStrea
  • Java时区混乱

    我正在运行 Tomcat 应用程序 并且需要显示一些时间值 不幸的是 时间快到了 还有一个小时的休息时间 我调查了一下 发现我的默认时区被设置为 sun util calendar ZoneInfo id GMT 08 00 offset
  • Unicode(希腊语)字符存储在数据库中,例如“??????”

    数据库中的希腊字符就像问号 我找不到解决办法 我使用 Java Swing 开发了一个应用程序 但是当我在 MySQL 中插入希腊字母时 就像问号一样 我将数据库排序规则更改为 utf8 并将列也更改为 utf8 我的项目编码设置为UTF

随机推荐

  • 在 Windows 上更改 .gitconfig 位置

    默认情况下 在 Windows 上 Git 将全局 gitconfig 放置在c documents and settings user 我如何更改该位置以便 gitconfig 存储在c my configuration files 到底
  • Haskell:严格顺序执行外部命令

    如果我处于需要按顺序执行外部命令的情况 最好的解决方案是什么 例如 我有两个命令 制作快照 和 备份快照 在第一个完成之前 第二个无法开始 如果我将这两个命令有序地粘贴在 do 语法中 它们是否会依次执行 或者我是否必须手动检查并确保第一个
  • 无法从 xcode 7 推送到 gitlab

    升级到 Xcode 7 的一个令人不快的副作用是我无法再推送到我的 Gitlab 存储库 在过去的两年里 我一直在毫无问题地推送和拉取我的项目 所以我怀疑 Xcode 7 正在做一些不同的事情 我已经通过 Web 界面登录和注销来验证我的
  • 如何将粗体文本设置为Android Snackbar Action Text?

    我们可以使用 Snackbar 的 Action Text 设置颜色setActionTextColor如记录在https developer android com reference android support design wid
  • 如何在android画布上绘制一个实心三角形?

    所以我在 android 地图中使用以下代码在我的绘制方法中绘制这个三角形 paint setARGB 255 153 29 29 paint setStyle Paint Style FILL AND STROKE paint setAn
  • com.android.camera.action.CROP 替代方案?

    它似乎com android camera action CROP不可靠 因为它是内部 API 并非在所有设备上都可用 不过 我发现这个库非常实用 它在我的 Galaxy Nexus 上运行良好 我真的应该考虑实施自己的解决方案吗 我使用它
  • 设置与高度/宽度相关的图像 DPI C#

    我正在编写一个应用程序来将一些图像发送给第三方 并且图像必须为 200x200 DPI 该图像是位图 尺寸为 500 宽度和 250 高度 我第一次与第三方测试图像时 我的分辨率不正确 我只是用过image SetResolution 20
  • 如何使用 touchmove 使 mousemove 事件适用于触摸屏?

    我正在开发一个画布 JavaScript 通过 mousemove 事件你可以擦除背景 现在我正在尝试在触摸屏 移动设备 上获得相同的体验 如何同时为我的代码提供 mousemove 和 touchmove 事件 function Crea
  • 如何向 axlsx 中的单元格添加超链接?

    随着spreadsheet宝石 你可以跑Spreadsheet Link new http hyperlinkhere com Some words 制作一个电子表格 其中的单元格包含字符串 Some Words 并带有指向 的超链接 ht
  • 如何使用 C# SendKeys 以编程方式按下 Windows 键

    基本上我想在代码中模拟用户单击 Windows 键 我知道有 SendKeys 如果我得到了按键的句柄 它允许我将按键发送到窗口 但我不知道我需要获取什么句柄才能发送 Windows 按键命令 例如 Windows 键 L 读过一点后 似乎
  • Rails render_to_string 在部分视图中给出错误

    我正进入 状态ActionView MissingTemplate在代码下方使用带有部分视图的 render to string 方法时出错 bizz render to string partial gt biz new layout g
  • SQL Server Management Studio 无法连接

    我已经安装了 SQL Server Management Studio 2014 在 连接到服务器 窗口中 我选择服务器类型为 数据库引擎 服务器名称为 本地 但是当我尝试连接时 显示 n 错误 标题 连接到服务器 无法连接到 附加信息 与
  • Telnet IAC命令应答

    我正在尝试与套接字协商 telnet 连接 套接字正在工作 但服务器告诉我 login The 表示 255 253 1 255 253 31 255 251 1 255 251 3 我阅读了所有 RFC 文档 但我不明白我应该响应什么才能
  • 如何使用 CMake 在链接命令行末尾添加标志?

    我有一个 CMake 的问题无法检测到 pthread 作为一种解决方法 我尝试过 set CMAKE EXE LINKER FLAGS CMAKE EXE LINKER FLAGS lpthread 然而 这插入 lpthread在错误的
  • Java 8 偏移日期解析

    我需要解析以下格式的字符串2015 01 15 05 00UTC 中的 LocalDate 或其他 问题是下面的代码 System out println LocalDate parse 2015 01 15 05 00 DateTimeF
  • 独立数据库

    我目前正在 Net 中设计一个小型应用程序 过去我一直使用 MSAccess 作为独立数据库 可以随该程序一起提供 但我想知道在当今时代是否没有替代解决方案 可以轻松集成到 Net 应用程序中 对用户透明 并且在处理大量数据时可能具有更好的
  • 拖动和平移完成后获取中心坐标

    我想跟踪地图中心的坐标 到目前为止我一直在使用这个 On Drag End google maps event addListener map dragend function map center coords latitude html
  • Python 帮助读取 csv 文件由于行结束而失败

    我正在尝试创建此脚本 该脚本将检查计算机主机名 然后在主列表中搜索该值以返回 csv 文件中的相应值 然后打开另一个文件并进行查找替换 我知道这应该很容易 但以前没有在 python 中做过这么多 这是我到目前为止所拥有的 masterli
  • Rust是如何实现反射的?

    铁锈具有Any特点 但它也有 不为不使用的东西付费 的政策 Rust是如何实现反射的 我的猜测是 Rust 使用惰性标记 每个类型最初都是未分配的 但后来如果该类型的实例被传递给需要一个Any特征 类型被分配一个TypeId 或者 Rust
  • Flink 键控流密钥为空

    我正在尝试在 Flink 中的 KeyedStream 上执行映射操作 stream map new JsonToMessageObjectMapper keyBy keyfield map new MessageProcessorStat