【kafka】Exception thrown when sending a message with key='null' and payload='lizhenjuan;99' to topic

2023-05-16

 今天碰到一个奇怪的问题, 如下图:


一、问题


1.问题截图


上午还可以发送消息成功的,下午突然就发送不了消息了。我就检查我代码的问题,是传递的格式不对,还是数据要求不对。网上的资料显示是因为ip和host文件的地址不对应。

kafka Exception thrown when sending a message with key='null'

但是我检查了我本地的配置,没有问题。


2,解决方法

最后解决方法是,kafka不行了,重新启动一下,然后就可以正常发送了。主要问题是kafka挂了,然后接收不到消息了,一直处于阻塞状态,超时之后,会抛出异常。


二、意外之喜

在解决问题的时候,我一直怀疑我的数据格式的问题,就断点走了好多,kafka的源码,学习了一下kafka的发送过程,记录下。


发送的主要代码:

(1)发送时,我们需要传入topic和data两个参数,除了这两个参数,还可以根据自己的具体业务,选择调用kafkaTemplate的不同的send方法。

	//send是两个 参数,第一个参数是topic,第二个参数是Message数据
    kafkaTemplate.send("demo",m.getId()+m.getMessage()+m.getSendTime());


2)ProducerRecord<K,V> 对象实体

public final class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final K key;
    private final V value;
    private final Long timestamp;

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        if (topic == null) {
            throw new IllegalArgumentException("Topic cannot be null.");
        } else if (timestamp != null && timestamp.longValue() < 0L) {
            throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        } else if (partition != null && partition.intValue() < 0) {
            throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        } else {
               this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

这个实体中,会根据传入的参数不同,来进行走不同的方法。如果topic为空,直接抛出异常。本次代码中,走的是最后的一个else,partition和key都为空,直接赋值为null。


(3)doSend方法

 protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = this.getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }

        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    } else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception);
                        }
                    }
                } finally {
                    producer.close();
                }

            }
        });
        if (this.autoFlush) {
            this.flush();
        }

        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }

        return future;
    }


小结:

   现在对于kafka的基本内容和要素不是特别熟悉,对于以上的源码,还分析不出来。但是跑代码 的过程中,确实是了解到了这些,先积累,以后再详细分析吧。虽然很傻的问题,但是在解决这个问题的过程中,收获还是非常大的。





