我是如何用 redis 分布式锁来解决线上历史业务问题的

2023-10-31

近期发现,开发功能的时候发现了一个 mq 消费顺序错乱(历史遗留问题),导致业务异常的问题,看看我是如何解决的

问题抛出

首先,简单介绍一下情况:

线上 k8s 有多个 pod 会去消费 mq 中的消息,可是生产者发送的消息是期望一定要有序去消费,此时要表达的是,例如 生产者如果发送了 3 个通知消息,分别是

  • 1 系统已经在 / 组下面添加 a 组,你记得绑定策略 (例如 / 组绑定的是策略是:允许看视频类型的网站)
  • 2 系统已经在 /a 组下添加了 b 组, 你记得绑定策略(期望绑定的策略和他的父组策略一样)
  • 3 系统已经在 b 组下面添加 小 d 用户,你的绑定策略(期望绑定的策略和他的所在组一样)

此处,若有 3 个 pod 的分别拿到了上述 3 条消息,但是自身实际消费完毕的顺序可能是 先完成了 3 消息对应的业务逻辑,再是 2 消息 的业务逻辑,最后是 1 消息的业务逻辑

那么这个时候,小 d 用户就没有绑定上 允许看视频类型的网站 这一条策略,自然 b组 和 a 组也没有绑定上这条策略,这就和我们预期的完全不一致了

当然,实际情况对于单条单条的消息处理基本不会出现这种偏差,但是在批量处理的时候,就会出现实际业务处理顺序与期望不一致的情况,那么就是妥妥的线上问题了(小 d 上网的时候想看视频,可是一直看不了,于是就疯狂投诉。。。)

思考解决

对于这个问题如何解决呢?

我们知道,咱们使用 mq 的目的是为了做到去处理我们的异步逻辑,还能对流量进行削峰,服务间解耦

对于咱们的 A 服务,已经处理了关于添加用户的,添加组的逻辑,发送通知消息给到 B 服务的时候,B 服务自身的处理顺序,未按照既定的顺序真实按照顺序消费完毕,导致出现了业务问题

想法一

我们是期望 B 服务团队去添加批量接口,A 服务将需要通知的信息,排序好给到 B 服务,一个整包, B 服务的单个 pod 接收到这个大包,然后按照顺序处理消息即可,但是这个方式弊端比较明显

  • 当发送了多个批量大包消息的时候,B 服务如果自身处理不过来,也会导致类似的问题,无法根治
  • 需要 B 服务新增和修改的代码较多,肯定谈不下来
  • 而且对于绑定策略的服务来说,不仅仅是 B 服务,还有 C 服务,D 服务呢,他们都要改造… 这个想法就。。。

想法二

对于这一个业务,也不能去对整个架构大改,对于这些历史遗留问题,能少动就少动,兄弟们你们都懂的

于是便想出了使用 redis 分布式锁来处理,对于一个部署在 k8s 中服务的多个 pod 去抢占,谁先抢到锁,那么就谁消费 mq 中的消息,没有抢到锁的 pod ,那就过一会再抢

当然,对于其他类型的业务是没有影响的

如何去实现这个想法呢,我们可以模拟一下

  • 1 首先,我们设置一个 redis 的 key,例如 [服务名]_lockmq, 值的话咱们就任意设置,默认就用 服务名 做 value 吧,过期时间暂定 30 秒,有需求的可以调大
  • 2 如果设置成功,则处理成功之后的事情

    • 2.1 初始化 mq 消费者,并开启协程进行消费

      • 2.1.1 如果初始化失败,则直接返回,退到第 1 步
    • 2.2 对 redis 锁进行续期,此处咱们 10 秒续期一次

      • 如果续期失败,则直接返回,退到第 1 步
  • 3 若拿锁失败,则休息 10 秒再去拿锁

这样来处理的话,我们就可以应对多个 pod 来消费同一类消息的时候,保证同时只有一个 pod 在处理 mq 中的消息了,当然如果正在处理消息的 pod 出现了异常,对于其他 pod ,最晚会在 40 秒之后拿到锁,对于大量的消息来说,这个还是可以容忍的

对应的代码逻辑如下:

  • 简单连接 redis, redis 分布式锁的主逻辑如下

    • 连接 redis ,DB 默认为 0 号
var rdb = redis.NewClient(&redis.Options{
   Addr:     "localhost:6379",
   Password: "123456",
   DB:       0,
})

func LockMq(svrName string) {
   key := fmt.Sprintf("%s_lockmq", svrName)
   // 尝试加锁
   var set bool
   for {
      set = redisLock(key, svrName, time.Second*30)
      if set {
         log.Println("redisLock success ")
         if err := afterLockSuccess(key); err != nil {
            // 如果此处有err ,自然是 mq 初始化失败
            log.Println("mq init error: ", err)
         }else{
            log.Println("redisLock expire failed ")
         }
         time.Sleep(time.Second * 10)
         continue
      }
      // afterLockFailed()
      log.Println("redisLock failed ")
      time.Sleep(time.Second * 10)
   }
}
  • 基本的加锁实现

    • 设置 key , value , 过期时间为 30 秒
