Kafka 顺序消费方案

2023-11-18

前言

    本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题。如存在Topic-insert和Topic-update分别是对数据的插入和更新,当insert和update操作为同一数据时,应保证先insert再update。

1、问题引入

    kafka的顺序消费一直是一个难以解决的问题,kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证。如果一个Topic只有一个Partition,那么这个Topic对应consumer的消费必然是有序的。不同的Topic的任何情况下都无法保证consumer的消费顺序和producer的发送顺序一致。
    如果不同Topic之间存在数据关联且对消费顺序有要求,该如何处理?本文主要解决此问题。

2、解决思路

    现有Topic-insert和Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。
    两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。 使用synchronized进行加锁的话,会影响无关联的insert和update的数据消费能力,如id=1的insert和id=2的update,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。
    细粒度锁实现:使用弱引用实现细粒度锁
    PS:如果为分布式系统,细粒度锁需要使用分布式锁的对应实现。
    在对insert和update加锁之后,其实还是没有解决消费顺序的问题,只是确保了同一时间只有一个业务在处理。 对于消费顺序异常的问题,也就是先消费了update再消费insert的情况,处理方式:消费到update数据,校验库中是否存在当前数据(也就是是否执行insert),如果没有,就将当前update数据存入缓存,key为数据标识id,在insert消费时检查是否存在id对应的update缓存,如果有,就证明当前数据的消费顺序异常,需执行update操作,再将缓存数据移除。

3、实现方案

消息发送:

kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");

监听代码示例:
KafkaListenerDemo.java

@Component
@Slf4j
public class KafkaListenerDemo {

    // 消费到的数据缓存
    private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
    // 数据存储
    private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
    private WeakRefHashLock weakRefHashLock;

    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
        this.weakRefHashLock = weakRefHashLock;
    }

    @KafkaListener(topics = "TOPIC_INSERT")
    public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
        // 模拟顺序异常,也就是insert后消费,这里线程sleep
        Thread.sleep(1000);

        String id = record.value();
        log.info("接收到insert :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            log.info("开始处理 {} 的insert", id);
            // 模拟 insert 业务处理
            Thread.sleep(1000);
            // 从缓存中获取 是否存在有update数据
            if (UPDATE_DATA_MAP.containsKey(id)){
                // 缓存数据存在,执行update
                doUpdate(id);
            }
            log.info("处理 {} 的insert 结束", id);
        }finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    @KafkaListener(topics = "TOPIC_UPDATE")
    public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{

        String id = record.value();
        log.info("接收到update :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            // 测试使用,不做数据库的校验
            if (!DATA_MAP.containsKey(id)){
                // 未找到对应数据,证明消费顺序异常,将当前数据加入缓存
                log.info("消费顺序异常,将update数据 {} 加入缓存", id);
                UPDATE_DATA_MAP.put(id, id);
            }else {
                doUpdate(id);
            }
        }finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    void doUpdate(String id) throws InterruptedException{
        // 模拟 update
        log.info("开始处理update::{}", id);
        Thread.sleep(1000);
        log.info("处理update::{} 结束", id);
    }

}

日志(代码中已模拟必现消费顺序异常的场景):

接收到update :: 1
消费顺序异常,将update数据 1 加入缓存
接收到insert :: 1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束

