在多个转换中保持键控状态

2024-01-02

我有一个流,我想使用某个键对其进行分区,然后运行多个转换,每个转换使用一个状态。当我打电话时keyBy()我得到一个KeyedStream下一个转换可以正确访问分区状态,但之后链接的另一个转换在尝试访问分区状态时会出现异常。例外的是:

状态密钥序列化器尚未在配置中配置。该操作不能使用分区状态

看来关键信息只传递到第一个转换,而不是进一步传递到链下。

我尝试运行的代码与此代码类似(但实际上做了一些事情):

DataStream<Long> newStream = eventsStream
    .keyBy("username")
    .filter(new RichFilterFunction<Event>() {
        private ValueState<Boolean> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE1", Boolean.class, Boolean.TRUE));
        }

        @Override
        public boolean filter(Event value) throws Exception {
            return stateStore.value();
        }
    })
    .map(new RichMapFunction<Event, Long>() {
        private ValueState<Long> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE2", Long.class, 0L));
        }

        @Override
        public Long map(Event value) throws Exception {
            return Long.parseLong(value.data) + stateStore.value();
        }
    });

这段代码将在第二次抛出异常getState() call.

我可以打电话keyBy()再次,但随后我删除了链接操作的能力。我可以手动操作流图的对象以便传递关键信息,还是不支持这种链接?


你不能。

即使你会打电话keyBy()第二次(或以某种方式将“键控”信息传递给下游),您将获得一个新状态,因为状态仅与单个操作员关联。

作为解决方法,您需要将两个运算符合并为一个。

