继上一篇Kafka安装以及环境准备文章后,这一小节是讲解如何利用Java客户端kafka-clients库进行消息的发送。
工程结构(maven工程)
maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
producer代码
public class MyProducer {
public static void main(String[] args) {
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "192.168.8.135:9092");
kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(kafkaProperties);
ProducerRecord record = new ProducerRecord("test", "name", "water");
try {
Future result = producer.send(record);
System.out.println(result.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码解析:
- 主要是两个对象:producer和record,record组装想要发送的数据,然后通过producer进行发送。
- bootstrap.servers是表示Kafka服务器broker的地址,一般建议提供至少两个broker的地址信息,通过逗号分隔,一旦其中一个死机了,生产者仍然能连接到集群上,我这里的本地环境是单机,所以只填写了一个地址
- key.serializer是表示消息中的键的序列化器,网络传输需要进行序列化成字节数组
- value.serializer是表示消息中值的序列化器
可能出现的错误
-
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms. 造成该错误的原因有可能是防火墙问题,请把虚拟机的防火墙关闭就ok
-
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 30534 ms has passed since batch creation plus linger time 这种错误一开始让我百思不得其解,明明Kafka服务器broker的地址没有问题,为什么就是发送数据失败呢?最后通过debug发现了获取到的cluster的地址居然是bogon:9092(下面的图一),这个bogon是什么东西,为什么不是我在代码中的IP地址呢?个人推测是不是host name的问题,于是在kafka中的server.properties添加了图二中的host.name配置,重启Kafka就OK了
简单的producer发送消息的例子到此结束,大部分时间都花在了解决异常问题上,不过收获不少。