今天碰到一个奇怪的问题, 如下图:
一、问题
1.问题截图
上午还可以发送消息成功的,下午突然就发送不了消息了。我就检查我代码的问题,是传递的格式不对,还是数据要求不对。网上的资料显示是因为ip和host文件的地址不对应。
但是我检查了我本地的配置,没有问题。
2,解决方法
最后解决方法是,kafka不行了,重新启动一下,然后就可以正常发送了。主要问题是kafka挂了,然后接收不到消息了,一直处于阻塞状态,超时之后,会抛出异常。
二、意外之喜
在解决问题的时候,我一直怀疑我的数据格式的问题,就断点走了好多,kafka的源码,学习了一下kafka的发送过程,记录下。
发送的主要代码:
(1)发送时,我们需要传入topic和data两个参数,除了这两个参数,还可以根据自己的具体业务,选择调用kafkaTemplate的不同的send方法。
//send是两个 参数,第一个参数是topic,第二个参数是Message数据
kafkaTemplate.send("demo",m.getId()+m.getMessage()+m.getSendTime());
(2)ProducerRecord<K,V> 对象实体
public final class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final K key;
private final V value;
private final Long timestamp;
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
if (topic == null) {
throw new IllegalArgumentException("Topic cannot be null.");
} else if (timestamp != null && timestamp.longValue() < 0L) {
throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
} else if (partition != null && partition.intValue() < 0) {
throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
} else {
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
}
}
这个实体中,会根据传入的参数不同,来进行走不同的方法。如果topic为空,直接抛出异常。本次代码中,走的是最后的一个else,partition和key都为空,直接赋值为null。
(3)doSend方法
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
final Producer<K, V> producer = this.getTheProducer();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sending: " + producerRecord);
}
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
try {
if (exception == null) {
future.set(new SendResult(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
}
} else {
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onError(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception);
}
}
} finally {
producer.close();
}
}
});
if (this.autoFlush) {
this.flush();
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sent: " + producerRecord);
}
return future;
}
小结:
现在对于kafka的基本内容和要素不是特别熟悉,对于以上的源码,还分析不出来。但是跑代码 的过程中,确实是了解到了这些,先积累,以后再详细分析吧。虽然很傻的问题,但是在解决这个问题的过程中,收获还是非常大的。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)