Flink学习6-自定义分区器介绍

2023-11-11

  1. 背景说明
    我们都知道自定义source是可以自定义并行度的,数据读写有几个并行度就意味着有几个分区。那么怎么控制我想要的数据流入到指定分区呢?flink1.12官方文档给我们提供了一下几种方式,接下来我们分别进行讨论。
    在这里插入图片描述

  2. partitionCustom分区器
    按照官方的原话翻译过来就是使用一个用户自定义的分区策略为每一个元素分配一个目标task。这里的的分区策略官方提到了两种:第一个是下标,第二个是字段。在这里插入图片描述
    下面看一个实例

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestParaPartition {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        test01(env);
        env.execute();
    }
    public static void test01(StreamExecutionEnvironment env){
        DataStreamSource<String> elements = env.fromElements("zhangsan", "lisi", "wangwu");
       elements.map(new MapFunction<String, Tuple2<String,String>>() {
           @Override
           public Tuple2<String, String> map(String value) throws Exception {
               return Tuple2.of("value is: ", value);
           }
       }).partitionCustom(new TestPartition(),1)
               .map(new MapFunction<Tuple2<String, String>, String>() {
                   @Override
                   public String map(Tuple2<String, String> value) throws Exception {
                       System.out.println("当前线程是:"+Thread.currentThread().getId()+"   value is : "+value.f1);
                       return value.f1;
                   }
               })
               .print();
    }
    
}

自定义分区逻辑

import org.apache.flink.api.common.functions.Partitioner;

/**
 * 自定义分区器
 */
public class TestPartition implements Partitioner<String> {