func redisLock(key, value string, duration time.Duration) bool {
   set, err := rdb.SetNX(context.TODO(), key, value, duration).Result()
   if err != nil {
      log.Println("setnx failed, error: ", err)
      return false
   }
   return set
}
  • 加锁成功之后,初始化 mq 客户端并进行消费,续期 redis 分布式锁
func afterLockSuccess(key string) error {
   // 初始化需要做的内容或者句柄
   // xxx
   // 对于此处的初始化 mq 句柄失败才返回 err
   ch := make(chan struct{}, 1)
   go func() {
      // 模拟消费消息
      for {
         select {
         case <-ch:
            log.Println("expire failed,mq close")
            return
         default:
            log.Println("is consuming msg")
            time.Sleep(time.Second * 2)
         }
      }
   }()

   for {
      time.Sleep(time.Second*10)
      // 续期
      set, err := rdb.PExpire(context.TODO(), key, time.Second*30).Result()
      if err != nil {
         log.Println("PExpire error!! ", err)
         return nil
      }
      if !set {
         ch <- struct{}{}
         log.Println("PExpire failed!!")
         return nil
      }
      log.Println("PExpire success!! ")
   }

}

具体的测试直接调用 LockMq 函数即可

func main(){
   go redislock.LockMq("helloworld")
   select{}
}

模拟启动多个 pod 去抢锁,抢到锁的执行业务,继续续期,抢不到锁的休息一会再接着抢

程序 a 先启动,程序 b 后启动

程序 a 日志如下:

程序 a 起来之后,启动一段时间之后,kill 掉 程序 a

程序 b 日志如下:

程序 b 先是获取锁失败,过 30s 左右,程序 b 能正常获取到锁

关于源码可以查看地址:https://github.com/qingconglaixueit/my_redis_demo

感谢阅读,欢迎交流,点个赞,关注一波 再走吧

欢迎点赞,关注,收藏

朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力

好了,本次就到这里

技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。

我是阿兵云原生,欢迎点赞关注收藏,下次见~

可以进入地址进行体验和学习:https://xxetb.xet.tech/s/3lucCI

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

