kafka启动失败(版本0.8.0 beta1)

2024-04-23

我正在尝试在独立模式(在ec2上)上使用zookeeper版本(3.3.6)启动kafka服务。 所以我运行 1) sbt update 2) sbt package 3)sbt assembly-package-dependency 然后启动zookeeper服务,然后启动kafka服务器。但是,我收到以下错误消息: 对于卡夫卡服务器日志:

ERROR Error while electing or becoming leader on broker 0 (kafka.server.ZookeeperLeaderElector)
java.net.ConnectException: Connection timed out
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:84)
    at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:35)
    at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:35)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
    at kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:35)
    at kafka.controller.KafkaController.startChannelManager(KafkaController.scala:503)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:467)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:215)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:89)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:106)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

对于动物园管理员日志:

    2014-07-15 15:49:22,996 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x57 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,102 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x59 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,109 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x5b zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest
2014-07-15 15:49:23,215 - INFO  [ProcessThread:-1:PrepRequestProcessor@419] - Got user-level KeeperException when processing sessionid:0x1473b82f52e0004 type:create cxid:0x5d zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/brokers/topics/edwintest Error:KeeperErrorCode = NodeExists for /brokers/topics/edwintest

对于 kafka 生产者日志:

[2014-07-15 15:49:23,107] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 23 for 1 topic(s) Set(edwintest) (kafka.client.ClientUtils$)
[2014-07-15 15:49:23,107] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,111] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,111] WARN Error while fetching metadata [{TopicMetadata for topic edwintest ->
No partition metadata for topic edwintest due to kafka.common.LeaderNotAvailableException}] for topic [edwintest]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2014-07-15 15:49:23,112] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: edwintest (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,112] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,213] INFO Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 24 for 1 topic(s) Set(edwintest) (kafka.client.ClientUtils$)
[2014-07-15 15:49:23,213] INFO Connected to localhost:9092 for producing (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,217] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer)
[2014-07-15 15:49:23,218] WARN Error while fetching metadata [{TopicMetadata for topic edwintest ->
No partition metadata for topic edwintest due to kafka.common.LeaderNotAvailableException}] for topic [edwintest]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2014-07-15 15:49:23,219] ERROR Failed to send requests for topics edwintest with correlation ids in [17,24] (kafka.producer.async.DefaultEventHandler)
[2014-07-15 15:49:23,219] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:254)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

my /etc/hosts config

127.0.0.1       ip-172-32-1-95 localhost.localdomain localhost
::1             localhost6.localdomain6 localhost6

我的 server.properties 文件

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to and advertise to producers and consumers.
# If not set, the server will bind to all interfaces and advertise the value returned from
# from java.net.InetAddress.getCanonicalHostName().
#host.name=localhost

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=2

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# The directory under which to store log files
log.dir=/tmp/kafka-logs

# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=1

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
#    3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000

# Per-topic overrides for log.flush.interval.ms
#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

# metrics reporter properties
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=false </code>

我的动物园管理员配置zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181

我尝试删除/tmp/zookeeper或/tmp/kafka-logs下的kafka和zookeeper的所有信息,然后重新启动所有内容,但仍然收到相同的错误。


凉爽的!我猜您正在运行 kafka-console- Producer 来向主题“edwintest”发布消息。在运行生产者之前,使用此命令创建主题


./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic edwintest  

然后启动你的控制台制作器。希望这能解决您的问题。

