Kafka 0.8,是否可以使用java代码创建带有分区和复制的主题?

2024-04-02

在 Kafka 0.8beta 中,可以使用下面提到的命令创建主题here http://kafka.apache.org/08/quickstart.html

    bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 2 --partition 3 --topic test

上面的命令将创建一个名为“test”的主题,其中包含 3 个分区,每个分区有 2 个副本。

我可以使用 Java 做同样的事情吗?

到目前为止,我发现使用 Java 我们可以创建一个生产者,如下所示

    Producer<String, String> producer = new Producer<String, String>(config);
    producer.send(new KeyedMessage<String, String>("mytopic", msg));

这将创建一个名为“mytopic”的主题,并使用“num.partitions”属性指定分区数并开始生成。

但是有没有办法也定义分区和复制呢?我找不到任何这样的例子。如果我们不能,那么这意味着我们总是需要创建带有分区和复制的主题(根据我们的要求),然后使用生产者在该主题内生成消息。例如,如果我想以相同的方式创建“mytopic”但具有不同数量的分区(覆盖 num.partitions 属性),是否可以?


注意:我的答案涵盖 Kafka 0.8.1+,即截至 2014 年 4 月可用的最新稳定版本。

是的,您可以通过 Kafka API 以编程方式创建主题。是的,您可以指定所需的分区数量以及主题的复制因子。

请注意,最近发布的 Kafka 0.8.1+ 提供的 API 与 Kafka 0.8.0 略有不同(Biks 在其链接回复中使用了该 API)。我添加了一个在 Kafka 0.8.1+ 中创建主题的代码示例 https://stackoverflow.com/a/23360100/1743580对我对问题的答复我们如何使用 API 从 IDE 在 Kafka 中创建主题 https://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api/Biks 上面提到的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 0.8,是否可以使用java代码创建带有分区和复制的主题? 的相关文章

  • 如何在KafkaStream应用程序中获取partitionId和TopicName

    我们如何从 KafkaStream 获取主题名称和分区 id 对于任何其他 Kafka 消费者 我们可以获得主题名称和分区 ID 如下所示 ConsumerRecords
  • python 脚本在 docker 内运行时无法导入 kafka 库 [重复]

    这个问题在这里已经有答案了 我有以下 python 脚本 可以从 twitter 中提取推文并将其发送到 kafka 主题 该脚本运行完美 但是当我尝试在 docker 容器内运行它时 它无法导入 kafka 库 它说 语法错误 语法无效
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • 如何使用不同的kafka主题配置Kubernetes部署的微服务的每个pod/进程?

    在我们的应用程序中 有多个不同 kafka 主题的消费者 例如 Cosumer C1 Cosumer C2 Cosumer C3 Cosumer C4 Cosumer C5 以及不同的 kafka 主题 例如主题 1 主题 2 主题 3 主
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • Spark:将 bytearray 转换为 bigint

    尝试使用 pyspark 和 Spark sql 将 kafka 键 二进制 字节数组 转换为 long bigint 会导致数据类型不匹配 无法将二进制转换为 bigint 环境详情 Python 3 6 8 Anaconda custo
  • 带有 kafka-avro-console-consumer 的未知魔法字节

    我一直在尝试将 Confluence 中的 kafka avro console consumer 连接到我们的旧版 Kafka 集群 该集群是在没有 Confluence Schema Registry 的情况下部署的 我使用以下属性显式
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确

随机推荐