无法解码Spring云流DefaultKafkaHeaderMapper中的json类型

2023-12-21

我们正在使用 spring-cloud-stream 并计划升级我们的 Kafka 版本。
我们的应用程序使用spring-cloud-stream:2.0.0(spring-kafka 2.1.7) 与 apache kafka 服务器1.0.1并且还使用spring-cloud-sleuth:2.0.0用于跟踪。
我们将把我们的 Kafka 服务器升级到版本2.3.0所以需要升级到spring-boot 2.2.x (Hoxton) with spring-cloud-sleuth:2.2.0 and spring-cloud-stream:3.0.3 (Horsham.SR3).
我们有大约 200 个使用 Kafka 的应用程序,因此升级将逐步进行,因此我们将有中间状态生产者在新版本上和消费者使用旧版本。
我们的消费者正在使用@StreamListener.

在我们的测试过程中,我们遇到了解析大多数类型的标头的问题String并得到以下内容:

ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...

而 Types 标头是:

{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}

例如X-B3-SpanIdSleuth 添加的字符串类型为:ecb89ccb3e79418b这不是 JSON 字符串,因此是 ObjectMapperfails此处转换为字符串对象:

headers.put(h.key(), getObjectMapper().readValue(h.value(), type))

当我们有 String 类型时,看起来它不应该使用 ObjectMapper,因此我们的旧消费者失败了。

使用新生产者和旧消费者时有没有办法防止这个问题?


您可以配置DefaultKafkaHeaderMapper与旧版本兼容:

    /**
     * Set to true to encode String-valued headers as JSON ("..."), by default just the
     * raw String value is converted to a byte array using the configured charset. Set to
     * true if a consumer of the outbound record is using Spring for Apache Kafka version
     * less than 2.3
     * @param encodeStrings true to encode (default false).
     * @since 2.3
     */
    public void setEncodeStrings(boolean encodeStrings) {
        this.encodeStrings = encodeStrings;
    }

另请参阅https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties

spring.cloud.stream.kafka.binder.headerMapperBeanName

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法解码Spring云流DefaultKafkaHeaderMapper中的json类型 的相关文章

