Kafka 错误:SLF4J:对 [org.apache.kafka.common.Cluster] 类型的对象调用 toString() 失败

2024-03-15

我尝试将 Gattle 与 Kafka 一起使用,但经常出现此错误:

01:32:53.933 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=12,client_id=producer-1}, body={topics=[test]})) to node 1011
SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.common.Cluster]
java.lang.NullPointerException
at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:72)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractCollection.toString(AbstractCollection.java:462)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.kafka.common.Cluster.toString(Cluster.java:151)
    at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:305)
    at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:277)
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:231)
    at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:298)
    at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:208)
    at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:212)
    at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:103)
    at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:88)
    at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:48)
    at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:273)
    at ch.qos.logback.classic.Logger.callAppenders(Logger.java:260)
    at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:442)
    at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:433)
    at ch.qos.logback.classic.Logger.debug(Logger.java:511)
    at org.apache.kafka.clients.producer.internals.Metadata.update(Metadata.java:133)
    at org.apache.kafka.clients.NetworkClient.handleMetadataResponse(NetworkClient.java:313)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:298)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:199)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
01:32:53.937 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Metadata - Updated cluster metadata version 14 to [FAILED toString()]

我不确定错误是否与代码有关,但这是我的 BasicSimulation.scala:

class BasicSimulation extends Simulation {

val kafkaConf = kafka
    .topic("test")
    .properties(
      Map(
        ProducerConfig.ACKS_CONFIG -> "1",
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.ByteArraySerializer",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.ByteArraySerializer"))

  val scn = scenario("Kafka Test")
    .feed(csv("data.csv").circular)
    .exec(kafka("request")
    .send("${data}".getBytes: Array[Byte]))

  setUp(
    scn
      .inject(constantUsersPerSec(10) during(10 seconds)))
    .protocols(kafkaConf)
}

这是我的 docker-compose.yml 中与 kafka 相关的部分:

zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_LOG_CLEANER_ENABLE: 'true'
    volumes:
      - /tmp/docker.sock:/var/run/docker.sock

这是Kafka依赖版本问题。安装的Kafka版本和Kafka-client依赖版本不匹配。首先验证安装的 Kafaka 版本并相应更新 POM 中的 Kafka 客户端版本。我遇到了同样的问题并通过更正 kafka-client 版本解决了。

安装的卡夫卡版本:0.11

Maven Dependency:
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

现在它对我来说工作得很好。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 错误:SLF4J:对 [org.apache.kafka.common.Cluster] 类型的对象调用 toString() 失败 的相关文章

  • Scala 中奇怪的类型不匹配

    我希望这个问题还没有在其他地方得到解答 在这里没有找到答案 在我的本地化系统中 我有一个名为 Language 的类 class Language val name String dict HashMap String String def
  • 在scala / play框架中构建Json文件

    我正在使用 Play 框架和 Scala 我需要提供一个如下所示的输入 id node37 name 3 7 data children 如何使用 json 获取该格式 以下是 Play 框架网站上的示例 val JsonObject Js
  • ';'预期但发现“导入” - Scala 和 Spark

    我正在尝试使用 Spark 和 Scala 来编译一个独立的应用程序 我不知道为什么会收到此错误 topicModel scala 2 expected but import found error import org apache sp
  • IntelliJ IDEA 13:新的 Scala SBT 项目尚未生成 src 目录结构

    我按照 Jetbrains 网站上的入门视频设置 IntelliJ IDEA 13 1 Community Edition 以与 Scala 配合使用 Scala 插件 v0 36 431 已安装 当我使用向导创建一个新的 Scala SB
  • Spark日期格式问题

    我在火花日期格式中观察到奇怪的行为 实际上我需要转换日期yy to yyyy 日期转换后 日期应为 20yy 我尝试过如下 2040年后失败 import org apache spark sql functions val df Seq
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • 减少/折叠幺半群列表,但减少器返回任一

    我发现自己遇到过几次这样的情况 我有一个减速器 组合 fn 如下所示 def combiner a String b String Either String String a b asRight String 它是一个虚拟实现 但 fn
  • Scala 2.9 无法在 Windows XP 上运行“hello world”示例

    我正在尝试在 Windows XP 上使用 scala 2 9 1 Final 运行 HelloWorld 示例 object HelloWorld extends App println Hello World 文件另存为Hello sc
  • Scala:如何将可变参数指定为类型?

    代替 def foo configuration String String 我希望能够写 type Configuration String String def foo configuration Configuration 主要用例是
  • 具有两个通用参数的上下文边界

    在 Scala 中 我可以使用上下文边界 def sort T Ordered t Seq T 与以下意思相同 def sort T t Seq T implicit def Ordered T 如果我有一个带有两个泛型参数的类怎么办 IE
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • 更改 build.sbt 自定义任务中的版本

    我在 build sbt 中定义了一个自定义任务 val doSmth taskKey Unit smth doSmth version 1 0 SNAPSHOT 但它不会改变版本 我真正想要的是自定义 sbt 发布任务 它将始终将相同的版
  • Play框架:单属性案例类的JSON读取

    我正在尝试为包含单个属性的案例类创建隐式 JSON Reads 但收到错误 Reads Nothing 不符合预期类型 这是代码 import play api libs functional syntax import play api
  • 理解 Scala FP 库

    只是为了让那些想要开始使用 Scala FP 库 在纯 FP 方面变得更好的人快速清晰地了解 有人能澄清猫和猫效应 猫效应 IO 之间的区别 关系吗 最重要的是 齐奥和莫尼克斯对此有何看法 最后 与 ScalaZ 7 8 有何关系 到目前为
  • Scala中有类似Java Stream的“peek”操作吗?

    在Java中你可以调用peek x gt println x 在 Stream 上 它将对每个元素执行操作并返回原始流 这与 foreach 不同 foreach 是 Unit Scala 中是否有类似的东西 最好是适用于所有 Monady
  • scala中的反引号有什么用[重复]

    这个问题在这里已经有答案了 我在一本书上找到了以下代码 val list List 5 4 3 2 1 val result 0 list running total next element running total next elem
  • Slick和bonecp:org.postgresql.util.PSQLException:FATAL:抱歉,太多客户端已经错误

    当我在本地开发应用程序时 我使用以下命令启动我的 play2 应用程序sbt run 我喜欢如何更改代码 然后重新加载浏览器以查看我的更改 在大约 10 次代码更改之后 我收到 postgresql 太多连接错误 见下文 我的数据库连接使用
  • 如何在超时的情况下在单独的调度程序上运行 Akka Streams 图?

    这个问题是基于我做过的一个宠物项目 这个SO https stackoverflow com questions 34641861 akka http blocking in a future blocks the server 34645
  • 如何在 scala repl 和 sbt 控制台中关闭/打开 typer 阶段

    是否可以在不退出当前会话的情况下切换阶段 我尝试进入 power 模式 但它仍然不打印类型 在SBT中只需添加以下设置 set scalacOptions in Compile console Xprint typer 在 REPL 中你可
  • 如何在 sbt 控制台中加载 scala 文件? [复制]

    这个问题在这里已经有答案了 可能的重复 将 Scala 文件加载到解释器中以使用函数 https stackoverflow com questions 7383436 load scala file into interpreter to

随机推荐