    @Override
    public int partition(String s, int i) {
        if("zhangsan".equals(s)){
            return 0;
        }else if("lisi".equals(s)){
            return 1;
        }else{
            return 2;
        }

    }
}
  1. shuffle分区器
    在这里插入图片描述
    在这里插入图片描述
    从源码可以看出shuffle分区器自己定义了一个ShufflePartitioner类,此方法里用了随机数将元素随机分配到一个分区里。所以我们无需再定义使用方式,直接调用此分区器即可。

  2. rebalance分区器
    在这里插入图片描述
    在这里插入图片描述
    从实现类看出rebalance分区器是第一个元素是随机选择一个分区下发,然后下一个元素通过取模的方式轮询下发。这样就可以将元素均分分布在每一个分区。而shuffle分区器可能会导致数据下发到同一个分区里。使用方式同shuffle分区器直接调用方法即可。

  3. rescale分区器
    在这里插入图片描述
    在这里插入图片描述
    上游操作向其发送元素的下游操作子集取决于上游和下游操作的并行度。例如,如果上游操作具有并行度2,下游操作具有并行性4,则一个上游操作将元素分配给两个下游操作,而另一个上游作业将分配给其他两个下游作业。另一方面,如果下游操作具有并行度2,而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而另两个上游作业将分配给其他下游操作。
    在不同的并行不是彼此的倍数的情况下,一个或多个下游操作将具有来自上游操作的不同数量的输入。
    区别于rebalance分区器它是从下游第一个分区开始轮训,而且还是pointwise分发模式。(Flink中有两种分发模式:pointwise 和 all-to-all,两者时间复杂度不同,前者为O(n)后者为O(n)^2,所以在pointwise分发模式下可以减少网络开销。)但对比rebalance分区器其数据均衡性不足。

  4. broadcast分区器
    在这里插入图片描述

    上游所有分区数据会自动给下游所有分区分发一份,分发模式是all-to-all。所以这里官方也提示不支持选择分区。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink学习6-自定义分区器介绍 的相关文章

  • 如何抑制 Cucumber/Junit 断言堆栈跟踪

    我有一个黄瓜场景 该步骤使用assertEquals 我的结果报告显示了对最终用户不友好的堆栈跟踪 我怎样才能抑制它 Scenario Add two numbers Given I have two inputs 3 and 2 When
  • Spring @Validated 在服务层

    Hej 我想使用 Validated group Foo class 在执行方法之前验证参数的注释 如下所示 public void doFoo Foo Validated groups Foo class foo 当我将此方法放入 Spr
  • Java:无安全管理器:RMI 类加载器已禁用

    您好 我有 RMI 应用程序 现在我尝试从客户端调用服务器上的一些方法 我有以下代码 public static void main final String args try Setting the security manager Sy
  • 无法将 INode 类型值分配给 类型变量。为什么?

    我想知道为什么以下代码无法工作 public static
  • 我应该使用 JDBC getNString() 而不是 getString() 吗?

    我们正在构建一个由 Oracle 数据库支持的 Java 应用程序 我们使用 JDBC 驱动程序 访问该数据库ojdbc6 jar and orai18n jar 数据库模式主要使用以下方式存储文本列NVARCHAR2数据类型 The JD
  • AWS SDK 2 承担角色

    Bean public DynamoDbClient amazonDynamoDB final AssumeRoleRequest assumeRoleRequest AssumeRoleRequest builder roleSessio
  • Java - toString 到 Color

    我一整天都在努力解决这个问题 基本上我做了一个 for 循环 将条目添加到数组列表中 其中一项是 颜色 变量 我已经用过random nextInt为颜色构造函数的红色 绿色和蓝色部分创建新值 我还设置了一个toString方法 这样我就可
  • Java时间转正常格式

    我有 Java 时间1380822000000 我想转换为我可以阅读的内容 import java util Date object Ws1 val a new Date 1380822000000 toString 导致异常 warnin
  • 外部实体更改后索引不更新

    我目前正在开发一个项目 使用 JPA 2 1 保存数据并使用 hibernate search 4 5 0 final 搜索实体 映射类和索引后 搜索工作正常 但是 当我更改值时描述B 类从 someStr 到 anotherStr 数据库
  • 使用java在网页中进行字符编码

    如何使用java找出网页中的字符编码类型 打开与 URL 的连接 使用URL openConnection http download oracle com javase 6 docs api java net URL html openC
  • 使用 equals 方法比较两个对象,Java

    我有一个对象数组 我想将它们与目标对象进行比较 我想返回与目标对象完全匹配的对象的数量 这是我的计数方法 public int countMatchingGhosts Ghost target int count 0 for int i 0
  • 如何将测试类打包到jar中而不运行它们?

    我正在努力将我的测试类包含到 jar 包中 但不运行它们 经过一番谷歌搜索后 我尝试过mvn package DskipTests 但我的测试类根本没有添加到 jar 中 有任何想法吗 如果您遵循 Maven 约定 那么您的测试类位于src
  • 默认情况下,JSF 生成不可用的 ID,这些 ID 与 Web 标准的 CSS 部分不兼容

    活跃的 JSF 或 Primefaces 用户能否解释一下为什么默认情况下会发生这种情况 为什么没有人对此采取任何措施
  • 单元测试、集成测试还是设计中的问题?

    我编写了我的第一个单元测试 我认为它过于依赖其他模块 我不确定是否是因为 这是一个复杂的测试 我实际上已经编写了集成测试或 我的设计有问题 我首先要说的是 虽然我有大约 4 年的开发经验 但我从未学过 也没有人教过自动化测试 我刚刚使用 H
  • Java 中通用方法参数的 getClass()

    以下 Java 方法无法编译
  • 在片段之间切换时底部导航栏会向下推

    在我的活动中 我有一个底部导航栏和框架布局来显示片段 一切正常 但问题是当我开始按顺序从 1 4 移动时 底部导航栏保持在其位置 但当我突然从 4 跳到2 然后底部导航栏就会超出屏幕 当再次单击同一项目时 它就会回到正常位置 该视频将清楚地
  • 如何在 Java 中创建一个带有连字符的值的静态枚举?

    如何创建如下所示的静态枚举 static enum Test employee id employeeCode 截至目前 我遇到了错误 这对于 Java 来说是不可能的 因为每个项目都必须是有效的标识符 并且有效的 Java 标识符可能不包
  • Java给定长度的随机数

    我需要在 Java 中生成一个恰好 6 位数字的随机数 我知道我可以在随机发生器上循环 6 次 但是在标准 Java SE 中还有其他方法可以做到这一点吗 要生成 6 位数字 Use Random http download oracle
  • 动态创建 JSON 对象

    我正在尝试使用以下格式创建 JSON 对象 tableID 1 price 53 payment cash quantity 3 products ID 1 quantity 1 ID 3 quantity 2 我知道如何使用 JSONOb
  • 一个类中有多个具有相同参数类型的方法

    我知道 至少已经有了关于这个主题的一个问题 https stackoverflow com questions 5561436 can two java methods have same name with different retur

