我可以将自定义分区器与 group by 一起使用吗?

2024-01-07

假设我知道我的数据集不平衡并且我知道键的分布。我想利用它来编写一个自定义分区器,以充分利用运算符实例。

我知道关于数据流#partitionCustom https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#partitionCustom-org.apache.flink.api.common.functions.Partitioner-org.apache.flink.api.java.functions.KeySelector-。但是,如果我的流被锁定,它仍然可以正常工作吗?我的工作看起来像这样:

KeyedDataStream afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), MyPartitionKeySelector())

DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum()

我想要实现的是:

  • 根据某个键拥有一个流 keyBy ,以便仅使用该键中的元素调用reduce函数。
  • 该组根据一些自定义分区将工作拆分到节点之间。
  • 自定义分区根据并行运算符实例的数量返回一个数字(该数字将被修复并且不会重新缩放)。
  • 自定义分区从 keyBy 返回不同的值。然而,keyBy(x) = keyBy(y) => partition(x) = partition(y).
  • Having 预聚合 https://stackoverflow.com/questions/51634189/does-flink-support-map-side-aggregations-streaming在分区之前最大限度地减少网络流量。

用例示例:

  • 数据集:[(0, A), (0, B), (0, C), (1, D), (2, E)]
  • 并行算子实例数量:2
  • 按函数分组:返回该对的第一个元素
  • 分区函数:对于键 0 返回 0,对于键 1 和 2 返回 1。优点:处理可能将键 0 和 1 发送到同一运算符实例的数据倾斜,这意味着一个运算符实例将接收 80% 的数据集。

不幸的是这是不可能的。DataStreamUtils.reinterpretAsKeyedStream()要求数据进行相同的分区,就像您调用keyBy().

造成此限制的原因是密钥组以及密钥如何映射到密钥组。密钥组是 Flink 分配密钥状态的单位。键组的数量决定了算子的最大并行度,配置为setMaxParallelism()。密钥通过内部哈希函数分配给密钥组。通过更改密钥的分区,同一密钥组的密钥将分布在多台机器上,这是行不通的。

为了调整机器的密钥分配,您需要更改密钥组的密钥分配。但是,没有公共或可访问的接口来执行此操作。因此,Flink 1.6 中不支持自定义密钥分配。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

