Apache Kafka 0.9.0.0 显示所有带分区的主题

2023-12-31

我目前正在评估 Apache Kafka,我有一个简单的消费者,应该从特定主题分区读取消息。这是我的客户:

public static void main(String args[]) {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    TopicPartition partition0 = new TopicPartition("test_topic", Integer.parseInt(args[0]));

    ArrayList topicAssignment = new ArrayList();
    topicAssignment.add(partition0);
    consumer.assign(topicAssignment);

    //consumer.subscribe(Arrays.asList("test_topic"));
    int commitInterval = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
            if (buffer.size() >= commitInterval) {
                process(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}

static void process(List<ConsumerRecord<String, String>> buffers) {
   for (ConsumerRecord<String, String> buffer : buffers) {
       System.out.println(buffer);
   }
}

这是我用来启动 Apache Kafka 的命令:

bin/kafka-server-start.sh config/server.properties & bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic

正如您在此处看到的,我正在创建具有 2 个分区(p0 和 p1)的主题!

然后,我使用以下命令启动消费者的两个实例:

对于消费者 1:

java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 0

对于消费者 2:

java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 1

其中 0 和 1 代表我希望消费者从中读取消息的实际分区。

但发生的情况是,只有我的消费者 1 收到了所有消息。我的印象是来自生产者的消息最终平等地出现在分区上。

我使用以下命令查看主题 test_topic 有多少个分区:

Joes-MacBook-Pro:kafka_2.11-0.9.0.0 joe$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info --group test --topic test_topic --zookeeper localhost:2181
[2016-01-14 13:36:48,831] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test            test_topic                     0   10000           10000           0               none
BROKER INFO
0 -> 172.22.4.34:9092

为什么我让 Kafka 为 test_topic 创建 2 个分区,却只有 1 个分区?

这是我的制作人:

  def main(args: Array[String]) {
    //val conf = new SparkConf().setAppName("VPP metrics producer")
    //val sc = new SparkContext(conf)

    val props: Properties = new Properties()
      props.put("metadata.broker.list", "localhost:9092,localhost:9093")
      props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    1 to 10000 map {
      case i => 
        val jsonStr = getRandomTsDataPoint().toJson.toString
        println(s"sending message $i to kafka")
        producer.send(new KeyedMessage[String, String]("test_topic", jsonStr))
        println(s"sent message $i to kafka")
    }
  }

我不确定如果您使用 2 个分区创建主题,为什么您会有 1 个分区。我从来没有发生过这种情况,这是肯定的。

你能试试这个吗: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic 这应该会告诉您实际有多少个分区。

然后,如果确实有 1 个分区,也许您可​​以通过创建一个新主题来重新开始: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic_2

然后尝试: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic_2 ...并报告结果。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Kafka 0.9.0.0 显示所有带分区的主题 的相关文章

随机推荐

  • 从堆栈跟踪行号获取实际的 jsp 行号?

    这是堆栈跟踪 org apache jsp showcustomer jsp jspService showcustomer jsp java 128 org apache jasper runtime HttpJspBase servic
  • HTML5 CSS:行和调整大小

    下面的代码是响应式的 可以调整大小等 但我正在寻找一些真正简单的 CSS 来在桌面和移动设备上调整大小 我知道我可以使用标题链接 w3 css 中的 CSS 但必须有一种更好的方法来轻松地在桌面和移动设备上显示某些内容 移动设备默认显示内联
  • 打印/另存为 PDF(保留 CSS 布局)

    当我简单地打印 如在纸上 或将页面另存为 PDF 使用浏览器内置工具 时 CSS 会被完全忽略 我只会看到一行又一行的内容 有没有办法做到这一点 无需转换 HTML 2 PDF 图像 Thanks 这可能是因为你有media指定的选项
  • 当 SSM 代理变为活动状态时捕获事件

    我想在 SSM 的 Fleet Manager 中注册新的 EC2 实例时触发 lambda 这意味着可以使用 SSM 连接到该实例 但是我找不到在 EventBridge 中使用的模式 在 EventBridge 中 我尝试使用在文档中找
  • 在 SQL Server 2005 上违反 INSERT WHERE COUNT(*) = 0 上的 UNIQUE KEY 约束

    我正在从多个进程插入 SQL 数据库 这些进程有时可能会尝试将重复数据插入表中 我尝试以处理重复项的方式编写查询 但我仍然得到 System Data SqlClient SqlException Violation of UNIQUE K
  • nginx - laravel - hhvm-Fastcgi 出现错误 500

    I install a LEMP server in ubuntu 12 04 LTS 64 whit HHVM Fastcgi Service and i install laravel via laravel phar and test
  • 根据浏览器高度和宽度保持纵横比和字体大小?

    下面的代码附在window onresize resize The baseWidth and baseHeight在负载上读取作为计算的基础 这main变量只需将其设置到主 html 节点即可定义 字体设置在块元素上会导致所有的em基于其
  • ASP.NET Core 1.0 是否支持 ApiExplorer?如何使用它?

    ASP NET Core 1 0支持使用API Explorer吗 我无法找到任何有关它的文档或如何使用它 有人使用过它并且可以分享一些见解吗 Itay的回答帮助我得到了我想要的答案 Rob Lang 博士向任何需要使用 ApiExplor
  • ajax 调用的生命周期是多长?

    假设我在 javascript 中有这段代码 function doAnAjaxCall var xhr1 new XMLHttpRequest xhr1 open GET mylink true xhr1 onreadystatechan
  • 错误:运算符“++”的操作数必须是左值

    In C i j 在代码中工作正常 但是当我使用时 i j 我收到以下错误 Operand for operator must be an lvalue 为什么我会收到此错误 后自增要求操作数应该是可修改的左值但后自增的结果是prvalue
  • 会话超时混乱 - session.setMaxInactiveInterval(0)

    我是 JEE 的新手 这让我感到困惑 根据HttpSession html setMaxInactiveInterval int 间隔 http docs oracle com javaee 7 api javax servlet http
  • 共享 OpenGL VAO/VBO/等。通过 QGLWidget

    我正在使用 QGLWidgets 的 3 层层次结构在我的类似 CAD 的应用程序中的 5 个 OpenGL 视口之间共享着色器和顶点数据 根上下文用于编译应用程序范围的着色器 每个文档上下文用于共享模型顶点数据 视口上下文是实际进行渲染的
  • 如何在 XML 属性值中包含 &、<、> 等

    我想创建一个 XML 文件 用于存储 Java 程序的结构 我能够成功解析 Java 程序并根据需要创建标签 当我尝试将源代码包含在标签中时 问题就出现了 因为 Java 源代码可能使用大量实体引用和保留字符 例如 lt gt 我无法创建有
  • d3 过滤后无法附加完整数据

    我有一个项目的简化版本 我将其浓缩为以下片段 var margins top 20 bottom 300 left 100 right 100 var height 600 var width 1200 var totalWidth wid
  • 将 GraphML 转换为 GV 或 Dot 文件

    如何使用 graphml 文件作为输入在 graphviz 中绘制图形 I know graphviz附带graphml2gv为此目的 但是当我尝试从命令提示符运行此命令时 我收到此消息 cvtgxl not configured for
  • 如何在我的父 git 项目中使用 git 存储库?

    我尝试使用子模块将 github 存储库中的副本保留在我的第 3 方目录中 创建并获取文件后 当我从项目提交时 它只能看到文件夹 看不到文件 是的 文件在那里 有具体命令吗 在 GitHub 上 你只会看到一个灰色文件夹 https sta
  • Jackson 中的 readValue 和 readTree:何时使用哪个?

    我刚刚开始使用 Jackson JSON 库 Jackson 是一个非常强大的库 但它有非常广泛的 API 很多事情可以通过多种方式完成 这使得你很难在杰克逊找到自己的方式 如何知道什么是正确 最好的做事方式 为什么我要使用这个解决方案 S
  • C 中的递归函数:return 总是必要的吗?

    这是我第一次使用递归函数 我编写的这个函数如果仅包含按升序排列的字母 则返回字符串的大小 否则返回 1 在我取出第二个 返回 后 我不明白为什么它对这两个代码都有效 一个比另一个更浪费吗 希望有一些见解 with returnonly as
  • opencv4nodejs如何计算拉普拉斯方差的模糊度

    我有一个代码 const cv require opencv4nodejs let text let image cv imread images focused jpg let gray image cvtColor cv COLOR B
  • Apache Kafka 0.9.0.0 显示所有带分区的主题

    我目前正在评估 Apache Kafka 我有一个简单的消费者 应该从特定主题分区读取消息 这是我的客户 public static void main String args Properties props new Properties