我有一个将数据写入 Kafka 的流作业,我注意到其中一个 Kafka 分区(#3)比其他分区获取更多的数据。
+-----------------------------------------------------+
| partition | messages | earlist offset | next offset|
+-----------------------------------------------------+
|1 | 166522754 | 5861603324 | 6028126078 |
|2 | 152251127 | 6010226633 | 6162477760 |
|3 | 382935293 | 6332944925 | 6715880218 |
|4 | 188126274 | 6171311709 | 6359437983 |
|5 | 188270700 | 6100140089 | 6288410789 |
+-----------------------------------------------------+
我找到了一种选择 - 使用 Kafka 分区数 (5) 重新分区输出数据集。
还有其他方法可以均匀分布数据吗?
数据在 Kafka 中的分区方式并不取决于数据在 Spark 及其数据集中的分区方式。从 Kafka 的角度来看,它取决于消息的键,或者您在写入 Kafka 时应用自定义 Partitioner 类。
Kafka中数据的分区方式有以下几种场景:
消息键为空并且没有自定义分区程序
如果 Kafka 消息中未定义键,Kafka 将以循环方式在所有分区中分发消息。
消息键不为空且没有自定义分区程序
如果您提供消息密钥,默认情况下,Kafka 将根据以下条件决定分区:
hash(key) % numer_of_partitions
提供自定义分区器
如果您想完全控制 Kafka 如何在主题的分区中存储消息,您可以编写自己的 Partitioner 类并将其设置为partitioner.class
在您的生产者配置中。
以下是客户分区器类的示例
public class MyPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public void close() {}
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("Record did not have a string Key");
if (((String) key).equals("myKey"))
return 0; // This key will always go to Partition 0
// Other records will go to the rest of the Partitions using a hashing function
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)