随机推荐

  • jpa 动态参数查询 高级查询

    GetMapping find ApiOperation 通用查询 ScSecurityPermission name 通用查询 public ScResult find FileInfo fileInfo RequestParam nam
  • JS获取json子项/数组的个数/长度

    JS获取json子项 数组的个数 长度 微信小程序获取json格式数据的个数 长度
  • java 请求参数加解密

    项目开发中 需要针对请求参数加密 解密操作 可以使用下列工具类 oap security enabled true oap security enableIgnoreAnnotation true oap security secretKe
  • [4G/5G/6G专题基础-158]: 5G VoNR(Voice over NR)与VoLTE共同组成5G三大语音方案

    目录 第1章 语音方案概述 1 1 VoLTE概述 1 2 5G VoNR概述 第2章 5G VoNR网络架构 2 1 基本原则 2 2 NSA VoLTE方案 2 3 SA EPS Fallback SA组网 早期方案 2 4 SA Vo
  • 对于售点POI标签计算脚本优化总结

    对于售点POI标签计算脚本性能优化总结 减少udf 函数的使用多多使用spark的算子 对于两组经纬度的距离计算可以使用常规的公式计算法 Haversine 公式是一种用于计算两个球面坐标点 经度和纬度 之间的距离的公式 它的原理是基于球面
  • netpoll浅析

    netpoll只是一种框架和一些接口 只有依赖这个框架和接口实现的netpoll实例 netpoll才能发挥它的功能 类似于kernel中的vfs vfs本身并不会去做具体的文件操作 只是为不同的文件系统提供了一个框架 netpoll不依赖
  • 理解编程中的while循环(C/C++)

    固定的语法结构 省略 include 和 using int main printf x 我们写这个代码 打了一个int main 然后打了一个大括号 为什么要这样做 这个main 是一个函数 但为什么一定要叫main 为什么又需要打 又需
  • JS中的递归

    1 什么是递归 如果一个函数在内部可以调用其本身 那么这个函数就是递归函数 简单理解 函数内部自己调用自己 这个函数就是递归函数 比如 但上面代码报错因为递归很容易发生 栈溢出 错误 stack 所以必须要加退出条件 return 递归必须
  • python读取csv文件内容,并保存到数据库中

    coding utf 8 python读取csv文件内容 并保存到数据库中 import csv import time import pymysql import emoji num 0 file path r H 20221103174
  • 快速掌握 Android Studio 中 Gradle 的使用方法

    Gradle是可以用于Android开发的新一代的 Build System 也是 Android Studio默认的build工具 Gradle脚本是基于一种JVM语言 Groovy 再加上DSL 领域特定语言 组成的 因为Groovy是
  • 81. Search in Rotated Sorted Array II

    31 Search in Rotated Sorted Array ll 描述不包含相同的元素情况 Input nums 4 5 6 7 0 1 2 target 0 Output 4 对有序数组进行一定的旋转 进行查找 二分查找and双指
  • VMware player桥接模式不能联网的解决方法

    VMware虚拟机下主要使用两种网络连接方式 桥接模式 NAT模式 桥接模式 直接连接网络 虚拟机独立IP 并与宿主机处于同一网段内 相当于虚拟机是局域网内独立的一台电脑 使用起来较为方便 NAT模式 网络地址转换 借由宿主机的IP访问网络
  • linux进程间的通信(详细且有demo)

    目录 1 进程间通信常用的几种方式 2 无名管道 1 管道的概念 2 管道的原理 3 管道的局限性 4 创建匿名管道 5 demo 6 父子进程使用管道通信 7 管道的读写行为 8 查看管道缓冲区大小 3 有名管道 1 特点 2使用场景 3
  • VsCode中书写markdown文档快速插入图片

    文章目录 安装插件 插入图片 效果预览 安装插件 搜索 markdown image 安装这个插件 插入图片 首先复制图片 将图片保存在剪贴板中 在需要插入的地址点击鼠标右键 选择粘贴图片 粘贴图片后 会在文件 根路径 下生成一个 imag
  • redis内存数据库C客户端hiredis API 中文说明

    A 编译安装 make make install usr local make install PREFIX HOME progs 可以自由指定安装路径 B 同步的API接口 redisContext redisConnect const
  • 二十一、java版 SpringCloud分布式微服务云架构之Java 继承

    继承的概念 继承是java面向对象编程技术的一块基石 因为它允许创建分等级层次的类 继承就是子类继承父类的特征和行为 使得子类对象 实例 具有父类的实例域和方法 或子类从父类继承方法 使得子类具有父类相同的行为 生活中的继承 兔子和羊属于食
  • 【华为OD统一考试A卷

    华为OD统一考试A卷 B卷 新题库说明 2023年5月份 华为官方已经将的 2022 0223Q 1 2 3 4 统一修改为OD统一考试 A卷 和OD统一考试 B卷 你收到的链接上面会标注A卷还是B卷 请注意 根据反馈 目前大部分收到的都是
  • python库之execjs使用方法

    一 execjs使用方法 import execjs def encrypt c1 c2 with open encrypt js encoding utf 8 as f jscode f read txt execjs compile j
  • Vmware vSphere 5.0系列教程之五 存储简介及配置openfiler存储

    转载至 http andygao blog 51cto com 323260 822152 从前面的vSwitch 我们知道 vSphere的高级功能的实现 必须通过多片物理网卡来实现 不过 这仅仅是一方面 更重要的是 我们需要独立的共享存
  • Flink学习6-自定义分区器介绍

    背景说明 我们都知道自定义source是可以自定义并行度的 数据读写有几个并行度就意味着有几个分区 那么怎么控制我想要的数据流入到指定分区呢 flink1 12官方文档给我们提供了一下几种方式 接下来我们分别进行讨论 partitionCu