如果您认为此功能可能有帮助,请随时在以下位置提出建议:[email protected] /cdn-cgi/l/email-protection.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在多个转换中保持键控状态 的相关文章

  • 使用 flink runner 在梁上进行 Tensorflow 变换

    这可能看起来很愚蠢 但这是我在这里发表的第一篇文章 抱歉做错了什么 我目前正在使用 python2 7 使用 TFX 0 11 即 tfdv tft tfserving 和tensorflow 1 11 构建一个简单的 ML 管道 我目前有
  • Flink 作业在集群节点上的分布

    我们有 4 个作业 运行在 3 个节点上 每个节点有 4 个槽位 在 Flink 1 3 2 上 作业均匀分布在每个节点上 升级到 flink 1 5 后 每个作业都在单个节点上运行 如果没有剩余插槽 则可以转移到另一个节点 有没有办法恢复
  • 如何在 flink 独立安装上进行 kerberos 身份验证?

    我有一个独立的 Flink 安装 我想在其上运行一个将数据写入 HDFS 安装的流作业 HDFS 安装是 Cloudera 部署的一部分 需要 Kerberos 身份验证才能读取和写入 HDFS 由于我没有找到有关如何使 Flink 与受
  • ClassNotFoundException:使用 kafka 主题时出现 org.apache.flink.streaming.api.checkpoint.CheckpointNotifier

    我正在使用最新的 Flink 1 1 2 Hadoop 27 和 flink connector kafka 0 10 2 hadoop1 jar Flink消费者如下 StreamExecutionEnvironment env Stre
  • 在多个转换中保持键控状态

    我有一个流 我想使用某个键对其进行分区 然后运行多个转换 每个转换使用一个状态 当我打电话时keyBy 我得到一个KeyedStream下一个转换可以正确访问分区状态 但之后链接的另一个转换在尝试访问分区状态时会出现异常 例外的是 状态密钥
  • Apache Flink RollingFileAppender

    我正在使用 Apache Flink v1 2 我想切换到滚动文件附加程序 以避免包含几天数据的巨大日志文件 然而它似乎不起作用 我调整了 log4j 配置 log4j properties 如下 log4j appender file o
  • 是否可以将 Riak CS 与 Apache Flink 一起使用?

    我要配置filesystem状态后端和zookeeper恢复模式 state backend filesystem state backend fs checkpointdir recovery mode zookeeper recover
  • 无法执行 HTTP 请求:Flink 中等待来自池的连接超时

    我正在研究一个将一些文件上传到 s3 存储桶的应用程序稍后 它从 s3 存储桶读取文件并将其推送到我的数据库 我在用着弗林克1 4 2 and fs s3a API用于从 s3 存储桶读取和写入文件 将文件上传到 s3 存储桶工作正常 没有
  • 由于无法找到或加载主类错误,Flink 集群未启动

    我正在尝试设置flink并运行集群 尽管我得到以下输出 看起来集群已启动 bin start cluster sh Starting cluster Starting standalonesession daemon on host LAP
  • Flink 处理事件太慢

    我使用 Kinesis 数据流作为源 使用 elasticsearch 作为接收器 在 AWS Kinesis Data 分析应用程序中运行 Flink 作业 事件示例 area sessions userId 4450 date 2021
  • 根据 Flink 的模式使用 GCS 文件

    由于 Flink 支持 Hadoop 文件系统抽象 并且有一个GCS连接器 https github com GoogleCloudPlatform bigdata interop 在 Google Cloud Storage 之上实现它的
  • 无法在 Flink 新 Kafka Consumer-api (1.14) 中的检查点上向 Kafka 提交消费偏移量

    我使用以下代码引用 Kafka 源连接器的 Flink 1 14 版本 我期待以下要求 在应用程序刚开始时必须读取 Kafka 主题的最新偏移量 在检查点上 它必须将消耗的偏移量提交给 Kafka 重新启动后 当应用程序手动终止 系统错误时
  • Apache Flink:如何从 Cassandra 读取数据流/数据集?

    我尝试将 Cassandra 视为 Flink 中的数据源 并使用以下链接中提供的信息 从 Cassandra 读取数据以在 Flink 中进行处理 https stackoverflow com questions 43067681 re
  • Apache Flink:KeyedStream 上的数据分布不均匀

    我在 Flink 中有这样的 Java 代码 env setParallelism 6 Read from Kafka topic with 12 partitions DataStream
  • Flink:处理数据早于应用程序水印的键控流

    我正在使用带有运动源和事件时间键控窗口的 F link 该应用程序将监听实时数据流 窗口 事件时间窗口 并处理每个键控流 我有另一个用例 我还需要能够支持某些关键流的旧数据的回填 这些将是事件时间 鉴于我正在使用水印 这会成为一个问题 因为
  • Apache Beam 计数器/指标在 Flink WebUI 中不可用

    我正在使用 Flink 1 4 1 和 Beam 2 3 0 并且想知道是否可以在 Flink WebUI 或任何地方 中提供可用的指标 如 Dataflow WebUI 中那样 我用过类似的计数器 import org apache be
  • Flink 流顺序

    Flink 能保证流的执行顺序吗 我有两个 Kafka 主题 每个主题都有一个分区 流 1 和流 2 并使用keyBy 流由一个处理coprocess功能 在我的测试过程中 我可以看到两个流的内容并不总是按顺序执行 我可以将并行度设置为 1
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • Apache Flink - “keyBy”中的异常处理

    由于代码错误或缺乏验证 进入 Flink 作业的数据可能会触发异常 我的目标是提供一致的异常处理方式 我们的团队可以在 Flink 作业中使用这种方式 而不会导致生产中出现任何停机 重启策略似乎不适用于此处 因为 简单的重启无法解决问题 我
  • Flink中为什么DataStream不支持聚合

    我是 Flink 的新手 有时 我想在 DataStream 上进行聚合 而不需要先执行 keyBy 为什么 Flink 不支持 DataStream 上的聚合 sum min max 等 谢谢你 艾哈迈德 Flink 支持非 keyed