观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 顺序消费方案 的相关文章

  • 在java中切换imageIcon?

    我有很多在窗口中移动的平面 线程 我想根据平面的方向切换 ImageIcon 例如 如果飞机向右飞行 则飞机的 imageIcon 是向右的 然后飞机向左飞行 则将 imageIcon 交换为飞机向左 我怎样才能在方法paintCompon
  • Cassandra Pojo Sink Flink 中的动态表名称

    我是 Apache Flink 的新手 我正在使用 Pojo Sink 将数据加载到 Cassandra 中 现在 我在以下命令的帮助下指定表和键空间名称 Table注解 现在 我想在运行时动态传递表名称和键空间名称 以便可以将数据加载到用
  • 在 Java 中实现排列算法的技巧

    作为学校项目的一部分 我需要编写一个函数 该函数将接受整数 N 并返回数组 0 1 N 1 的每个排列的二维数组 该声明看起来像 public static int permutations int N 该算法描述于http www usn
  • 通过 RMI 的服务器,无需注册

    我有一个可以通过 RMI 连接的服务对象 目前我正在这样做 Server Registry r LocateRegistry createRegistry 1234 r bind server UnicastRemoteObject exp
  • 如何将 Excel 中的图表导出为图形

    我有一系列 Excel 电子表格 每个电子表格至少包含一页数据和一页根据数据创建的图表 我需要捕获 不从数据中重新生成 将现有图表作为网络友好图像 这可以通过 Java 或 Net 实现吗 我知道 POI 的东西 Java 不会这样做 或者
  • 如何使 java.text.NumberFormat 将 0.0d 格式设置为“0”而不是“+0”?

    需要带符号的结果 0 0d 除外 IE 123 45d gt 123 45 123 45d gt 123 45 0 0d gt 0 我调用format setPositivePrefix 在 DecimalFormat 的实例上 强制结果中
  • 如何在 Java 中用 \n 替换 \\n

    我有一个string test first n middle n last 现在我想更换所有 n by n 我试过了test replaceAll n n and test replaceAll n n 但它们不起作用 有人有解决办法吗 T
  • 具有 JPA、PostgreSQL 和 NULL 值的 JodaTime

    我试图将 JPA 的 JodaTime DateTime 字段保留到 PostgreSQL 但遇到了指向数据库 NULL 值的空指针的问题 我正在使用 NetBeans 7 beta 2 IDE 持久性实现是 EclipseLink 2 2
  • 何时使用 clone() 以及 addAll() 和 add() 的实际工作原理

    我正在使用 Java 和 MySQL 我的项目中有大约 60 个交易屏幕 我曾经用过add and addAll 复制的功能ArrayList 例如 List
  • Java ZIP - 如何解压缩文件夹?

    是否有任何示例代码 如何将 ZIP 中的文件夹部分解压到我想要的目录中 我已将文件夹 FOLDER 中的所有文件读取到字节数组中 如何从其文件结构创建 我不确定你所说的部分是什么意思 您的意思是在没有 API 帮助的情况下自己完成吗 如果您
  • 如何在生产中安全地更改会话 cookie 域或名称?

    我们最近意识到我们的会话 cookie 正在被写入我们网站的完全限定域名 www myapp com 例如 MYAPPCOOKIE 79D5DB83 domain www myapp com 我们希望将其切换为可以跨子域共享的cookie
  • 改造添加带有令牌和 ID 的标头

    我在获取经过身份验证的用户时遇到问题 在此之前我得到了令牌和用户 ID 现在我需要使用访问令牌和 ID 从服务器获取用户 我有标题格式 https i stack imgur com OQ87Y png 现在我尝试使用拦截器添加带有用户令牌
  • Kotlin 中的枚举类对于 Android 来说是否像 Java 中那样“昂贵”?

    Are EnumKotlin 中的类对于 Android 来说 昂贵 就像 Java 一样 还可以用吗 IntDefs or StringDefs在科特林 当我将 Kotlin Enum 类反编译为 Java 类时 底层仍然使用了 Java
  • android中ScrollView中的图像

    在我的应用程序中 我想放置一个 png 文件 并且希望它在横向和纵向模式下都被视为滚动图像 请建议代码或示例 要使您的 Imageview 在高度不适合时滚动 您可以在 xml 中的 ScrollView 内添加一个 ImageView 并
  • java.lang.NoClassDefFoundError: org/apache/commons/cli/ParseException

    我想将 apache cli 添加到我的应用程序中 但我有问题 当我尝试运行它时显示这些错误 Error A JNI error has occurred please check your installation and try aga
  • Guava MultiSet 与 Map?

    我对Multiset的理解是一个带有频率的集合 但是我总是可以使用Map来表示频率 还有其他原因使用Multiset吗 优点Multiset
  • Java并发锁和条件的使用

    我可以用object wait object notify and synchronized blocks解决生产者消费者类型的问题 同时我可以使用locks and conditions from java util concurrent
  • 为什么ArrayList的非静态内部类SubList有一个成员变量“parent”?

    java util ArrayList SubList 是 java util ArrayList 的非静态内部类 这意味着它保存对其封闭类的引用 我们可以使用ArrayList this来访问java util ArrayList的成员
  • 如何通过sparkSession向worker提交多个jar?

    我使用的是火花2 2 0 下面是我在 Spark 上使用的 java 代码片段 SparkSession spark SparkSession builder appName MySQL Connection master spark ip
  • Android,Volley请求,响应阻塞主线程

    使用 Volley 处理较大响应时会发生一些不好的事情 String url AppHelper DOMAIN service pages profile update json this infoTextView setText getS

