1.美图
2.背景
一段kafka写入程序,不晓得为啥突然发现很多奇怪的日志。
kafka 多线程发送数据,然后在本地是可以的,在服务器上是偶现的
我写了一个本地程序多线程生产数据,发现是没有问题的。
@Test
public void multiThreadProducer() throws InterruptedException {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
int i = 0;
while (i < 20) {
new Thread(new Runnable() {
@Override
public void run() {
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(prop);
Integer messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
RecordMetadata result = null;
try {
result = producer.send(new ProducerRecord<Integer, String>("topic_lcc", messageNo, messageStr)).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("发送消息" + result);
++messageNo;
}
}
}, "thread" + i).start();
i++;
}
Thread.sleep(Integer.MAX_VALUE);
}
猜测是Host连接问题:参考添加链接描述
但是,我们是采用Ip直接访问的,应该也不是这个错误,并且telnet zk和kafka的地址端口,都是通畅的。
"KafkaProducer Closing the Kafka producer with timeoutMillis"是KafkaProducer在关闭时遇到的超时错误。这通常意味着关闭KafkaProducer实例时花费的时间超过了预期的超时时间。
要解决这个问题,可以考虑以下几个步骤:
-
增加超时时间:尝试增加timeoutMillis的值,给KafkaProducer更多的时间来关闭。可以根据实际情况逐渐增加超时时间,直到不再出现超时错误。
-
检查网络连接:确保KafkaProducer实例与Kafka集群之间的网络连接正常。检查网络配置、防火墙设置等,确保能够正常连接到Kafka集群。
-
检查Kafka集群状态:验证Kafka集群的健康状态,确保没有任何故障或资源耗尽的情况。可以使用Kafka提供的管理工具(如kafka-topics.sh、kafka-consumer-groups.sh等)进行集群状态检查。
-
代码优化:检查代码中是否存在任何潜在的性能问题或资源泄漏。确保在使用完KafkaProducer后正确关闭和释放相关资源。
-
更新Kafka版本:如果您正在使用较旧的Kafka版本,考虑升级到最新版本,以便获得更好的稳定性和性能。
-
联系支持:如果以上步骤都没有解决问题,可以联系Kafka的技术支持或在Kafka社区寻求帮助,他们可能能够提供更具体的解决方案。
请注意,具体解决方法可能因您的环境和使用情况而异。建议根据实际情况进行调试和排查。