本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【kafka】Exception thrown when sending a message with key='null' and payload='lizhenjuan;99' to topic 的相关文章

  • 条件表达式的 raise 语句

    我如何优雅地实现 武士原则 http c2 com cgi wiki SamuraiPrinciple 胜利归来 或者根本不归来 关于我的职能 return
  • 将异常从 Objective-C 引发回 Javascript

    JavascriptCore 允许我们通过 JSExport 协议将 Objective C 类公开给 javascript 比方说 在公开的 Objective C 类的方法之一中 遇到了需要引发异常的情况 有没有办法创建所述异常并引发它
  • Laravel 5 重写异常处理程序

    我想知道是否可以重写 Laravel 5 中的应用程序异常处理程序类 而不将其扩展到另一个类 也许更好的说法是我想要它 这样就不会App Exceptions Handler将在异常时调用 但是我自己的处理程序之一 提前致谢 正如 Digi
  • 抛出旧异常的同时抛出新异常

    如果在由异常引起的堆栈展开期间 C 中的析构函数抛出异常 则程序将终止 这就是为什么析构函数永远不应该在 C 中抛出 示例 struct Foo Foo throw 2 whoops already throwing 1 at this p
  • Java中如何检查抛出的异常类型?

    如果一个操作捕获了多个异常 如何确定捕获了哪种类型的异常 这个例子应该更有意义 try int x doSomething catch NotAnInt ParseError e if thrown error is NotAnInt li
  • Xunit 中的测试异常 ()

    我正在尝试对此方法编写 Xunit 测试 public async Task
  • 为什么我不能在 Java 8 lambda 表达式中引发异常? [复制]

    这个问题在这里已经有答案了 我升级到 Java 8 并尝试用新的 lamdba 表达式替换通过 Map 的简单迭代 该循环搜索空值 如果找到则抛出异常 旧的 Java 7 代码如下所示 for Map Entry
  • 发生异常时记录什么?

    public void EatDinner string appetizer string mainCourse string dessert try Code catch Exception ex Logger Log Error in
  • Android JNI异常处理

    我需要在 JNI 代码中实现异常处理 我不擅长 jni 找不到任何好的例子 因此 请提供完整的示例 这就是我正在做的 jint JNI OnLoad JavaVM vm void reserved jint result 1 g JavaV
  • 为什么要在 php 中使用异常处理?

    我已经编写 PHP 很长一段时间了 但对 PHP 5 的了解并不多 我了解 PHP 5 中的异常处理已经有一段时间了 但从未真正研究过它 经过快速谷歌后 使用异常处理似乎毫无意义 我看不出使用它比仅使用一些 if 语句 也许还有我自己的错误
  • 如何处理过时的连接?

    我们的应用程序是一个 J2EE 应用程序 在 Websphere 6 1 上通过 Mainframe DB2 后端使用 Struts EJB Hibernate 最近已投入生产 我们收到过时的连接异常当用户第一次或有时登录应用程序时 此异常
  • Polly 策略记录异常并重新抛出

    我考虑使用Polly https github com App vNext Polly handing return values and policytresult创建策略来记录异常并重新抛出 我没有找到允许它开箱即用的现有方法 但我看到
  • 从 AspectJ 获取返回值或异常?

    我能够从建议的方法调用中获取签名和参数 但我无法弄清楚如何获取返回值或异常 我有点假设它可以通过使用 around 并继续以某种方式完成 您可以使用after returning and after throwing开头的建议以下文件 ht
  • 消费者关闭了输入通道或发生错误。事件=0x8

    D AndroidRuntime 11752 D AndroidRuntime 11752 gt gt gt gt gt gt AndroidRuntime START com android internal os RuntimeInit
  • App Engine 中超时异常的名称是什么?

    出于某种原因 我的印象是它只是被称为 超时 但事实似乎并非如此 Thanks 对于数据存储区调用 例外情况是 google appengine ext db Timeout 对于超出的总 挂钟 持续时间 例外情况是 google appen
  • 如何获取C#中的异常错误代码

    try object result processClass InvokeMethod Create methodArgs catch Exception e Here I was hoping to get an error code 当
  • 位置 0 处没有行

    cmd CommandText select from product where prod code Trim txtprod code Text and branch w location and avail stock lt gt 0
  • C# 中的异常转换

    为什么我会得到一个InvalidCastException当尝试这样做时 throw ArgumentNullException new Exception errormessage null 这是以下函数的简化版本 public stat
  • Windows批处理支持异常处理吗?

    Windows批处理编程支持异常处理吗 如果没有 是否有任何方法可以有效地模拟批处理文件中的异常处理 我希望能够在批处理脚本中的任何 CALL 级别的任何位置 抛出异常 并重复弹出 CALL 堆栈 直到找到活动的 TRY 块 然后 CATC
  • 什么时候应该使用 ThrowHelper 方法而不是直接抛出?

    什么时候适合使用投掷助手方法而不是直接抛出 void MyMethod throw new ArgumentNullException paramName ThrowArgumentNullException paramName void