随机推荐

  • maven 配置篇 之 settings.xml 整理中...

    maven 配置 settings xml maven2比起maven1 来说 需要配置的文件少多了 主要集中在pom xml和settings xml中 先来说说settings xml settings xml对于maven 来说相 当
  • 【JS】jsx/tsx是什么?为什么 Vue3 的组件库都在使用 jsx/tsx?如何在Vue3中使用 jsx/tsx?

    前几天得到一个疑问 为什么 React 要用 JSX 语法 这样显得代码好像 很耦合 按照传统应推荐 html js css 模版分离 这样设计的优越性到底在哪里 一 jsx是什么 JSX 即 javascript Xml的缩写 是Face
  • Springboot 封装ssh 服务

    前提条件 Linux 已经启动SSH 服务或Windows 启动SSH 服务 整体项目结构 第一步 BuleSky 的pom xml 文件
  • CMake——cmake_minimum_required

    命令简介 cmake minimum required用于设定cmake的最低版本 cmake minimum required VERSION
  • 入门强化学习(Q-learning→DQN→DDQN)

    基础 只要具备CNN分类算法的基本认识 讲解内容 内容主要通过 飞翔小鸟 游戏为例 探究如下2个问题 强化学习原理 以Q learning算法为例 深度强化学习原理 以DQN DDQN算法为例 然后大概讲下DDQN算法 如何在更复杂的解锁任
  • OLED透明屏厚度:引领未来显示技术的革命

    OLED透明屏作为一种未来显示技术 其薄度在整个行业中具有重要意义 在这篇文章中 尼伽将详细介绍OLED透明屏厚度的优势 技术挑战以及应用案例 旨在为读者全面了解OLED透明屏的发展前景 一 OLED透明屏厚度的优势 OLED透明屏采用柔性
  • springboot项目中对文件夹进行监控

    需要的依赖
  • Quartz 基本使用

    Quartz 基本使用 一 Quartz的核心概念 二 Quartz的几个常用API 三 Quartz的使用 四 Quartz核心详解 五 JobListener 六 TriggerListener 七 SchedulerListener
  • 更改:为硬件保留的内存

    电脑 联想thinkbook16P 系统 win11 内存 16G 更改前 2 3G 更改后 827MB 一 关机 不同的型号电脑进入boss模式的按键不同 我的是按F1 自己去找进入boss模式的方式 二 进入boss模式 进入boss模
  • ARM运行可执行文件出现/usr/lib/libstdc++.so.6: version `CXXABI_ARM_1.3.3' not found解决

    1 关于Linux PC上出现这种问题容易解决 直接下载个高版本的libstdc so 6 0 x复制到 usr lib中 软连接一下就好了 ln s libstdc so 6 0 x libstdc so 6 2 但是在ARM板上执行可执
  • 1 映射与函数

    文章目录 集合 集合表示法 区间与领域 两个逻辑量词 映射 函数 函数的图形 反函数 集合 集合表示法 区间与领域 两个逻辑量词 映射 函数 函数的图形 反函数
  • JavaScript一种将数据库记录建立层级关系的处理方法

    JavaScript一种将数据库记录建立层级关系的处理方法 背景 方案 提示 背景 项目开展中 有些数据往往具有层级关系 在数据中用ID Parent来标示 那么在前端如何有效的还原这种层级关系 而其是很多层级的情况 方便的将数据加载的树
  • c#处理3种json数据的实例介绍

    这篇文章主要介绍了c 处理包含数组 对象的复杂json数据的方法 需要的朋友可以参考下 网络中数据传输经常是xml或者json 现在做的一个项目之前调其他系统接口都是返回的xml格式 刚刚遇到一个返回json格式数据的接口 通过例子由易到难
  • Linux安装iptables 替换firewall

    1 查看当前防火墙状态 systemctl status firewalld service 2 关闭防火墙 并查看防火墙状态 systemctl stop firewalld service 停止firewall 3 禁止防火墙开机启动
  • Springboot项目在Jenkins+Docker中实现自动化部署

    Springboot项目在Jenkins Docker中实现自动化部署 一 环境准备 1 项目开发环境 2 Jenkins docker运行环境 二 Docker安装 三 Jenkins安装 四 创建一个Springboot项目 1 使用I
  • 太突然!北大方正破产了!负债3029亿元!

    点击上方 Python高校 关注 未未干货立马到手 来源 中国基金报 chinafundnews 记者 乔麦 体量超3000亿的中国最大校企方正集团 债务危机迎来新进展 日前 方正集团旗下6家上市公司齐发提示性公告表示 北京银行申请对方正集
  • Tomcat调优【精简版】

    Tomcat调优 优化Tomcat内存分配 调整Tomcat启动脚本contalina sh 设置tomcat启动时分配的内存很可使用的最大内存 CATALINA OPTS 调整Tomcat线程池 Tomcat默认使用的线程池 Thread
  • Mysql中符号@的作用

    select a 变量名 如果你不加的话 会认为这是一个列名 但是这列不存在 就报错了 变量名 定义一个用户变量 对该用户变量进行赋值 用户变量赋值有两种方式 一种是直接用 号 另一种是用 号 其区别在于 使用set命令对用户变量进行赋值时
  • Premiere Pro CC2019安装资料及安装教程

    简介 Adobe Premiere是一款常用的视频编辑软件 由Adobe公司推出 现在常用的版本有CS4 CS5 CS6 CC 2014 CC 2015 CC 2017 CC 2018以及CC 2019版本 Adobe Premiere是一
  • Kafka 顺序消费方案

    Kafka 顺序消费方案 前言 1 问题引入 2 解决思路 3 实现方案 前言 本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题 如存在Topic insert和Topic update分别是对数据的插入和更新 当