重启后Kafka主题不再存在

2023-12-12

我在本地 kafka 集群中创建了一个主题,其中包含 3 个服务器/代理 通过从我的 kafka 安装目录运行以下命令

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic user-activity-tracking-pipeline

一切都很顺利,因为我能够根据我的主题生成和使用消息。重新启动计算机后,我通过在终端中运行以下命令从 kafka 安装目录启动捆绑的 Zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

通过在 kafka 安装目录的终端中执行以下命令,启动属于集群的 3 个服务器

env JMX_PORT=10001 bin/kafka-server-start.sh config/server1.properties
env JMX_PORT=10002 bin/kafka-server-start.sh config/server2.properties
env JMX_PORT=10003 bin/kafka-server-start.sh config/server3.properties

现在,当我通过在 kafka 安装目录的终端中运行以下命令来列出可用主题时,

bin/kafka-topics.sh --zookeeper localhost:2181 --list

结果为空!

以下是相关服务器 1 配置条目。服务器 2 和服务器 3 的值非常相似

broker.id=1
listeners=PLAINTEXT://:9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-broker-1
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

我确实注意到重启后的日志文件,所以没有清理任何内容

/tmp/kafka-logs-broker-1
/tmp/kafka-logs-broker-2
/tmp/kafka-logs-broker-3

我想知道为什么之前创建的主题“用户活动跟踪管道”当我尝试列出它时不再存在?


kafka-topics.sh 实际上在后台使用 Zookeeper 数据来回答查询。理由是单个经纪人本身通常无法拥有足够的信息来完整地描述主题。

如果您在重新启动过程中丢失了(我怀疑您确实这样做了,因为您提到了新的zookeeper启动)zookeeper数据,那么kafka-topics现在完全是盲目的,无法看到以前的kafka数据。

检查发生情况的最佳方法是在查询时实际执行 kafka 正在执行的操作!启动你的zookeeper客户端(就像做一样简单./zkCli.sh,然后输入ls /brokers/topics。如果为空,你的ZK数据就会丢失。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

重启后Kafka主题不再存在 的相关文章

  • 在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

    我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3 2 x Kafka 0 10 x 上的 Kafka Connect 接收器 我想升级到 Confluence 4 1 Kafka 1 1
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • Hive - 线程安全的自动递增序列号生成

    我遇到一种情况 需要将记录插入到特定的 Hive 表中 其中一列需要是自动递增的序列号 即在任何时间点都必须严格遵循 max value 1 规则 记录从许多并行的 Hive 作业插入到这个特定的表中 这些作业每天 每周 每月批量运行 现在
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • 动物园管理员服务器已启动但没有输出

    我已经在zoo cfg中设置 clientPort 2181 cloudera cloudera vm sudo usr lib zookeeper bin zkServer sh 启动 我得到以下回复 JMX enabled by def
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • 无法与 ZooKeeper 通信 - 更新被禁用

    我们正面临着 Zoo Keeper 的一个特殊问题 其中 ZK 突然失去了与 solr 云的连接并开始抛出异常 其中显示 无法与 ZooKeeper 对话 更新被禁用 我们的应用程序有 2 个 solr 集群 分别设置在 2 个不同的数据中

随机推荐