随机推荐

  • 在 laravel 4 中生成相对于基本 url 的 url

    我是 Laravel 的新手 现在正在 L 4 上构建一个应用程序 但卡在一个地方 无法理解如何生成相对于基本 url 的 url 在 laravel 3 中我知道这可以通过 url URL to user profile 但是 在 L 4
  • 在当前项目和插件组(本地、中央)中找不到前缀“jetty”的插件

    为了轻松运行我的 web 应用程序 我决定将 Jetty 添加到我的单个 POM 文件中 继官方文档 https www eclipse org jetty documentation 9 4 x maven and jetty html
  • Android Studio - 如何使用 getExternalFilesDir

    由于谷歌杀死了完美的方法 getExternalStoragePublicDirectory 我必须尝试让 getExternalFilesDir 工作 这是我尝试过的事情之一 private Context context this ge
  • 获取 Google App Engine 数据存储区中不属于列表的随机实体

    我正在使用 Google App Engine 构建网页游戏 游戏有一个存储的用户创建的关卡列表 可能很多 并且动态增加 并且每个用户都有一个他已经玩过的关卡列表 我需要为用户随机选择一个他以前没有玩过的级别 实体建模如下 class Us
  • 为什么 hibernate 给出 ConstraintException 就好像在对象已经存在时尝试创建对象一样

    我在更新 创建 CoverImage 对象时使用 saveOrUpdate 我偶尔会遇到主键约束冲突 org hibernate exception ConstraintViolationException Unique index or
  • 在docker容器中启用mysqli

    我使用以下代码构建一些容器 version 3 services db image mysql 5 7 environment MYSQL ROOT PASSWORD test MYSQL DATABASE test MYSQL USER
  • php整数和浮点比较不匹配

    我有以下代码 amount1 7299 amount2 72 9875 amount2 in cents round amount2 2 100 if amount1 amount2 in cents echo Amount amount1
  • Mysql排序分层数据

    我有一个问题 上周正在处理这个问题 但还无法解决 我可以使用 join 查询子菜单 但我无法订购它 我有一张这样的桌子 id name parent order 1 menu1 0 1 2 submenu1 1 2 3 submenu2 1
  • Django 中关系不存在错误

    我知道关于这个问题有很多问题 我查看了解决方案 不幸的是它们都不适合我 我创建了一个名为 用户管理 的新应用程序 并向该应用程序添加了一个模型 添加模型后 我将用户管理添加到设置中的 INSTALLED APPS 中 然后我运行 pytho
  • 如何用空格填充 NSString?

    例如 我需要 NSString 至少有 8 个字符 而不是使用循环在其上添加左侧填充空格 有没有办法做到这一点 Examples Input Output Hello Hello Bye Bye Very Long Very Long ab
  • 优化导致超时? [复制]

    这个问题在这里已经有答案了 我正在开发一个程序 该程序接受一个整数并查找该整数具有的连续总和的组合数量 数字13可以表示为连续正数之和 整数6 7 十四可以表示为2 3 4 5 也是一个和 连续正整数 有些数字可以表示为 以多种方式求连续正
  • 我知道回调函数是异步运行的,但为什么呢?

    语法的哪一部分提供了该函数应该在其他线程中运行并且是非阻塞的信息 让我们考虑一下 Node js 中的简单异步 I O var fs require fs var path process argv 2 fs readFile path u
  • 用户定义对象的类型转换

    就像我们对 ToString 所做的那样 有没有办法定义转换方法 obj MyClass another class obj 无需在 php 中输入强制类型转换 Edit 由于这个话题似乎引起了一些混乱 我想我应该详细说明一下 在 Java
  • 更改数据捕获仅用于更新和删除

    我们的数据库插入量不大 每晚 200 500k 但更新量很少 每天可能几百个 我需要无限期地保留对插入行本身的所有更改的历史记录 但不是实际的插入 我很想使用更改数据捕获 但支持此操作所需的空间量不可用 如果我能弄清楚做以下其中一项 我的生
  • python从excel创建字典

    我有一个包含 2 列的 Excel 工作表 第一列是姓名 第二列是年龄 我想创建一个字典 其中名称是键 年龄是值 这是代码 但它错误地创建了字典 keyValues x value for x in worksheet col 0 data
  • WPF 的轻量级文本库?

    有谁知道有一个轻度标记文本到样式文本格式化库 即类似 Markdown 或 Textile NET 的东西 但它会生成本机 XAML 文档 或者更确切地说 FlowDocument 模型或类似的可以直接显示在 WPF 应用程序中 以避免使用
  • PHP 无法解析时间字符串

    我需要根据从表单收到的值创建一个日期时间 问题是该值像字符串一样被接收 2016 10 10T08 29 06 959Z 我需要像这样接收2016 10 10T08 29 06 959Z不带引号 因为如果我收到带引号的消息 则会出现下一个错
  • 当进程提升时,如何获取非提升会话的 Windows 身份验证 ID

    我需要获取由 GetTokenInformation 返回的 AuthenticationID 和登录站的用户的 TokenStatistics 类 无论我是否被提升 让我给你一些更多的信息 假设我这样做 var Result GetTok
  • CodeIgniter 路由在 Nginx 下不工作

    Ubuntu 16 04 参考设置this https www howtoforge com tutorial installing nginx with php7 fpm and mysql on ubuntu 16 04 lts lem
  • 无法解码Spring云流DefaultKafkaHeaderMapper中的json类型

    我们正在使用 spring cloud stream 并计划升级我们的 Kafka 版本 我们的应用程序使用spring cloud stream 2 0 0 spring kafka 2 1 7 与 apache kafka 服务器1 0