[EDIT]显然,您必须确保正确更新您的 ec2 安全组,以便为​​生产者打开 zk 和 kafka 代理端口。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka启动失败(版本0.8.0 beta1) 的相关文章

  • Kafka - 如何捕获kafka客户端后台线程生成的消息

    使用以下配置来模拟消费者关闭 会话超时 我们如何捕获客户端记录到控制台的消息 SESSTMOUT rdkafka consumer 1 第三 主要 consumed message None msg1 0 first topic 0 Non
  • 增加 Java 中主题的分区数量

    我正在使用名称 卡夫卡 2 12版本 2 3 0 根据我想更改的流量 负载最大分区某个主题的编号 Kafka启动后是否可以进行这种更改 并且可以通过代码完成吗 是的 您可以通过代码增加分区 使用AdminClient createParti
  • 卡夫卡消费者陷入(重新)加入组

    如果 kafka 版本 0 10 消费者尝试重新加入消费者组 其默认行为是什么 我正在将单个消费者用于消费者组 但似乎它在重新加入时受到了打击 每 10 分钟后 它会在消费者日志中打印以下行 2016 08 11 13 54 53 803
  • 当我按键对数据进行分区,然后向 Kafka 中的主题添加新分区时,会发生什么?

    当我按键对数据进行分区 然后向 Kafka 中的主题添加新分区时 会发生什么 现有记录是否会发生变化 未来的数据将如何分区 当新分区添加到特定主题时 现有数据的分区不会改变 Kafka 不会尝试重新分发现有记录 此修改只会对新记录产生影响
  • Kafka结构化流KafkaSourceProvider无法实例化

    我正在开发一个流项目 其中有一个 ping 统计数据的 kafka 流 如下所示 64 bytes from vas fractalanalytics com 192 168 30 26 icmp seq 1 ttl 62 time 0 9
  • 使用 Kafka Streams 进行 OpenTracing - 如何?

    我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
  • 如何在java程序中获取kafka消耗滞后

    我写了一个java程序来消费来自kafka的消息 我想监控消费延迟 如何通过java获取它 顺便说一句 我用
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • 批量插入成功后更新 Kafka 提交偏移量

    我有一个 spring kafka 消费者 它读取记录并将其移交给缓存 计划任务会定期清除缓存中的记录 我想仅在批次成功保存到数据库后更新 COMMIT OFFSET 我尝试将确认对象传递给缓存服务以调用确认方法 如下所示 public c
  • Kafka 中的“__consumer_offsets”主题是什么

    当我运行此命令时 我得到 2 个主题 我知道我创建了测试主题 但我看到了一个名为 consumer offsets 的附加主题 从名称上看 它与消费者抵消有关 但它是如何使用的呢 bin kafka topics sh list zooke
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • 通过 CMD 获取启用 SSL 的 Kafka 中的最新偏移量

    我一直在使用下面的 CMD 从打开纯文本端口的 Kafka 队列中获取最新的偏移量 kafka run class sh kafka tools GetOffsetShell broker list server 9092 topic sa
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 如何更改主题的起始偏移量?

    是否可以更改新主题的起始偏移量 我想创建一个新主题并从偏移量开始阅读10000 How 自从卡夫卡0 11 0 0 https issues apache org jira browse KAFKA 4743你可以使用脚本kafka con
  • Kafka - 如何同时使用过滤器和过滤器?

    我有一个 Kafka 流 它从一个主题获取数据 并且需要将该信息过滤到两个不同的主题 KStream