我是如何用 redis 分布式锁来解决线上历史业务问题的 的相关文章

  • 我的 Redis 自动生成的密钥

    我不知道我的 Redis 版本 4 0 9 到底发生了什么 我正在运行一个应用程序并使用 Redis 来存储我的数据库 但是 然后 Redis 自动创建 3 个新键 Backup1 Backup2 Backup3 并删除我的所有数据 这是我
  • 如何在多个Lua State(多线程)之间传递数据?

    我在中启动Redis连接池redis lua 通过从 C 调用 我得到了redis lua state 此 Lua 状态全局启动一次 仅在其他线程中启动get从中 当有一个 HTTP 请求 工作线程 时 我需要从redis lua stat
  • 如果另一个键中的计数器低于零,则从集合中原子删除一个项目?

    雷迪斯2 0 3 在我的 Redis DB 中 我有一组项目 每个项目都有一个与其关联的计数器 MULTI SADD items set foo INCRBY items foo 10000 EXEC 新项目会以随机间隔添加到集合中 当用户
  • 如何将node.js管道传输到redis?

    我有很多数据要插入 SET INCR 到redis DB 所以我正在寻找pipeline http redis io topics pipelining 质量插入 http redis io topics mass insert通过node
  • Redis SYNC 套接字上的错误情况:连接被拒绝

    在我的 django 应用程序中使用 celery 和 redis 一切都工作正常 直到我遇到了问题 redis 文件的位置已更改 redis 无法访问它们 经过查找 原来这是由于网络随机攻击造成的 需要添加confg 我添加文件后 一段时
  • 如何设置和获取Redis中存储的对象?

    我试图在 redis 中存储一个对象 当我获取该对象时 它似乎不起作用 I tried u User new u name blankman redis set test u x redis get test x name error 我想
  • Redis发布/订阅:查看当前订阅了哪些频道

    我目前有兴趣查看我拥有的 Redis 发布 订阅应用程序中订阅了哪些频道 当客户端连接到我们的服务器时 我们将它们注册到如下所示的通道 user user id 这样做的原因是我希望能够看到谁 在线 目前 我在不知道客户端是否在线的情况下盲
  • 如何在Redis中从hmset()切换到hset()?

    我收到弃用警告 即 Redis hmset 已弃用 请改用 Redis hset 但是 hset 采用第三个参数 我不知道是什么name应该是 info users 10 timestamp datetime utcnow strftime
  • 使用 Sentinels 升级 Redis 的最佳实践?

    我有 3 个 Redis 节点 由 3 个哨兵监视 我进行了搜索 文档似乎不清楚如何最好地升级此类配置 我目前使用的是 3 0 6 版本 我想升级到最新的 5 0 5 我对这方面的程序有几个疑问 升级两个大版本可以吗 我在我们的暂存环境中执
  • 有没有办法让特定的key在集群模式下定位到特定的redis实例上?

    我想让我的多锁位于不同的redis实例上 我发现redission可以指定一个实例来执行命令 但是如果该命令与key相关 则指定的实例会将命令传输到另一个实例 你能给我一些建议吗 你可以 但这并不是微不足道的 首先 Redis 在键中使用大
  • redis dump.rdb / 保存小文件

    Context 我正在使用redis 数据库小于 100 MB 但是 我想进行每日备份 我也在 Ubuntu Server 12 04 上运行 当输入 redis cli save 我不知道 dump rdb 保存到哪里 因为 redis
  • Spring Redis删除不删除key

    我正在尝试删除一个 Redis 键 但由于某种原因它没有删除 但也没有抛出异常 这是我要删除的代码 import com example service CustomerService import com example model Cu
  • Redis 队列工作程序在 utcparse 中崩溃

    我正在尝试按照以下教程获得基本的 rq 工作 https blog miguelgrinberg com post the flask mega tutorial part xxii background jobs https blog m
  • 如何配置Lettuce Redis集群异步连接池

    我正在配置我的生菜重新分配池 当我按照官方文档配置时 连接池无法正常初始化 无法获取连接 官方文档指出 RedisClusterClient clusterClient RedisClusterClient create RedisURI
  • Spring Data Redis 覆盖默认序列化器

    我正在尝试创建一个RedisTemplatebean 将具有更新的值序列化器来序列化对象JSONredis 中的格式 Configuration class RedisConfig Bean name redisTemplate Prima
  • Laravel Redis 配置

    我目前正在使用 Laravel 和 Redis 创建一个应用程序 几乎一切都工作正常 我按照文档中的说明扩展了身份验证 用户可以订阅 登录 注销 我可以创建内容 所有内容都存储在 Redis 中 但我有一个问题 我无法运行 php arti
  • 使用环境变量在 redis.conf 中设置动态路径

    我有一个环境变量MY HOME其中有一个目录的路径 home abc 现在 我有一个redis conf文件 我需要像这样设置这个路径 redis conf pidfile MY HOME local var pids redis pid
  • Redis 中存储整数和字符串的区别

    这两个命令有什么区别吗 LPUSH myset 123 LPUSH myset 123 我想存储大约 500 万个整数 并且我想以最有效的方式做到这一点 不 没有什么区别 两者都存储为字符串 从redis io http redis io
  • 为什么单个 Redis 实例不是线程安全的?

    https github com xetorthio jedis wiki Getting started https github com xetorthio jedis wiki Getting started 在多线程环境中使用Jed
  • 没有适用于机器人的 Laravel 会话

    我在大型 Laravel 项目和 Redis 存储方面遇到问题 我们将会话存储在 Redis 中 我们已经有 28GB 的 RAM 然而 它的运行速度仍然相对较快 达到了极限 因为我们有来自搜索引擎机器人的大量点击 每天超过 250 000

