【kafka】Exception thrown when sending a message with key='null' and payload='lizhenjuan;99' to topic

2023-05-16

 今天碰到一个奇怪的问题, 如下图:


一、问题


1.问题截图


上午还可以发送消息成功的,下午突然就发送不了消息了。我就检查我代码的问题,是传递的格式不对,还是数据要求不对。网上的资料显示是因为ip和host文件的地址不对应。

kafka Exception thrown when sending a message with key='null'

但是我检查了我本地的配置,没有问题。


2,解决方法

最后解决方法是,kafka不行了,重新启动一下,然后就可以正常发送了。主要问题是kafka挂了,然后接收不到消息了,一直处于阻塞状态,超时之后,会抛出异常。


二、意外之喜

在解决问题的时候,我一直怀疑我的数据格式的问题,就断点走了好多,kafka的源码,学习了一下kafka的发送过程,记录下。


发送的主要代码:

(1)发送时,我们需要传入topic和data两个参数,除了这两个参数,还可以根据自己的具体业务,选择调用kafkaTemplate的不同的send方法。

	//send是两个 参数,第一个参数是topic,第二个参数是Message数据
    kafkaTemplate.send("demo",m.getId()+m.getMessage()+m.getSendTime());


2)ProducerRecord<K,V> 对象实体

public final class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final K key;
    private final V value;
    private final Long timestamp;

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        if (topic == null) {
            throw new IllegalArgumentException("Topic cannot be null.");
        } else if (timestamp != null && timestamp.longValue() < 0L) {
            throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        } else if (partition != null && partition.intValue() < 0) {
            throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        } else {
               this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

这个实体中,会根据传入的参数不同,来进行走不同的方法。如果topic为空,直接抛出异常。本次代码中,走的是最后的一个else,partition和key都为空,直接赋值为null。


(3)doSend方法

 protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = this.getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }

        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    } else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception);
                        }
                    }
                } finally {
                    producer.close();
                }

            }
        });
        if (this.autoFlush) {
            this.flush();
        }

        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }

        return future;
    }


小结:

   现在对于kafka的基本内容和要素不是特别熟悉,对于以上的源码,还分析不出来。但是跑代码 的过程中,确实是了解到了这些,先积累,以后再详细分析吧。虽然很傻的问题,但是在解决这个问题的过程中,收获还是非常大的。





本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【kafka】Exception thrown when sending a message with key='null' and payload='lizhenjuan;99' to topic 的相关文章