随机推荐

  • C#多线程加载控件界面卡死的解决

    先听一个故事 xff1a 有一个老板忙不过来 xff0c 于是招一个员工去负责某些事务 这样老板就可以腾出时间处理其它事 后来发现员工干不下去 xff0c 原因是干活需要花费 xff0c 没有老板的认可 xff0c 财务不给批钱 这是原则
  • vs2022账户无法登录的解决

    因为昨天重做系统 xff0c 重装了vs2022 xff0c 又涉及到登录的问题 xff0c 一时想不起来之前怎么解决的了 xff0c 想起来以后决定还是记录下来 我遇到的问题是下面这样的 xff0c 提示脚本错误 xff0c 要求升级最新
  • 使用centos7+bind9构建内网私有dns

    有这样一种场景 xff0c 局域网内有一个为网内用户提供服务的机器 xff0c 我们希望像访问互联网站点一样去访问它 xff0c 而不用记忆ip地址和端口 xff0c 比如在web浏览器地址栏输入http www nx com就可以访问它
  • PowerBuilder制作纸牌游戏

    本文记录的是2001年我大三那年假期制作小游戏的思路 xff0c 希望给在读计算机专业的朋友们一些参考 xff0c 如果还没来得及动手尝试的同学 xff0c 一定要勇于动手 你们一定比我做得更好 制作动力 xff1a 我有一个好朋友 xff
  • 【Qt线程-1】this,volatile,exec(),moveToThread()

    背景 xff1a 个人学习多线程控制 xff0c 写了一些博文用于记录 xff1a Qt线程 2 事件循环 xff08 QCoreApplication processEvents xff0c exec xff09 的应用 Qt线程 3 使
  • 【Qt线程-2】事件循环(QCoreApplication::processEvents,exec)的应用

    背景 xff1a 个人学习多线程控制 xff0c 写了一些博文用于记录 Qt线程 1 this xff0c volatile xff0c exec xff0c moveToThread Qt线程 3 使用事件循环 xff0c 信号 xff0
  • 【Qt线程-4】事件循环嵌套,BlockingQueuedConnection与QWaitCondition比较

    背景 xff1a 个人学习多线程控制 xff0c 写了一些博文用于记录 xff1a Qt线程 1 this xff0c volatile xff0c exec xff0c moveToThread Qt线程 2 事件循环 xff08 QCo
  • 【Qt样式(qss)-3】几套配色方案

    背景 xff1a 之前写过有关qss的博客 xff0c 记录了如何使用qt手册 xff0c 以及在项目中如何使用qss的体验 经过实践 xff0c 我归纳了自己需要的qss使用方法 xff0c 使之尽量高效 xff0c 容易维护 Qt样式
  • 【Qt样式(qss)-4】应用到QMdiArea不生效的解决

    背景 xff1a 之前写记录过几篇qss相关内容 xff1a Qt样式 xff08 qss xff09 1 手册小结 xff08 附例 xff1a 软件深色模式 xff09 Qt样式 xff08 qss xff09 2 使用小结 xff08
  • Google play billing(Google play 内支付) 下篇

    开篇 xff1a 如billing开发文档所说 xff0c 要在你的应用中实现In app Billing只需要完成以下几步就可以了 第一 把你上篇下载的AIDL文件添加到你的工程里 xff0c 第二 把 lt uses permissio
  • Qt creator中操作QAction加入QToolBar

    背景 xff1a 个人笔记 我之前没有系统化学习过任何资料 xff0c 使用很多工具都是按需出发 xff0c 直接上手 xff0c 遇到问题再研究的 所以会有一些弯路 本文言语中难免有对个人情绪的生动描述 xff0c 希望不要影响读者心情
  • Java 通过map构造树形结构

    在开发中 xff0c 经常会有将 数据组装成为树形结构的场景 xff0c 除了可以通过递归实现 xff0c 还可以通过map 组装实现 一 xff0c 构造基本数据 import apple laf JRSUIUtils import co
  • 【无标题】es搜索基本操作

    一 xff0c 准备数据 1 创建索引 PUT lagou book 2 创建mapping PUT lagou book doc mapping 34 properties 34 34 description 34 34 type 34
  • 【ES】常用操作工具

    工欲善其事 xff0c 必先利于器 xff0c es使用过程中 xff0c 有些工具能帮助我们快速的上手和使用 一 es head es head 是一款专门针对 es的客户端工具elasticSearch配置包 是一个基于node js的
  • 【es】基本概念理解

    一 xff0c 初识es 1 是什么 xff1f ElasticSearch 简称es 开源的分布式的全文搜索引擎 xff0c 可以近乎实时的存储检索数据 xff0c es使用java开发 xff0c 并且使用Lucene作为核心实现搜索功
  • 无法安装net framework 3.5 的解决方法

    电脑刚重装了Windows8 1系统 xff0c 然后安装数据库的时候 xff0c 却出现了这样的问题 xff1a 您的电脑上的应用需要使用以下windows功能 问题原因是 xff1a 在安装系统的时候 xff0c NET Framewo
  • 【计算机网络原理】第四章 数据链路层

    今天主要梳理了一下数据链路层的内容 xff0c 如下 一 宏观规划 综合数据链路层的整体 xff0c 分为两大部分 xff0c 第一部分讲解数据链路层的功能 xff0c 第二部分讲解数据链路层的功能 这些协议 xff0c 其实还是为了实现数
  • 【redis】关系型数据库 VS 非关系型数据库

    一 关系型数据库 xff1f 1 概念 关系型数据库是指采用了关系模型来组织数据的数据库 简单来说 xff0c 关系模式就是二维表格模型 主要代表 xff1a SQL Server xff0c Oracle Mysql PostgreSQL
  • resultful风格接口

    一 产生背景 网络应用程序 xff0c 越来越流行前端和后端的分离设计 当前的发展趋势是前端的设计层出不穷 比如 xff1a 各种型号的手机 平板灯其他设计 因为必须要一种统一的机制方便不同的前端和后端进行通信 这就导致了API结构的流行
  • 【kafka】Exception thrown when sending a message with key='null' and payload='lizhenjuan;99' to topic

    今天碰到一个奇怪的问题 xff0c 如下图 xff1a 一 问题 1 问题截图 上午还可以发送消息成功的 xff0c 下午突然就发送不了消息了 我就检查我代码的问题 xff0c 是传递的格式不对 xff0c 还是数据要求不对 网上的资料显示