无法向 kafka 主题发送消息

2024-05-05

我正在使用 Kafka、Play 以及 Scala。 这是我的代码,我想在其中发送消息到kafka服务器,主题名称是“测试主题”。 尽管我没有在主题中看到我发送的消息,但我没有收到任何错误 这里有什么问题吗

 import kafka.producer.ProducerConfig
    import java.util.Properties
    import kafka.producer.Producer
    import scala.util.Random
    import kafka.producer.Producer
    import kafka.producer.Producer
    import kafka.producer.Producer
    import kafka.producer.KeyedMessage
    import java.util.Date

    object KafkaProducerLocal extends App {

      sendMessage

      def sendMessage {

        val topicName = "test-topic"
        try {
          val rnd = new Random()
          val props = new Properties()
          props.put("metadata.broker.list", "localhost:9092") //kafka 
          props.put("zk.connect", "localhost:2181");  //zookeeper
          props.put("serializer.class", "kafka.serializer.StringEncoder")
          props.put("producer.type", "async")


          val config = new ProducerConfig(props)
          val producer = new Producer[String, String](config)
          val t = System.currentTimeMillis()
          for (nEvents <- Range(0, 10)) {
            val ip = "192.168.2." + rnd.nextInt(255);
            val data = new KeyedMessage[String, String](topicName, ip, "Swapnil Test Data" + nEvents);
            producer.send(data);
          }

          producer.close();
        } catch {
          case t: Throwable => t.printStackTrace()
        }
      }

    }

你的代码没有任何问题。

  • 检查您的 log4j 属性以查看日志
  • 您运行的kafka版本与您的客户端版本相同。
  • 首先创建一个主题link http://kafka.apache.org/documentation.html#quickstart_createtopic
  • 检查服务器是否正常工作,主题是否已创建,是否可以通过控制台生产者和消费者发送和接收消息example http://kafka.apache.org/documentation.html#quickstart_send

应用日志

2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Verifying properties
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Property metadata.broker.list is overridden to localhost:9092
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Property producer.type is overridden to async
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Property serializer.class is overridden to kafka.serializer.StringEncoder
2016-04-19 01:12:34 WARN  kafka.utils.Logging$class:83 - Property zk.connect is not valid
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/vishnu/.m2/repository/org/slf4j/slf4j-log4j12/1.7.12/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/vishnu/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Shutting down producer
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Begin shutting down ProducerSendThread
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(topic-test)
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Connected to localhost:9092 for producing
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Disconnecting from localhost:9092
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Connected to HMECL001076:9092 for producing
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Shutdown ProducerSendThread complete
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Closing all sync producers
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Disconnecting from HMECL001076:9092
2016-04-19 01:12:34 INFO  kafka.utils.Logging$class:68 - Producer shutdown completed in 298 ms

控制台消费者输出

 /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-test --property group.id cs1 --from-beginning
Swapnil Test Data3
Swapnil Test Data9
Swapnil Test Data2
Swapnil Test Data5
Swapnil Test Data6
Swapnil Test Data8
Swapnil Test Data0
Swapnil Test Data1
Swapnil Test Data4
Swapnil Test Data7
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法向 kafka 主题发送消息 的相关文章

  • 如何在 Apache Spark 中通过 DStream 使用特征提取

    我有通过 DStream 从 Kafka 到达的数据 我想进行特征提取以获得一些关键词 我不想等待所有数据的到达 因为它是可能永远不会结束的连续流 所以我希望以块的形式执行提取 如果准确性会受到一点影响 对我来说并不重要 到目前为止 我整理
  • 多个 scala 库导致 intellij 出错?

    我正在使用 intellij 14 和 scala 2 11 6 使用 homebrew 安装并使用符号链接 ln s usr local Cellar scala 2 11 6 libexec src usr local Cellar s
  • Scala 2.9 无法在 Windows XP 上运行“hello world”示例

    我正在尝试在 Windows XP 上使用 scala 2 9 1 Final 运行 HelloWorld 示例 object HelloWorld extends App println Hello World 文件另存为Hello sc
  • Scala:如何将可变参数指定为类型?

    代替 def foo configuration String String 我希望能够写 type Configuration String String def foo configuration Configuration 主要用例是
  • Scala 的代码覆盖率工具 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个
  • Scala Spark 包含与不包含

    我可以使用 contains 过滤 RDD 中的元组 如下所示 但是使用 不包含 来过滤 RDD 又如何呢 val rdd2 rdd1 filter x gt x 1 contains 我找不到这个的语法 假设这是可能的并且我没有使用Dat
  • 在scala 2.13中,为什么有时无法显式调用类型类?

    这是 Shapeless 2 3 3 中的一个简单示例 val book author gt gt Benjamin Pierce title gt gt Types and Programming Languages id gt gt 2
  • 如何执行仅匹配正则表达式的测试?

    在 sbt 0 10 1 中 我经常使用test only缩小我的测试数量 sbt gt test only com example MySpec 但是 我想缩小范围 以便只运行名称 描述与正则表达式匹配的测试 是否有一些语法可以实现这样的
  • Spark 2.2 无法将 df 写入 parquet

    我正在构建一个聚类算法 我需要存储模型以供将来加载 我有一个具有以下架构的数据框 val schema new StructType add StructField uniqueId LongType add StructField tim
  • 为什么这些类型参数不符合类型细化?

    为什么此 Scala 代码无法进行类型检查 trait T type A trait GenFoo A0 S lt T type A A0 trait Foo S lt T extends GenFoo S A S 我不明白为什么 类型参数
  • 如何将 Java 地图转换为在 Scala 中使用?

    我正在开发一个 Scala 程序 该程序调用 Java 库中的函数 处理结果并生成 CSV 有问题的 Java 函数如下所示 Map
  • Java 拥有闭包后 Scala 的优势 [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 随着 Java 中添加了闭包 作为语言选择 Scala 相对于 Java 的优势是什么 有人可以详细说明一下有什么优点吗 除了闭包 J
  • 正确使用术语 Monoid

    从下面的例子来看 我认为这样的说法是正确的String在串联运算下定义了一个幺半群 因为它是关联二元运算 并且String碰巧有一个身份元素 它是一个空字符串 scala gt Jane Doe Jane Doe res0 Boolean
  • 如何在 Scala 2.11 中查找封闭源文件的名称

    在编译时 如何在 scala 2 11 中检索当前源文件 编写代码的位置 的名称 这是一种实际有效的方法 val srcFile new Exception getStackTrace head getFileName println sr
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试

随机推荐