SpringBoot3集成Kafka优雅实现信息消费发送

2023-05-16

前言

       首先,你的JDK是否已经是8+了呢?
       其次,你是否已经用上SpringBoot3了呢?
       最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。


一、场景说明

       这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据解析处理入库TDengine。

二、使用步骤

1.引入库

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

       简简单单,就这一个依赖就够了。

2.配置

spring:
  #kafka配置
  kafka:
    #bootstrap-servers: 192.168.200.72:9092,192.168.200.73:9092
    #bootstrap-servers: 192.168.200.83:9092,192.168.200.84:9092
    bootstrap-servers: localhost:9092
    client-id: dc-device-flow-analyze
    consumer:
      group-id: dc-device-flow-analyze-consumer-group
      max-poll-records: 10
      #Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: earliest
      #是否开启自动提交
      enable-auto-commit: false
      #自动提交的时间间隔
      auto-commit-interval: 1000
    producer:
      acks: 1
      batch-size: 4096
      buffer-memory: 40960000
      client-id: dc-device-flow-analyze-producer
      compression-type: zstd
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      properties:
        spring.json.add.type.headers: false
        max.request.size: 126951500
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 1  #推荐设置为topic的分区数
      type: BATCH #开启批量监听

#消费topic配置
xiaotian:
  analyze:
    device:
      flow:
        topic:
          consumer: device-flow

3.消费


import com.xiaotian.datagenius.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 消费者listener
 *
 * @author zhengwen
 **/
@Slf4j
@Component
public class KafkaListenConsumer {

    @Autowired
    private DataTransService dataTransService;

    /**
     * 设备流水listenner
     *
     * @param records 消费信息
     * @param ack     Ack机制
     */
    @KafkaListener(topics = "${xiaotian.analyze.device.flow.topic.consumer}")
    public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {
        log.debug("=====设备流水deviceFlowListen消费者接收信息====");
        try {

            for (ConsumerRecord record : records) {
                log.debug("---开启线程解析设备流水数据:{}", record.toString());
                //具体service里取做逻辑
                dataTransService.deviceFlowTransSave(record);
            }

        } catch (Exception e) {
            log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);
        } finally {
            //手动提交偏移量
            ack.acknowledge();
        }
    }


}

       消费与SpringBoot2的写法一样,没有任何改变。

4.发布信息


import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.easylinkin.datagenius.core.Result;
import com.easylinkin.datagenius.core.ResultGenerator;
import com.easylinkin.datagenius.vo.KafkaMessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * kafka信息管理
 *
 * @author zhengwen
 **/
@Slf4j
@RestController
@RequestMapping("/kafka/push")
public class KafkaPushController {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * kafka的信息push发送
     *
     * @param kafkaMessageVo kafka信息对象
     * @return 推送结果
     */
    @PostMapping("/sendMsg")
    public Result sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {

        String topic = kafkaMessageVo.getTopic();
        String msg = kafkaMessageVo.getMessage();
        log.debug(msg);
      
        JSON msgJson = JSONUtil.parseObj(msg);
        /* springboot2的写法
        ListenableFuture<SendResult<String, Object>> listenableFuture =  kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);

        //发送成功后回调
        SuccessCallback successCallback = new SuccessCallback() {
            @Override
            public void onSuccess(Object result) {
                log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), ex);
            }
        };
        listenableFuture.addCallback(successCallback, failureCallback);
        */
        //SpringBoot3的写法
        CompletableFuture<SendResult<String, Object>> completableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);

        //执行成功回调
        completableFuture.thenAccept(result -> {
            log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
        });
        //执行失败回调
        completableFuture.exceptionally(e -> {
            log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), e);
            return null;
        });

        return ResultGenerator.genSuccessResult();
    }
}

       这个发送信息就与springBoot2的写法一致了。原ListenableFuture类已过时了,现在SpringBoot3、JDK8+用CompletableFuture监听信息发送结果。


总结

