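I am using Kafka, Play, and Scala.
This is my code, in which I want to send messages to the Kafka server. The topic name is "test-topic".
I do not get any errors, but I do not see the messages I sent in the topic.
What is wrong here?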
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.util.Random

object KafkaProducerLocal extends App {
  sendMessage()

  def sendMessage(): Unit = {
    val topicName = "test-topic"
    try {
      val rnd = new Random()
      val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092") // Kafka broker
      // ZooKeeper address; the producer logs "Property zk.connect is not valid" for this key
      props.put("zk.connect", "localhost:2181")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("producer.type", "async")
      val config = new ProducerConfig(props)
      val producer = new Producer[String, String](config)
      // Send 10 messages, each keyed by a random IP address
      for (nEvents <- Range(0, 10)) {
        val ip = "192.168.2." + rnd.nextInt(255)
        val data = new KeyedMessage[String, String](topicName, ip, "Swapnil Test Data" + nEvents)
        producer.send(data)
      }
      // Close the producer once all messages have been handed to it
      producer.close()
    } catch {
      case t: Throwable => t.printStackTrace()
    }
  }
}
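There is nothing wrong with your code.
- Check your log4j properties so that you can see the logs.
- Make sure the Kafka version you are running is the same as your client version.
- Create the topic first; see http://kafka.apache.org/documentation.html#quickstart_createtopic
- Check that the server is working, that the topic has been created, and that you can send and receive messages with the console producer and consumer; for an example, see http://kafka.apache.org/documentation.html#quickstart_send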
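One more diagnostic that may help alongside the checks above: with producer.type set to async, the old Scala producer sends from a background thread, so failures tend to show up only in the logs rather than as exceptions in your code. The sketch below builds an alternative set of properties that switches to sync sends and asks the broker for an acknowledgement. The object name ProducerDebugConfig and the method name syncProps are hypothetical, chosen only for illustration; the property keys are those of the 0.8.x producer, which this assumes you are using.

```scala
import java.util.Properties

// Hypothetical helper (not part of Kafka): builds properties for a
// synchronous 0.8.x producer so that send failures are more visible.
object ProducerDebugConfig {
  def syncProps(brokers: String): Properties = {
    val props = new Properties()
    props.setProperty("metadata.broker.list", brokers)
    props.setProperty("serializer.class", "kafka.serializer.StringEncoder")
    // Send on the calling thread instead of the background send thread
    props.setProperty("producer.type", "sync")
    // Wait for the partition leader to acknowledge each request
    props.setProperty("request.required.acks", "1")
    props
  }
}
```

Passing ProducerDebugConfig.syncProps("localhost:9092") to new ProducerConfig(...) in place of the original props should be enough to try it. Treat it as a debugging aid under the assumptions above, not as a recommended production setting.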
Application log
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Verifying properties
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Property metadata.broker.list is overridden to localhost:9092
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Property producer.type is overridden to async
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Property serializer.class is overridden to kafka.serializer.StringEncoder
2016-04-19 01:12:34 WARN kafka.utils.Logging$class:83 - Property zk.connect is not valid
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/vishnu/.m2/repository/org/slf4j/slf4j-log4j12/1.7.12/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/vishnu/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Shutting down producer
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Begin shutting down ProducerSendThread
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(topic-test)
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Connected to localhost:9092 for producing
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Disconnecting from localhost:9092
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Connected to HMECL001076:9092 for producing
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Shutdown ProducerSendThread complete
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Closing all sync producers
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Disconnecting from HMECL001076:9092
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Producer shutdown completed in 298 ms
Console consumer output
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-test --property group.id cs1 --from-beginning
Swapnil Test Data3
Swapnil Test Data9
Swapnil Test Data2
Swapnil Test Data5
Swapnil Test Data6
Swapnil Test Data8
Swapnil Test Data0
Swapnil Test Data1
Swapnil Test Data4
Swapnil Test Data7