如何在通过Spark生产消息的Kafka中均匀分布数据?

2023-12-10

我有一个将数据写入 Kafka 的流作业,我注意到其中一个 Kafka 分区(#3)比其他分区获取更多的数据。

+-----------------------------------------------------+
| partition | messages  | earlist offset | next offset|
+-----------------------------------------------------+
|1          | 166522754 | 5861603324     | 6028126078 |
|2          | 152251127 | 6010226633     | 6162477760 |
|3          | 382935293 | 6332944925     | 6715880218 |
|4          | 188126274 | 6171311709     | 6359437983 |
|5          | 188270700 | 6100140089     | 6288410789 |
+-----------------------------------------------------+

我找到了一种选择 - 使用 Kafka 分区数 (5) 重新分区输出数据集。

还有其他方法可以均匀分布数据吗?


数据在 Kafka 中的分区方式并不取决于数据在 Spark 及其数据集中的分区方式。从 Kafka 的角度来看,它取决于消息的键,或者您在写入 Kafka 时应用自定义 Partitioner 类。

Kafka中数据的分区方式有以下几种场景:

消息键为空并且没有自定义分区程序

如果 Kafka 消息中未定义键,Kafka 将以循环方式在所有分区中分发消息。

消息键不为空且没有自定义分区程序

如果您提供消息密钥,默认情况下,Kafka 将根据以下条件决定分区:

hash(key) % numer_of_partitions

提供自定义分区器

如果您想完全控制 Kafka 如何在主题的分区中存储消息,您可以编写自己的 Partitioner 类并将其设置为partitioner.class在您的生产者配置中。

以下是客户分区器类的示例

public class MyPartitioner implements Partitioner {
  public void configure(Map<String, ?> configs) {}
  public void close() {}

  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceOf String)))
      throw new InvalidRecordException("Record did not have a string Key");

    if (((String) key).equals("myKey"))
       return 0; // This key will always go to Partition 0

    // Other records will go to the rest of the Partitions using a hashing function
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
  }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在通过Spark生产消息的Kafka中均匀分布数据? 的相关文章

随机推荐

  • 查询视图以获取其列名称

    我有大量的 SQL 2008 R2 视图 我想知道视图中引用了哪些数据库字段 有没有一种方法可以查询架构以列出这些列名称 使用此查询针对sys sql dependencies SELECT ViewName O name Referenc
  • 在指定行上方插入行

    我想知道如何在具有 订单类型 如下指定 的指定行上方插入 3 行额外的行 该行在工作表中多次出现 下面的代码可以工作 只是它会在指定行下方插入行 谢谢 Sub try Dim c As Range For Each c In Range A
  • 在 MYSQL 子查询中使用 LIMIT 关键字的替代方法

    我有一个包含以下列的表 TEST code ver VARCHAR suite VARCHAR date DATE 现在我想选择 10 行具有不同的 c 值ode ver code ver NOT LIKE DevBld sorted by
  • if 语句中的增量运算符如何工作?

    include
  • 如何在没有网络连接的情况下克隆git存储库

    基础设施 我有两台主机 A 和 B 我无法从 A 连接到 B 反之亦然 主机 A 和主机 B 之间没有可到达的主机 C 我可以双向发送电子邮件 允许添加文本附件 不允许使用其他附件 可能还有其他问题 我想将 git 存储库从主机 A 克隆到
  • Django admin - 覆盖已经注册的模型

    我需要覆盖django eav应用程序管理类 在eav admin py型号已注册 admin site register Value 我需要添加列表显示到这个模型 我知道修改已安装的应用程序代码是不好的做法 所以我需要覆盖它 但是 不知道
  • EF5 Code First 枚举和查找表

    我想定义一个供 EF5 使用的枚举以及相应的查找表 我知道 EF5 现在支持枚举 但开箱即用 它似乎只在对象级别支持此功能 并且默认情况下不会为这些查找值添加表 例如 我有一个 User 实体 public class User int I
  • 选择类别后勾选子类别

    我有一个过滤器框图标 单击时会打开一个包含类别和子类别的对话框 如下 在此处输入图像描述 1 1 我想这样做 以便当用户检查一个类别 在我的例子中是一个国家 时 所有子类别 在我的例子中是一个城市 都会自动检查 是的 从数据库搜索的角度来看
  • Primefaces 组件 CSS 定制

    正如我在 primefaces 文档中看到的那样 1 To change the font size of PrimeFaces components globally use the ui widget style class An ex
  • 如何缓存 Firebase 存储下载的图像

    我正在开发一个应用程序 我需要缓存图像 以便我可以在用户离线时检索它们 并且在用户关闭并重新打开应用程序后也应该可以访问它们 我需要类似的东西 FirebaseFirestoreSettings setPersistenceEnables
  • 配置更改后或启动这些操作的活动被破坏后,SQLite CRUD 操作的预期行为是什么?

    我正在重构我前段时间制作的一个应用程序 当时我刚刚迈入 Android 最简单的方法就是避免方向更改 对于几乎所有的 CRUD 操作 我都使用AsyncTask类 没有实现内容提供程序或使用片段 现在我做了一些改变 I use Fragme
  • 使用 Rcpp 属性允许 C++ 常量作为默认函数参数

    我在 R 包中使用 rcpp 创建了一个 cumsum 函数 它将对向量进行累积求和 直到达到用户定义的上限或下限 然而 如果希望将累积和限制在上方 则用户仍必须指定下限 Example a c 1 1 1 1 1 1 1 如果我想高潮a上
  • 在 UI 元素上方的特定位置显示弹出按钮

    我有 gridView 它的项目非常简单 每个 gridViewItem 上都有按钮 在此按钮上单击我想显示一个与 gridViewItem 内容相同的浮出控件 但也显示更多数据 这很简单 但我想将弹出窗口放置在 gridViewItem
  • 类型转换器不适用于 @Query 中的集合

    我有一个名为 Events 的实体 其定义如下 Entity tableName Events data class Event PrimaryKey val id Long val name String val venues Set
  • Python:定义具有依赖属性的类

    我的目标是编写一个可用于计算设备所有属性的类 import numpy as np class pythagoras def init self a None b None c None self a a self b b self c c
  • jQuery .each 中的 setTimeout() 问题

    以下代码将无法正常工作 我尝试过不同的变体并到处搜索但没有运气 i 1 var timer new Array jQuery a each function i timer i setTimeout jQuery this remove i
  • C++:调用临时对象的构造函数

    假设我有以下内容 int main SomeClass return 0 如果没有优化 SomeClass 构造函数将被调用 然后它的析构函数将被调用 对象将不复存在 然而 根据 IRC 频道 如果编译器认为 SomeClass 构造函数
  • Gstreamer 的 OpenCV 3.0.0 错误

    我刚刚按照本教程安装了 OpenCV 3 0 http rodrigoberriel com 2014 10 installing opencv 3 0 0 on ubuntu 14 04 我在安装过程中没有遇到任何错误 但是 当我尝试运行
  • 如何使用 AJAX 上传文件而不使用 multipart?

    我的应用程序允许用户上传的唯一文件是图像 并且它们始终作为表单中的唯一输入字段上传 因此 多部分是不必要的 并且我可以在没有多部分解析器的情况下更轻松地使用该文件 如何使用 AJAX 和 vanilla Javascript 在不使用分段的
  • 如何在通过Spark生产消息的Kafka中均匀分布数据?

    我有一个将数据写入 Kafka 的流作业 我注意到其中一个 Kafka 分区 3 比其他分区获取更多的数据 partition messages earlist offset next offset 1 166522754 58616033