1、SpringBoot3真香
2、Kafka的集成已经非常成熟了,资料也多。
       我这里这个SpringBoot3集成Kafka发送信息目前觉得是独家,你能找到的应该都还是使用的ListenableFuture类。
       好了,就写到这里,希望能帮到大家,uping!!!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot3集成Kafka优雅实现信息消费发送 的相关文章

  • 计算广告读书笔记

    计算广告 广告主 媒体 用户 用户画像 ROI 进化 合约广告 多个合约在线分配问题 gt 竞价广告 交易终端TD 广告网络ADN gt 实时竞价RTB 广告交易平台ADX 需求方平台DSP 品牌广告 效果广告 点击率CTR 点击价值 到达
  • 【CentOS7离线ansible-playbook自动化安装CDH5.16(内附离线安装包地址,及自动化脚本)】

    CentOS7 离线环境 使用ansible自动部署CDH5 16 前言 本文介绍如何使用作者开发的自动化脚本 离线部署cdh集群 只需要简单的配置下yum源和cdh集群节点IP等几个参数 就可实现一键部署cdh集群 省去配置mysql n
  • librdkafka的使用和介绍

    librdkafka的使用介绍 librdkafka是kafka的c语言接口 下面简单的介绍一下其接口 1 rd kafka conf set设置全局配置 2 rd kafka topic conf set设置topic配置 3 rd ka
  • Kafka消息阻塞

    转自 http jis117 iteye com blog 2279519 hi all 大家都很关心kafka消息阻塞的情况 感谢RoctetMQ给我们的教训 Kafka上线也有一段时间了 确实有出现过消息阻塞的情况 虽然不影响业务而且用
  • springboot本机启动elasticjob抛出异常HostException(ip is null)

    1 使用的elasticjob版本为3 0 1 2 本机的IPV4在校验isReachable 返回false 可能是使用无线网 导致ip验证问题 3 最后引入Groovy解决 引入包
  • kafka学习笔记(一)简介

    这是对我找到的学习资料的整理 非手打 参考 https kafka apachecn org intro html https blog csdn net weixin 39468305 article details 106346280
  • Kafka——集群

    文章目录 集群 1 搭建个集群 2 集群发送消息 3 集群消费 3 1 Procuder 3 2 Consumer 4 消费顺序 集群 对于kafka来说 一个单独的broker意味着kafka集群中只有一个节点 要想增加kafka集群中的
  • 仿kafka实现java版时间轮

    系统定时 超时 在我们平时的项目开发中 会设置系统的超时时间 比如在http接口中设置超时时间 在定时调度中也会用到 在jdk的开发的实现Timer和ScheduledThreadPoolExecutor DelayQueue定时调度中使用
  • springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    springboot集成kafka实战项目 kafka生产者 消费者 创建topic 指定消费分区 前言 本项目代码可直接集成到你现有的springboot项目中 功能包括 1 kafka生产者配置 2 kafka消费者配置 指定分区消费
  • Kafka : KafkaProducer Closing the kafka producer with timeoutMillis

    1 美图 2 背景 一段kafka写入程序 不晓得为啥突然发现很多奇怪的日志 kafka 多线程发送数据 然后在本地是可以的 在服务器上是偶现的 我写了一个本地程序多线程生产数据 发现是没有问题的 Test public void mult
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • Kafka入门基础知识学习笔记-Kafka只是消息引擎吗

    学习极客时间 Kafka核心技术与实战 入门 03 05 作者 胡夕 Apache Kafka 的一名代码贡献者 目前在社区的 Patch 提交总数位列第 22 位 应该说算是国内比较活跃的贡献者了 胡夕老师 赠言 聪明人也要下死功夫 最近
  • java版kafka producer实现

    需求 1 kafka server已经配置完全 且设定了访问限制 基于这一点 必须要设定认证 及预先分配的账号密码 2 由于项目开发环境是java 且不允许使用LogStash 基于这一点 必须实现一个java版的producer 先贴一份
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • Kafka 架构及原理分析

    Kafka 架构及原理分析 文章目录 Kafka 架构及原理分析 简介 使用场景 架构 Broker Topic 副本机制 存储 消费分组 消费编号 数据多写支持 基于 binlog 实现主从复制 Kafka 的进阶功能 消息幂等性 事务
  • kafka的新API 得到最新一条数据

    业务的需要 需要得到最新的一条消息从kafka中 但是发现ConsumerRecords 这个对象并没有 get index 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录 但是有一个count 可以得到
  • MQ - KAFKA 高级篇

    kafak是一个分布式流处理平台 提供消息持久化 基于发布 订阅的方式的消息中间件 同时通过消费端配置相同的groupId支持点对点通信 适用场景 构造实时流数据管道 用于系统或应用之间可靠的消息传输 数据采集及处理 例如连接到一个数据库系
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比