随机推荐

  • 【Python】使用网络调试助手传输数据(UDP协议)

    目录 1 发送与接收数据代码 2 网络调试助手配置 3 网络通信的流程 1 发送与接收数据代码 导入socket模块 import socket 判断程序入口 if name 61 61 39 main 39 创建套接字 xff0c AF
  • 课程设计——学生成绩管理系统 C语言

    课程设计 学生成绩管理系统 基本要求 xff1a 1 程序功能 编程实现一个基础的成绩管理系统 2 设计目的 通过本程序综合掌握结构体类型 指针 函数 文件等知识的综合使用 3 功能要求 可扩充功能 1 实现简单的菜单设计 如下所示 1添加
  • Foxmail登陆失败-NO LOGIN Login error password error

    问题描述 xff1a 因为今天在一个项目中需要用到邮箱的客户端授权码 xff0c 所以我在网易邮箱网页版中设置了授权码 xff0c 但是在后来在Foxmail客户端收取邮件时出现了问题 xff1a 我确定自己没有改过密码 xff0c 所以又
  • 远程登录阿里云服务器时,提示Permission denied, please try again.

    在使用ssh命令 xff0c 远程登录阿里云服务器时 xff0c 提示Permission denied please try again 百分之九十的可能 xff1a 你的密码错了 xff01 你需要注意的是 xff0c 我们的实例有两个
  • 为什么使用token?session与token的区别

    目录 一 session的状态保持及弊端 二 token认证机制 一 session的状态保持及弊端 当用户第一次通过浏览器使用用户名和密码访问服务器时 xff0c 服务器会验证用户数据 xff0c 验证成功后在服务器端写入session数
  • 在线加密解密工具

    http tool oschina net encrypt 一个在线加解密工具 xff0c 界面如图 xff1a
  • Django到底是MVC还是MVT?以及MVC和MVT的区别

    最近一直被一些问题所困惑 xff1a Django是遵循MVC还是MVT xff1f MVC和MVT到底有哪些区别 xff1f MVC可以继续拆分吗 xff1f 在网络上浏览了很多无关痛痒的文章 xff0c 无非是M V C分别代表什么 x
  • 【Python】使用Counter方法统计字符串每个字符出现的次数

    from collections import Counter a 61 34 a lskdh 96 foiegn 96 96 as ldnf asd 121 2ljladsfkja 96 sdijfhaosjlfd gjsdfg as d
  • 【Python】时间戳的格式化输出

    将当前时间按照 2019 02 25 10 xff1a 26 xff1a 55 输出 xff1a import datetime now time 61 datetime datetime now strftime 34 Y m d H M
  • 【Python】输出给定范围内的所有素数

    质数 xff08 prime number xff09 又称素数 xff0c 有无限个 质数定义为在大于1的自然数中 xff0c 除了1和它本身以外不再有其他因数 首先定义一个空列表用来存放所有的素数 sushu 61 遍历给定的范围 xf
  • Java 通过map构造树形结构

    在开发中 xff0c 经常会有将 数据组装成为树形结构的场景 xff0c 除了可以通过递归实现 xff0c 还可以通过map 组装实现 一 xff0c 构造基本数据 import apple laf JRSUIUtils import co
  • 【无标题】es搜索基本操作

    一 xff0c 准备数据 1 创建索引 PUT lagou book 2 创建mapping PUT lagou book doc mapping 34 properties 34 34 description 34 34 type 34
  • ubuntu 环境使用eclipse和JLink对uboot进行调试

    ubuntu 12 04 Jlink v8 Jlink Linux v422a tar gz Linux J Link ARM V4 34 Windows 下载eclipse C C 43 43 版本 http www eclipse or
  • 【ES】常用操作工具

    工欲善其事 xff0c 必先利于器 xff0c es使用过程中 xff0c 有些工具能帮助我们快速的上手和使用 一 es head es head 是一款专门针对 es的客户端工具elasticSearch配置包 是一个基于node js的
  • 【es】基本概念理解

    一 xff0c 初识es 1 是什么 xff1f ElasticSearch 简称es 开源的分布式的全文搜索引擎 xff0c 可以近乎实时的存储检索数据 xff0c es使用java开发 xff0c 并且使用Lucene作为核心实现搜索功
  • 无法安装net framework 3.5 的解决方法

    电脑刚重装了Windows8 1系统 xff0c 然后安装数据库的时候 xff0c 却出现了这样的问题 xff1a 您的电脑上的应用需要使用以下windows功能 问题原因是 xff1a 在安装系统的时候 xff0c NET Framewo
  • 【计算机网络原理】第四章 数据链路层

    今天主要梳理了一下数据链路层的内容 xff0c 如下 一 宏观规划 综合数据链路层的整体 xff0c 分为两大部分 xff0c 第一部分讲解数据链路层的功能 xff0c 第二部分讲解数据链路层的功能 这些协议 xff0c 其实还是为了实现数
  • 【redis】关系型数据库 VS 非关系型数据库

    一 关系型数据库 xff1f 1 概念 关系型数据库是指采用了关系模型来组织数据的数据库 简单来说 xff0c 关系模式就是二维表格模型 主要代表 xff1a SQL Server xff0c Oracle Mysql PostgreSQL
  • resultful风格接口

    一 产生背景 网络应用程序 xff0c 越来越流行前端和后端的分离设计 当前的发展趋势是前端的设计层出不穷 比如 xff1a 各种型号的手机 平板灯其他设计 因为必须要一种统一的机制方便不同的前端和后端进行通信 这就导致了API结构的流行
  • 【kafka】Exception thrown when sending a message with key='null' and payload='lizhenjuan;99' to topic

    今天碰到一个奇怪的问题 xff0c 如下图 xff1a 一 问题 1 问题截图 上午还可以发送消息成功的 xff0c 下午突然就发送不了消息了 我就检查我代码的问题 xff0c 是传递的格式不对 xff0c 还是数据要求不对 网上的资料显示