随机推荐

  • 数组模拟栈和队列

    全文目录 数组实现栈 数组实现队列 数组实现的循环队列 数组实现栈 元素的出入只在栈顶进行 所以在实现的时候只需要标记栈顶就行了 因为每次插入元素都需要先 tt 所以 tt 可以从 1 开始 tt表示栈顶 int stk N tt 1 向栈
  • (16) 基于图卷积神经网络的轨道交通流量预测

    交通预见未来 16 基于图卷积神经网络的轨道交通流量预测 1 文章信息 Predicting Station Level Short Term Passenger Flow in a Citywide Metro Network Using
  • 计算机c盘突然少了几个G,做系统时c盘显示0容量-关于Windows系统c盘突然没了十几个g...

    既然系统默认装软件是c盘 那为什么不把c盘空间做大点呢 C盘是系统盘 多大容量完全是自己分出来的 你的c盘容量应该是别人给分的 一般为了电脑速度快一些 会用SSD做系统盘 你说你就一个1T的硬盘 那其实可以完全分成一个区 也就是就一个C盘
  • 如何提取OneDrive文件直链?

    原理 原链接 https xxxx my sharepoint com x g personal xx xx xx xxxxxxxxxx 直链 https xxxx my sharepoint com personal xx xx xx l
  • CNN之手写数字识别(Handwriting Recognition)

    CNN之手写数字识别 Handwriting Recognition 目录 CNN之手写数字识别 Handwriting Recognition 1 常用的包 2 常见概念 3 手写数字识别器实现 3 1 数据准备 3 2 构建网络 3 3
  • VB基础语法

    一 基础概念 1 1变量 变量的定义格式 Dim Private Static Public 变量名 As 数据类型 Dim Private Static Public 就是我之前所说的权限的意思 As 数据类型 这个是可选项 你可以为这个
  • 准备数据集

    目录 介绍 足够的数据集 收集图像 调整图像大小 下一步 在这里 我们简要说明了数据集的要求 然后 我们提出了收集数据的方法 在Internet上搜索图像 搜索视频并从中上传帧 然后 我们提供一些找到的视频的参考 然后 我们说明使用可用工具
  • defer和async的区别

    没有 defer 或 async 浏览器会立即加载并执行指定的脚本 立即 指的是在渲染该 script 标签之下的文档元素之前 也就是说不等待后续载入的文档元素 读到就加载并执行 有 async 加载和渲染后续文档元素的过程将和 scrip
  • python读取docx文件,并进行一些操作

    python读取docx文件 1 安装包 先前试用过很多包 都不管用 读取文件时候会出现如下错误 pywintypes com error 2147352567 发生意外 0 Kingsoft WPS 文档保存失败 3011 2147467
  • 从零开始学python 07——字典

    一 字典 1 字典的定义 通过 里面的数据都是以键值对保存 key value 字典中可以存在多个键值对 用逗号隔开 注意点 字典中的key一般都是字符串类型 也可以是数值类型 字典中的key一般不要相同 如果出现多个相同的key 以最后一
  • python 点名随机+人脸识别

    基于tkinter写的随机点名窗口程序 运行截图 主窗口 点名操作 人脸识别操作 具体代码如下 主窗口 import random import tkinter import tkinter as tk import threading i
  • win7安装visual studio 2015出现安装包丢失或损坏

    win r 输入 certmgr msc 查看有没有选中的两个证书 如果没有需要从其他电脑导入 然后直接点击安装界面重试 即可继续安装
  • 海关爬虫7代(圣佛版)

    声明 代码仅作学习交流用途 代码分享者与创作者不承担任何由他人恶意运行而导致的责任 勿擅自修改限制频率的参数 勿恶意攻击网页 请学习浏览者遵守社会公德与法律秩序 爬虫导致的网页崩溃等损失由计算机操作者负全部责任 造成严重后果的需要承担刑事责
  • vue顶部菜单加左侧菜单_物流项目之用户登录、主页面、顶部菜单授权

    工程搭建分析 freight parent 父工程 打包方式pom 管理jar包的版本号 所有module都应该继承父工程 为什么不在freight parent定义所有jar包 而是定义版本号呢 项目部署到tomcat需要打war包 如果
  • hive中distribute by、sort by、cluster by

    1 背景 hive中有一个store表 字段分别是 商店所属人标识 merid 商户余额 money 商店名称 name 求每个法人下属的商店的余额按照降序排序 merid money name B 10 store B 4 A 12 st
  • 区块链技术2---BTC的数据结构

    1 Hash pointers 哈希指针 和普通指针相比 哈希指针除了保存地址还保存哈希值 2 Block chain 区块链中的区块通过哈希指针相连 这里的哈希指针的哈希值是对前一个区块的整体取哈希值 包括前一个区块的哈希指针 因此区块链
  • python3.7安装dlib (Wind10)

    使用pip install dlib 提示失败 原因 https pypi org project dlib files 查看说明最新版本dlib 19 20 0 不支持Python3 7 解决方案 整理了下网上说的方案大致如下 一 编译安
  • android 悬浮组件实现

    项目需求 需要实现一个每个页面都存在的悬浮按钮 可以拖动 跟随整个项目的生命周期 即应用登录之后显示悬浮按钮 应用退出之后 隐藏悬浮按钮 特殊页面隐藏悬浮按钮 应用后台展示之后 隐藏悬浮按钮 应用恢复前台展示 显示悬浮按钮 准备工作 添加权
  • js提示“没有权限”的问题(转载)

    当某个互联网运营商的网站上规模之后 他们都会考虑将网站部署到主域名相同 子域名不同的服务器集群上 以此来构建一个聚合的应用 同时 希望能够利用 JavaScript 在不同子域的网页间相互操作 实现一个对用户来说 无缝 的应用 这时 跨域操
  • 我是如何用 redis 分布式锁来解决线上历史业务问题的

    近期发现 开发功能的时候发现了一个 mq 消费顺序错乱 历史遗留问题 导致业务异常的问题 看看我是如何解决的 问题抛出 首先 简单介绍一下情况 线上 k8s 有多个 pod 会去消费 mq 中的消息 可是生产者发送的消息是期望一定要有序去消