随机推荐

  • 为什么 pgAdmin 4 这么慢?

    postgreSQL 的 pgAdmin 4 GUI 非常慢 即使扩展服务器树或数据库树也需要花费太多时间 它们各自花费了近 30 秒的时间来展开 创建新数据库或表时它也会挂起 即使加载后 创建和保存新数据库也需要一分多钟的时间 几乎每次我
  • 回形针未保存,没有错误

    我被绊倒了 浏览了文档 教程等 但不确定我做错了什么 项目中的另一个模型是为 Paperclip 设置的 并且在测试时可以正常工作 它将附件文件信息保存和检索到数据库中 并将文件放入 public system 内的子文件夹中 我基本上将相
  • jQuery 剪贴板复制

    我需要剪贴板复制功能 即使我正在寻求帮助http plugins jquery com project copy http plugins jquery com project copy链接 但无法正常工作 li li
  • Spark:相当于数据帧中的 zipwithindex

    假设我有以下数据框 dummy data a 1 b 25 c 3 d 8 e 1 df sc parallelize dummy data toDF letter number 我想创建以下数据框 a 0 b 2 c 1 d 3 e 0
  • 从坐标中提取地址分量

    我正在尝试使用 R 进行反向地理编码 我首先使用了 ggmap 但无法让它与我的 API 密钥一起使用 现在我正在用 googleway 尝试 newframe c Front lat Front long Front lat Front
  • 共享苹果付费开发者帐户的选项?

    问题 这里正确的程序是什么 Do both the developer and account holder need paid Apple Developer accounts 正如在下面的上下文中所说 我的客户已经有一个 我还需要一个付
  • Kotlin 中 ArrayList() 和 mutableListOf() 之间的区别

    private val repositories mutableListOf
  • 在 ResourceDictionary 文件中使用 viewbox

    我有 ResourceFile1 xaml 文件 其内容
  • 在控制器中使用 Angular 的 $watch 是反模式吗?

    在我永无休止地追求以 正确的 角度方式做事的过程中 我阅读了很多有关如何让控制器观察角度服务中保存的模型变化的文章 一些网站 http www benlesh com 2013 08 angularjs watch digest and a
  • 如何在单页应用程序中实现 gmail 撰写窗口概念?

    我正在开发一个项目 用户可以更轻松地快速添加交易 我非常有兴趣做一些类似于 gmail 撰写弹出窗口在单页上所做的事情 我不知道如何实现这样的事情 请给我指导如何做这些事情 我有兴趣使用 AngularJS 构建它 P S 抱歉问了一个宽泛
  • 包 oracle.jdbc.driver 不存在

    以下代码出错 发生错误 1 import java sql public class DBConnect public static void main String a throws SQLException package oracle
  • 如何使用脚本。在 JADE 模板中

    我使用 JADE 模板使用 Express 框架创建了一个简单的节点应用程序 学习过程中一切都很顺利 直到我开始尝试运行一些客户端 js 但我不知道该怎么做 我需要在 app index js 中做一些事情来告诉节点它们吗 任何帮助将非常感
  • 从相关系数计算中删除异常值

    假设我们有两个数值向量x and y 之间的皮尔逊相关系数x and y是 谁 给的 坐标 x y 我怎样才能自动考虑仅一个子集x and y在计算中 比如90 最大化相关系数 If you really想要做到这一点 删除最大 绝对 残差
  • Boost 互斥范围锁

    我正在阅读 drdobbs com 上的 Boost Mutex 教程 并发现了这段代码 include
  • GNU gdb 如何显示源文件名和符号行

    当使用 GNU gdb 调试 c 进程时 list 命令将打印行但不告诉我文件名 设置断点可以显示我想要的所有行和文件信息 但我不想设置断点并且必须禁用或删除它 gdb b oyss funtion Breakpoint 13 at 0x8
  • 如何在 Google Chrome 扩展程序中创建侧边栏?

    我正在考虑在 Google Chrome 中创建一个侧边栏扩展 并读到有一个 API 调用 Google 禁用了它 那么也许有人知道如何创建并有例子吗 不幸的是 侧边栏 API 工作最近已停止 https bugs chromium org
  • Tkinter 按钮在禁用和更新后仍然响应点击

    我希望按钮启动命令 然后在执行时禁用并在执行完成后再次启用 当我单击该按钮时 它似乎被禁用并且命令被执行 但是 当我在禁用按钮时单击该按钮时 该命令会在第一次执行完成后第二次执行 似乎在第二次单击后 该按钮确实被禁用了 因为我可以在禁用它时
  • 在 NASM 中使用 istruc 时:“警告:尝试初始化 BSS 部分‘.bss’中的内存:忽略 [-w+other]”

    在搜索这个错误时我发现this https stackoverflow com questions 65731514 nasm attempt to initialize memory in bss section 77001709问题 但
  • 我需要删除分割块之间的一点空间

    我的两个分割块之间有一点空间 https i stack imgur com ysU0R png https i stack imgur com ysU0R png在这里你可以看到我的问题 我不明白为什么这些块会这样 body main w
  • kafka启动失败(版本0.8.0 beta1)

    我正在尝试在独立模式 在ec2上 上使用zookeeper版本 3 3 6 启动kafka服务 所以我运行 1 sbt update 2 sbt package 3 sbt assembly package dependency 然后启动z