随机推荐

  • ActiveSync 客户端 Java 实现

    我的公司正在开发一个桌面和移动电子邮件客户端项目 该客户端可以通过用户或服务器管理员的最少配置连接到不同的邮件服务器 由于我们想要支持 Microsoft Exchange 因此我们似乎必须在 Java 中实现 ActiveSync 协议
  • 使用电话号码格式 NaN 屏蔽 EditText,就像 PhoneNumberUtils 中一样

    我想让用户在 editText 中输入电话号码 以便每次用户输入号码时动态更改格式 也就是说 当用户输入最多 4 位数字 例如 7144 时 editText 显示 714 4 我希望每当用户输入数字时 editText 就会动态更新为格式
  • HashLocationStrategy 在路由时不生成 # 个位置?

    我正在运行 Angular 2 beta 0 并且正在搞乱路由 这是我所拥有的 应用程序组件 import Component provide from angular2 core import bootstrap from angular
  • 使用 Vue-router 进行 Firebase 身份验证检查

    问题是 vue router 的 beforeEnter 比 main js 中的 beforeCreate 钩子更早触发 并且有第二个延迟 而在重新加载 vuex 操作后将用户设置为状态 这会导致用户被弹回登录页面 如何延迟 vue ro
  • fork后的变量

    这是一个代码 int i 0 pid t pid puts Hello World puts pid fork if pid i 42 printf p n i printf d n i puts 并输出 Hello World 0x7ff
  • 应用程序关闭时如何处理推送负载?

    我正在向我的用户发送包含以下内容的推送负载 aps alert Go To Google sound Default url http www google com 当应用程序在后台运行时 一切顺利 如果我收到推送并且应用程序已关闭 我打开
  • 使用 imread 函数读取 opencv 中的 jpg 文件时是否有任何可能的原因?

    最近在python中使用opencv 正如我注意到的 当我想导入时cv2python中的模块 我需要添加cv2 so使用以下命令手动将文件路径设置为系统路径 sys path append path to cv so 但是 当我想在 ipy
  • 如何正确设置 django-debug-toolbar 的内部 IP

    我第一次编辑setting py文件输入google cloud computing请原谅我这个愚蠢的问题 我想跑django debug toolbar并遵循该教程中的每一步 我想要的是工具栏在 our office only 所以我只是
  • 如何在 Perl 中轻松解析

    我想将网站解析为 Perl 数据结构 首先我加载页面 use LWP Simple my html get http f oo 现在我知道了两种处理方法 首先是正则表达式 其次是模块 我开始阅读有关HTML 解析器 http p3rl or
  • 从 JSON 中选择随机对象[重复]

    这个问题在这里已经有答案了 我有以下代码 getJSON js questions1 json done function data window questionnaire data console log window question
  • Haskell 中遵守模态公理的有趣运算符

    我只是在看类型map a gt b gt a gt b 这个函数的形状让我想知道我们是否可以将列表形成运算符 视为遵守正常模态逻辑 例如 T S4 S5 B 常见的各种公理 因为我们似乎至少有 K 正规模态逻辑公理 其中 a gt b gt
  • git 从 github 远程存储库导出

    我想从 github 远程存储库导出 而不是克隆它 与 svn export 类似 我不想用它获取 git 文件夹 我可以通过克隆和删除 git 文件夹来解决这个问题 我想知道是否有更清洁的方法 我在某个地方读过它 你可以使用 git ar
  • 流星 $and 与 $or

    我正在尝试在 Meteor 中为我的 mongo 查询执行 and 然后 or 我有以下内容 但它似乎不起作用 希望查询匹配 OrganizationId 键在变量 user organizationId 中具有值且类型键为 convert
  • 将 R 数据表列从 JSON 转换为数据表

    我有一个包含 JSON 数据的列 如下例所示 library data table test lt data table a list 1 2 3 info list duration 10 country US duration 20 c
  • 是否可以在 pdf 中向使用 R knit::kable 生成的表格添加垂直线?

    我想制作一张桌子knitr kable边界上和某些列之间有垂直线 有办法做到吗 我的输出文档是pdf Thanks 不太清楚 但也许这可以帮助 library knitr library kableExtra library dplyr d
  • 如何关闭笔记本中的初始化单元?

    在我的笔记本中 我有一个相当大的初始化单元 如何使其可关闭 我的意思是我怎样才能将这个单元格卷成一行并能够将其展开呢 经典的解决方案是将初始化单元放入其自己的部分 Alt 4 标题为 初始化 此部分位于笔记本的开头或结尾 要隐藏内容 请关闭
  • JDialog 让主应用程序失去焦点

    我想知道为什么我的 JDialog 将我的主应用程序推入后台 这意味着 如果显示 JDialog 并且用户单击 确定 或 取消 则主应用程序将失去焦点并将被推入后台 经过调查 我发现 只有当我在显示 JDialog 时禁用主框架时 才会发生
  • 我的 Apache CXF 客户端出现问题

    我正在尝试编写一个 Apache CXF JAX WS 客户端 我的依赖项pom xml are
  • 如何禁用 html5 canvas 元素的选择

    我监听 html5 画布内的点击事件 它工作得很好 但是 当我单击图像上的任意位置时 浏览器会突出显示该图像 就好像它已被选中一样 类似于在页面上单击时图像可能会突出显示的方式 我很好奇是否有人知道如何禁用选择 html 元素 例如画布 我
  • 在多个转换中保持键控状态

    我有一个流 我想使用某个键对其进行分区 然后运行多个转换 每个转换使用一个状态 当我打电话时keyBy 我得到一个KeyedStream下一个转换可以正确访问分区状态 但之后链接的另一个转换在尝试访问分区状态时会出现异常 例外的是 状态密钥