我可以将自定义分区器与 group by 一起使用吗? 的相关文章

  • 如何构建和使用flink-connector-kinesis?

    我正在尝试将 Apache Flink 与 AWS kinesis 结合使用 这document https ci apache org projects flink flink docs release 1 7 dev connector
  • 下沉 kafka 流时看不到消息,并且在 flink 1.2 中看不到打印消息

    我的目标是使用kafka读取json格式的字符串 对字符串进行过滤 然后将消息接收出来 仍然是json字符串格式 出于测试目的 我的输入字符串消息如下所示 a 1 b 2 我的实现代码是 def main args Array String
  • Apache Flink 使用 Windows 在写入 Sink 之前引发延迟

    我想知道 Flink 窗口是否可能导致从数据进入管道到写入 Cassandra 中的表之间有 10 分钟的延迟 我最初的意图是将每个事务写入 Cassandra 中的一个表 并在 Web 层使用范围键查询该表 但由于数据量很大 我正在考虑延
  • logback 在 Flink 中不起作用

    我有一个单节点 Flink 实例 它在 lib 文件夹中具有 logback 所需的 jar logback classic jar logback core jar log4j over slf4j jar 我已从 lib 文件夹中删除了
  • 无法在 Flink 新 Kafka Consumer-api (1.14) 中的检查点上向 Kafka 提交消费偏移量

    我使用以下代码引用 Kafka 源连接器的 Flink 1 14 版本 我期待以下要求 在应用程序刚开始时必须读取 Kafka 主题的最新偏移量 在检查点上 它必须将消耗的偏移量提交给 Kafka 重新启动后 当应用程序手动终止 系统错误时
  • SingleOutputStreamOperator#returns(TypeHint typeHint) 方法的 javadoc

    我正在阅读源代码SingleOutputStreamOperator returns 它的javadoc是 Adds a type information hint about the return type of this operato
  • 为什么我的 Flink 窗口使用这么多状态?

    我的 Flink 作业的检查点变得越来越大 在深入研究各个任务后 键控窗口函数似乎负责大部分大小 我怎样才能减少这个 如果您在 Windows 中绑定了很多状态 则有几种可能性 使用增量聚合 通过使用reduce or aggregate
  • Apache Flink 1.3 中的 Elasticsearch 5 连接器

    通过阅读文档 我了解到使用 Apache Flink 1 3 我应该能够使用 Elasticsearch 5 x 但是 在我的 pom xml 中
  • flink集群启动错误[ERROR]无法正确获取JVM参数

    bin start cluster sh Starting cluster INFO 1 instance s of standalonesession are already running on centos1 Starting sta
  • 从 FlinkML 多元线性回归中提取权重

    我正在运行 Flink 0 10 SNAPSHOT 的示例多元线性回归 我不知道如何提取权重 例如斜率和截距 beta0 beta1 无论你想怎么称呼它们 我对 Scala 不太熟悉 这可能是我问题的一半 感谢任何人可以提供的任何帮助 ob
  • 使用 Flink LocalEnvironment 进行生产

    我想了解本地执行环境的局限性以及它是否可以用于在生产中运行 感谢任何帮助 见解 谢谢 LocalExecutionEnvironment 启动一个 Flink MiniCluster 它在单个 JVM 中运行整个 Flink 系统 JobM
  • Apache Flink:KeyedStream 上的数据分布不均匀

    我在 Flink 中有这样的 Java 代码 env setParallelism 6 Read from Kafka topic with 12 partitions DataStream
  • 如何在 Flink 中引用外部 Jar

    每个人 我尝试在所有任务管理器中以将其复制到 FLINK lib 的方式在 Flink 中引用我的公司 jar 但失败了 而且我不想打包一个胖罐子 太重而且浪费时间 我认为第一种方法也不是一个好主意 因为我必须管理整个集群中的jar 有谁知
  • Apache Flink 环境中的 AWS SDK 冲突

    我正在尝试将我的作业部署到 Flink 环境 但总是收到错误 java lang NoSuchMethodError com amazonaws AmazonWebServiceRequest putCustomQueryParameter
  • Apache Flink、JDBC 和 fat jar 是否存在类加载问题?

    使用 Apache Flink 1 8 并尝试运行RichAsyncFunction 我得到No Suitable Driver Found初始化 Hikari 池时出错RichAsyncFunction open 在 IDE 中它运行得很
  • 在 Flink 流中使用静态 DataSet 丰富 DataStream

    我正在编写一个 Flink 流程序 其中我需要使用一些静态数据集 信息库 IB 来丰富用户事件的数据流 对于例如假设我们有一个买家的静态数据集 并且有一个传入的事件点击流 对于每个事件 我们希望添加一个布尔标志来指示事件的执行者是否是买家
  • Flink 流顺序

    Flink 能保证流的执行顺序吗 我有两个 Kafka 主题 每个主题都有一个分区 流 1 和流 2 并使用keyBy 流由一个处理coprocess功能 在我的测试过程中 我可以看到两个流的内容并不总是按顺序执行 我可以将并行度设置为 1
  • 基于流的应用程序中的受控/手动错误/恢复处理

    我正在开发一个基于的应用程序Apache Flink 它利用Apache Kafka用于输入和输出 该应用程序可能会被移植到Apache Spark 所以我也将其添加为标签 问题仍然相同 我要求通过 kafka 接收的所有传入消息必须按顺序
  • Cassandra Pojo Sink Flink 中的动态表名称

    我是 Apache Flink 的新手 我正在使用 Pojo Sink 将数据加载到 Cassandra 中 现在 我在以下命令的帮助下指定表和键空间名称 Table注解 现在 我想在运行时动态传递表名称和键空间名称 以便可以将数据加载到用
  • Apache Flink 动态设置 JVM_OPT env.java.opts

    是否可以设置自定义 JVM 选项env java opts提交作业时未在作业中指定conf flink conf yaml file 我问的原因是我想在 log4j 中使用一些自定义变量 我也在 YARN 上运行我的工作 我已经使用 CLI

随机推荐