随机推荐

  • 判断是否有值,0也是有值的情况

    span class token operator span span class token function isNaN span span class token punctuation span span class token f
  • 基于高德地图SDK进行搜索

    高德地图SDK使用地址http lbs amap com 地图设置 define GDMAPKEY 64 34 key 34 import 34 ViewController h 34 import lt MapKit MapKit h g
  • Microsoft Visual C++ Build Tools.exe安装包损坏

    Python3安装支持库的过程中经常会遇到 Microsoft Visual C 14 0 is required 此时就需要安装Visual C build tools生成工具 在运行build tool安装时 提示安装包损坏 翻墙也无效
  • debian图形界面安装

    安装GNOME中文桌面环境 安装基本的X系统 apt get install x window system core 安装GNOME桌面环境 apt get install gnome 到现在为止 xff0c 我们已成功安装完成gnome
  • Qt 调试时 程序异常结束

    在调试时 xff0c 关闭窗口 xff0c 应用程序输出窗口提示 Qt 调试时 程序异常结束 21 20 48 程序异常结束 21 20 48 The process was ended forcefully 21 20 48 G proj
  • c#webservice的简单示例

    是webservice 就概念上来说 xff0c 可能比较复杂 xff0c 不过我们可以有个宏观的了解 xff1a webservice就是个对外的接口 xff0c 里面有 函数可供外部客户调用 xff08 注意 xff1a 里面同样有客户
  • 实现常规厂家&品牌&型号业务对接物联网平台(snack3加json赋能)

    前言 之前介绍过通过snack3快速对接物模型 xff0c 不知道大家还有没有影响 记得还留了一个作业给大家想想 xff0c 就是这么兼容多型号 多版本 xff0c 这次就来跟大家分享下这么集成多型号 一 物模型文件调整 上次是利用snac
  • 组合OSS服务实现打包业务文件zip下载

    前言 实现文件打包成zip下载 xff0c 支持zip包含目录 文件 废话不多说 xff0c 直接上码 一 设计思路 后端组织文件 xff0c 打包成zip上传到OSS存储返回文件名称给前端前端根据返回的文件名称 xff08 url xff
  • Spring Boot + Disruptor = 王炸!!

    01 背景 工作中遇到项目使用Disruptor做消息队列 对你没看错 不是Kafka 也不是rabbitmq Disruptor有个最大的优点就是快 还有一点它是开源的哦 下面做个简单的记录 02 Disruptor介绍 Disrupto
  • Dbeaver连接ES问题一站解决

    前言 最近几天一直做ES的TPS测试 xff0c 每次看数据ES的数据都在嫌麻烦 xff08 在postman指定索引通过url请求查看数据 xff09 最后决定还是整整Dbeaver连接ES 一 当前境况 1 ES版本比较老 xff0c
  • Dbeaver连接TDengine时序数据库

    前言 还是结合上一阶段的工作 xff0c 为TPS满足合同里的要求 xff0c 预研数据库切换为TDengine 所以查看数据的工具我得能连上去看 xff0c 习惯了Dbeaver xff0c 所以先把Dbeaver整的能连接使用 一 Db
  • idea+ApifoxUploader+Apifox真是内外双修,香

    前言 最近部门为整合后端组 前端组 测试组 需求组 产品组等组之间的工作流程 xff0c 旨在提高协调与高效 xff0c 其中之一就是希望开发组 xff08 后端 前端 xff09 开发的接口能及时更新 xff0c 测试组能做接口测试 xf
  • SpringBoot3+最新MybatisPlus+Mysql与TDengine双数据源

    前言 昨天写的idea 43 Apifox uploader插件 43 apifox新年第一天上班就上榜了 xff0c 真是不错 今天来补一篇 xff0c 本来应该是在前一篇之前发的 实际上就是最新的springBoot集成最新的mybat
  • 业务平台扩展支持TDengine时序数据库方案

    1 场景与架构 1 1业务架构 这里涉及项目隐私 xff0c 架构图不方便公开 大致情况就是 xff1a 应用层的园区畅行 生态宜居 安全守护是我方要交付的系统 平台层的物联网感知中台是我方平台 1 2数据架构 从数据架构看 xff0c 园
  • 物联网平台+业务平台基本架构设计与优化想法

    前言 目前的交付底座有点老 xff0c 而且集成的有点杂 xff0c 计划是要升级下 xff0c 先说想法 xff0c 看领导做不做 1 业务平台定位 我们的愿景 xff1a 通过物联平台赋能 xff0c 让数据产生价值 为客户提供可视化的
  • Qt 正则表达式匹配失败的一个原因

    在Qt中做正则表达式时 xff0c 遇到一个很坑爹的问题 xff0c 还是经验不足导致 在正则表达式中 xff0c 有很多需要元字符 xff0c 是需要使用普通字符加转义符号搭配使用的 比如 w xff0c s 对于这类字符 xff0c 在
  • C++对象模型(整理)

    C 43 43 对象模型 1 何为C 43 43 对象模型 xff1f C 43 43 对象模型可以概括为以下2部分 xff1a 语言中直接支持面向对象程序设计的部分 面向对象程序设计部分 xff1a 如构造函数 析构函数 虚函数 继承 x
  • MybatisPlus多表查询之零sql编写实现

    1 前言 年初节奏还没有快起来 xff0c 适合做做技术前瞻 xff0c 无论是对个人还是团队都是好事 真要说分享 xff0c 其实感觉也没啥好分享的 xff0c 就像接手新项目一样 xff0c 代码都是自己看 xff0c 别人讲的再多 x
  • 物联网设备流水入库TDengine改造方案

    1 存储机制改造 针对提出的流水查询需求 xff0c 结合设备上报数据讨论后 xff0c 初步定以下几种方案 1 1 ES gt TDengine 1 ES入库有延时 2 ES没有提供数据监听API xff08 不能主动监听 xff0c 实
  • SpringBoot3集成Kafka优雅实现信息消费发送

    前言 首先 xff0c 你的JDK是否已经是8 43 了呢 xff1f 其次 xff0c 你是否已经用上SpringBoot3了呢 xff1f 最后 xff0c 这次分享的是SpringBoot3下的kafka发信息与消费信息 一 场景说明