Redis缓存与数据库双写一致性解决方案

2023-11-19

目录

 

1、冤孽的诞生

1.1 需求起因

1.2 策略之争

2、标准解决方案

2.1 延时双删策略

2.2 异步更新缓存(基于订阅binlog的同步机制)

3 、基于binlog订阅实现步骤

3.1 准备材料

3.2 代码实现


1、冤孽的诞生

1.1 需求起因

在高并发的业务场景下,数据库大多数情况都是用户并发访问最薄弱的环节。所以,就需要使用redis做一个缓冲操作,让请求先访问到redis,而不是直接访问MySQL等数据库 !

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

好了,我们现在引入缓存的概念,那么访问路程变成了如下:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

上面这个经典的读取缓存步骤一般没有什么问题,但是一旦涉及到数据更新:数据库和缓存更新,就容易出现缓存(Redis)和数据库间的数据一致性问题

有以下这些不一致的场景:

  • 当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后更新缓存,发现更新缓存失败了,这意味着数据库存的是99,而缓存还是100,这导致数据库和缓存不一致

  • 如果删除了缓存Redis记录,还没有来得及删除对应的数据库记录,另一个线程就来读取,发现缓存为空,则去数据库中读取数据写入缓存,此时缓存中为脏数据。

因为写和读是并发的,没法保证顺序,就会出现缓存和数据库的数据不一致的问题。

1.2 策略之争

我们讨论三种更新策略:

  • 先更新数据库,再更新缓存

    这套方案,大家是普遍反对的。为什么呢?有如下两点原因。

    原因一(线程安全角度) 同时有请求A和请求B进行更新操作,那么会出现

    (1)、线程A更新了数据库

    (2)、线程B更新了数据库

    (3)、线程B更新了缓存

    (4)、线程A更新了缓存

    这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

    原因二(业务场景角度) 有如下两点:

    (1)、如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。

    (2)、如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。

  • 先删除缓存,再更新数据库

    该方案会导致不一致的原因是。同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

    (1)、请求A进行写操作,删除缓存

    (2)、请求B查询发现缓存不存在

    (3)、请求B去数据库查询得到旧值

    (4)、请求B将旧值写入缓存

    (5)、请求A将新值写入数据库 上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。

  • 先更新数据库,再删除缓存

    知名社交网站facebook也在论文《Scaling Memcache at Facebook》中提出,他们用的也是先更新数据库,再删缓存的策略。

    脸书很牛B,但是代表他用这套方案就没问题吗?不是的!

    假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生

    (1)请求A查询数据库,得一个旧值

    (2)请求B将新值写入数据库

    (3)请求B删除缓存

    (4)请求A将查到的旧值写入缓存,如果发生上述情况,确实是会发生脏数据。

    然而,发生这种情况的概率又有多少呢?

    发生上述情况有一个先天性条件,就是步骤(2)的写数据库操作比步骤(1)的读数据库操作耗时更短,才有可能使得步骤(3)先于步骤(4)。

    可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(2)耗时比步骤(1)更短,这一情形很难出现。

    但是难出现不代表不出现!因为我们的数据库写跟表数据量无关,而读会随着数据量的增加而变慢!

    但是从业务层面讲,对于海量数据一般都不会放入缓存,毕竟内存相对于机械硬盘都要贵很多!并且就算你不缺钱,当数据量达到1千万左右时,由于内存中不能存储如此大量数目的数据,频繁同磁盘进行数据交换(持久化),导致数据查询、存储性能的急剧下降,将导致服务不可用。当前还没有好的产品可以实现key-value保证数据完整性,千万级条数量级的,高效存储和查询支持产品。

  • 总结

    对于上述三种方案,我们抛弃第一种,但是第二种和第三种还有一个共同的毛病,那就是更新数据库和删除缓存不是一个原子性操作!所以引入了以下两种业界普遍使用的标准方案

2、标准解决方案

2.1 延时双删策略

2.1.1 基本流程图

 

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

2.1.2 伪代码

public void write(String key,Object data){
​
        redis.delKey(key);
​
        db.updateData(data);
​
        Thread.sleep(1000);
​
        redis.delKey(key);
​
    }
​
/*转化为中文描述就是
(1)先淘汰缓存
(2)再写数据库
(3)休眠1秒(根据具体的读操作业务时间来定)
(4)再次淘汰缓存
​
这么做,可以将1秒内所造成的缓存脏数据,再次删除。
那么,这个1秒怎么确定的,具体该休眠多久呢?
针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。
如果你用了mysql的读写分离架构怎么办?
ok,在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。
(1)请求A进行写操作,删除缓存
(2)请求A将数据写入数据库了,
(3)请求B查询缓存发现,缓存没有值
(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值
(5)请求B将旧值写入缓存
(6)数据库完成主从同步,从库变为新值
(7)请求A将缓存中B写入的旧值数据删除
​
上述情形,如果休眠时间没有考虑数据同步时间消耗,那么第七步先于第五步执行了会造成数据不一致。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。
​
采用这种同步淘汰策略,吞吐量降低怎么办?
ok,那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。
第二次删除,如果删除失败怎么办?
这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库:
(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库
(6)请求A试图去删除请求B写入的缓存值,结果失败了。
​
ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。咋办?
们需要提供一个保障重试的方案:
    定时任务,这样会压力太大,并且一直阻塞会影响性能
    消息队列,异步处理,可以让性能提升,但是对业务代码造成大量的侵入
    
最后的最后,这个延时的时间,你真的好把握吗?而且就算你能把握好,更新数据库和删除缓存的操作不是原子性的,你怎么解决掉?
    1、你可以使用重试删除,但是这样做的结果是以牺牲响应时间为代价
    2、另起一个异步线程处理,这又是以系统资源消耗作为代价并且凭空造成代码的复杂度
​

2.1.3 模拟代码

    /*
        延时双删
     */
    @RequestMapping("update2")
    public void update2() throws InterruptedException {
        //将redis中该缓存删除
        redisTemplate.delete("20200101010101");
​
        //这个位置另外一个读操作进程进来了
        Store oldStore = storeMapper.getStore("20200101010101");
​
        //写入数据库
        Store newStore = new Store("20200101010101", 97);
        storeMapper.update(newStore);
        try {
            Thread.sleep(3000);//这个地方需要自己去评估项目的读数据业务逻辑的耗时,然后加几百ms,如果是主从同步,还应该加上同步时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //读操作在这个位置进行数据缓存
        redisTemplate.opsForValue().append(oldStore.getCode(),JSON.toJSONString(oldStore));
​
        //再次删除缓存数据
        //问题,如果该处删除失败,则缓存里面还是旧数据(脏数据)
        Boolean isDelete = false;
        while (!isDelete){
            isDelete = redisTemplate.delete("20200101010101");
        }
        System.out.println("缓存删除成功");
    }
​

 

2.2 异步更新缓存(基于订阅binlog的同步机制)

2.2.1 基本流程图

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

这个也是我们今天的主要讲解,也是业界用得最多的方案,该方案的核心在于使用队列对读写操作进行排队操作,保证了数据的最终一致性,当然,性能相对上一种要差,但是还是那句话,数据的准确性才是最重要的!

2.2.2 初识Canal

阿里Canal主要是听过伪装成mysql从节点来向主节点拉取binlog日志解析成消息推送到MQ消息队列。

Canal在双写一致性中所处的位置:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)

  • canal解析binary log对象(原始为byte流)

  • canal将解析后的对象数据推送给监听的消息中间件(实时主动推送,rabbitmq需要是在线状态)

 

3 、基于binlog订阅实现步骤

3.1 准备材料

1、需要部署一个阿里巴巴的Canal服务端,用于订阅Mysql binlog日志并推送到MQ消息队列

#下载解压canal server
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz  #下载canal部署包
mkdir canal
tar -zxvf canal.deployer-1.1.1.tar.gz -C canal #解压
​
#编辑conf/canal.properties,修改MQ配置
canal.ip = 1 #canal服务器标识
canal.serverMode = rabbitmq # 指定rabbitmq
canal.mq.servers = 192.168.223.128 ## 注意不要加端口号,不然会报IPV6错误。
canal.mq.vhost=canal  #MQ虚拟机名称
canal.mq.exchange=exchange.trade #交换机名称,用于将消息发送到绑定的队列
canal.mq.username=guest #MQ登录账号,注意要有上面vhost的权限
canal.mq.password=guest #MQ密码
---------------------------------------------------------------------------------
    
#编辑conf/example/instance.properties实例配置,配置数据库信息
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.mysql.slaveId=1234 #不要与my.cnf中server_id相同,因为我要伪装为mysql的slave
canal.instance.master.address=192.168.223.128:3306 ## 数据库地址
canal.instance.defaultDatabaseName=test ## 数据库名
canal.mq.topic=example # 路由键,需要跟MQ中交换机队列的绑定路由key保持一致
​
​

2、安装RabbitMQ

mkdir /usr/local/rabbitmq;
cd /usr/local/rabbitmq;
​
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers;
​
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm;
​
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm;
​
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm;
​
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm;
​
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm;
​
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm;
​
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
#找到loopback_users  修改后台登录用户为[guest]
​
#启动rabbitmq:service rabbitmq-server start
#启动监控管理器:service rabbitmq-plugins enable rabbitmq_management
​
#开启端口:firewall-cmd --zone=public --add-port=15672/tcp --permanent 
#重启防火墙:firewall-cmd --reload

更多精彩请移步《RabbitMQ工作模型及Java编程》

3、安装mysql并开启binlog

rpm -Uvh http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm #下载
yum -y install mysql-community-server #rpm安装
​
#编辑my.cnf配置文件,开启binlog
vim /etc/my.cnf
​
#增加以下配置
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复(没有数据库主从不配也行)
​
#加入开机启动
systemctl enable mysqld
#启动mysql服务进程
systemctl start mysqld
#初始化,执行命令,重置密码 
mysql_secure_installation
#会依次出现以下问题。
    Set root password? [Y/n]
    是否设置root用户的密码 (y后【设置登录密码】)
    Remove anonymous users? [Y/n]
    是否删除匿名用户 (y)
    Disallow root login remotely? [Y/n]
    是否禁止root远程登录 (n)
    Remove test database and access to it? [Y/n]
    是否删除test数据库(y)
    Reload privilege tables now? [Y/n]
    是否重新加载授权信息 (y)
​
# 先进入mysql
mysql -u root -p
# 授权(root用户)远程连接权限(不建议)
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '远程登录密码' WITH GRANT OPTION;
FLUSH PRIVILEGES;
# 使用单独的远程登录用户(推荐)
GRANT ALL PRIVILEGES ON *.* TO '新用户名'@'%' IDENTIFIED BY '远程登录密码' WITH GRANT OPTION;
FLUSH PRIVILEGES;
​
#查看是否已经开启了binlog日志
#登录mysql后输入如下命令:
show variables like '%log_bin%';
​
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON  
​
#查看binlog日志:
#1、查看第一个binlog文件的内容
mysql> show binlog events;
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
| Log_name         | Pos | Event_type  | Server_id | End_log_pos | Info                                  |
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
| mysql-bin.000001 |   4 | Format_desc |         1 |         120 | Server ver: 5.6.49-log, Binlog ver: 4 |
| mysql-bin.000001 | 120 | Query       |         1 |         192 | BEGIN                                 |
| mysql-bin.000001 | 192 | Table_map   |         1 |         249 | table_id: 70 (test.goods_store)       |
| mysql-bin.000001 | 249 | Delete_rows |         1 |         294 | table_id: 70 flags: STMT_END_F        |
| mysql-bin.000001 | 294 | Xid         |         1 |         325 | COMMIT /* xid=11 */                   |
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
​

分别启动mysql,rabbitmq,canal

service mysql start #启动mysql
service rabbitmq-server start #启动rabbitmq
canal目录/bin/startup.sh #启动canal服务

问题:

 {"identity":{"slaveId":-1,"sourceAddress":{"address":"ydt1","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000026","position":551,"serverId":1,"timestamp":1594283137000}}
2020-07-31 17:27:24.973 [destination = example , address = /192.168.223.128:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000026,position=551,serverId=1,gtid=,timestamp=1594283137000] cost : 617ms , the next step is binlog dump
2020-07-31 17:27:25.106 [destination = example , address = /192.168.223.128:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket
​
#如果出现了以上问题,可能是mysql数据库的binlog日志位置不对,重新设置一下
#先找出当前mysql的binlog日志position,进入mysql客户端,输入如下命令:
show master status;
#找到当前binlog以及position,编辑canal目录/conf/example/meta.dat元数据脚本
vim /usr/local/canal/conf/example/meta.dat
将----》"journalName":"mysql-bin.000003","position":499改为自己查到的或者比查到的小即可
#或者直接将该文件删除,重新生成当前数据库执行命令的position位置对应的元数据脚本
​

3.2 代码实现

3.2.1 pom.xml

<dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>
​
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
​
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.3</version>
        </dependency>

3.2.2 application.yml

spring:
  rabbitmq:
    virtual-host: canal
    host: 192.168.223.128
    publisher-confirms: true
  #数据源
  datasource:
    url: jdbc:mysql://192.168.223.128:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC
    username: root
    password: root
    driver-class-name: com.mysql.jdbc.Driver
  redis:
    host: 192.168.223.128
​

 

3.2.3 消息队列配置类

package com.ydt.test.message;
​
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
​
@Configuration
public class DirectRabbitConfig {
 
    //队列 起名:exchange.trade.canal
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("exchange.trade.canal",true);
    }
 
    //Direct交换机 起名:exchange.trade
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("exchange.trade");
    }
 
    //绑定  将队列和交换机绑定, 并设置用于匹配键:example
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("example");
    }
}

3.2.4 消息监听者类

package com.ydt.test.message;
​
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
​
import java.util.Map;
​
@Component
public class DirectReceiver {
​
    @Autowired
    private StringRedisTemplate redisTemplate;
​
    @RabbitListener(queues = "exchange.trade.canal")
    public void process(Message message){
        String json = new String(message.getBody());
        System.out.println("消费的消息:" + json);
        Map map = JSON.parseObject(json,Map.class);
        JSONArray array = null;
        String sqlType = (String) map.get("type");
        if(StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
            array = JSONArray.parseArray((String)map.get("data"));
        }else{
            array = (JSONArray)map.get("data");
        }
        if(array == null){
            return;
        }
        JSONObject object = array.getJSONObject(0);
       /* if(StringUtils.endsWithIgnoreCase("UPDATE",sqlType)
                || StringUtils.endsWithIgnoreCase("INSERT",sqlType)
                || StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
            redisTemplate.boundValueOps(object.get("code").toString()).set(object.toString());
        }else if(StringUtils.endsWithIgnoreCase("DELETE",sqlType)){
            redisTemplate.delete(object.get("code").toString());
        }*/
        if(StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
            redisTemplate.boundValueOps(object.get("code").toString()).set(object.toString());
        }else{
            redisTemplate.delete(object.get("code").toString());
        }
    }
}
​

 

3.2.5 Controller调用

package com.ydt.test.controller;
​
import com.alibaba.fastjson.JSON;
import com.ydt.test.domain.Store;
import com.ydt.test.mapper.StoreMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
​
import java.util.HashMap;
import java.util.Map;
​
@RestController
public class MessageController {
​
​
    @Autowired
    private StoreMapper storeMapper;
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @Autowired
    private RedisTemplate redisTemplate;
​
    /**
     * 案例1
     */
    @RequestMapping("update1")
    public void update1(){
        Store store = new Store("20200101010101", 98);
        storeMapper.update(store);
        int i = 1/0;
        redisTemplate.delete("20200101010101");
    }
​
    @RequestMapping("get")
    public String getMessage(){
        //查询操作
        Store store = storeMapper.getStore("20200101010101");
        System.out.println("-----------我进行了查询,现在我要开始进行redis缓存了-----------");
        //同一数据源
        Map map = new HashMap();
        map.put("type", "SELECT");
        map.put("data", "[{'code':'20200101010101','store':"+store.getStore()+"}]");
        rabbitTemplate.convertAndSend("exchange.trade", "example", JSON.toJSONString(map));
        return  "";
    }
}
​

3.2.6 启动测试

1、先将库存表中库存修改为111,会通过canal伪slave拿到binlog日志,然后推送到rabbitmq

2、然后调用get方法,拿到数据库中数据,同时将数据推送到rabbitmq

3、重复1操作,将库存改为321

4、重复2操作

打开监听,启动服务可以看到我们的消费者会按照顺序消费队列中的数据!

DirectReceiver消费者收到消息  : {"data":[{"code":"20200101010101","store":"111"}],"database":"test","es":1596198062000,"id":13,"isDdl":false,"mysqlType":{"code":"varchar(255)","store":"int(11)"},"old":[{"store":"123"}],"pkNames":["code"],"sql":"","sqlType":{"code":12,"store":4},"table":"goods_store","ts":1596198062527,"type":"UPDATE"}
DirectReceiver消费者收到消息  : {"data":"[{'code':'20200101010101','store':111}]","type":"SELECT"}
DirectReceiver消费者收到消息  : {"data":[{"code":"20200101010101","store":"321"}],"database":"test","es":1596198075000,"id":14,"isDdl":false,"mysqlType":{"code":"varchar(255)","store":"int(11)"},"old":[{"store":"111"}],"pkNames":["code"],"sql":"","sqlType":{"code":12,"store":4},"table":"goods_store","ts":1596198075520,"type":"UPDATE"}
DirectReceiver消费者收到消息  : {"data":"[{'code':'20200101010101','store':321}]","type":"SELECT"}
​

 

 

 

 

 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Redis缓存与数据库双写一致性解决方案 的相关文章

随机推荐

  • python3 字符串format 输出

    gt gt gt help FORMATING Traceback most recent call last File
  • 潜艇来袭(Qt官方案例-2维动画游戏)

    一 游戏介绍 1 开始界面 启动程序 进入开始界面 2 开始新游戏 点击菜单 File New Game 或者Ctrl N 进入新游戏 开始新游戏之后 会有一个海底的潜艇 和水面舰艇对战 计算机 自动控制潜艇 海底潜艇会隔段时间发射一枚鱼雷
  • Node输出日志的正确姿势

    背景 每个程序员都喜欢在有问题的代码中插入一些日志的方法来帮助调试程序 比如System out println或console log 解决后 就会将这些语句删除 周而复始 但是通过系统日志输出的日志格式都是这种 output conso
  • Android加载webView--setWebChromeClient默认不会显示弹窗

    1 设置自定义浏览器客户端 webView setWebChromeClient new MyWebChromeClient 2 onJsAlert就是弹窗 只会有一个 自定义弹出框 class MyWebChromeClient exte
  • VSCode关闭vue语法检查

    今天碰到一个这样的错误 Component name School should always be multi word 意思是组件名称 School 应该总是多个单词 解决办法 在vue config js中添加这样一句代码 lintO
  • QT实现动态翻译和语言切换

    QT GUI提供语言动态转换机制并辅以相应的工具方便programmer实现界面的多语言实时动态切换功能 实现语言动态切换的方法 一个注意 五个步骤 一个注意 实现QT工程的语言切换功能的一个关键点是所有的字符串都需要tr修饰符 例如 m
  • 【数据结构初阶】第七节.树和二叉树的基本操作

    作者简介 大家好 我是未央 博客首页 未央 303 系列专栏 Java初阶数据结构 每日一句 人的一生 可以有所作为的时机只有一次 那就是现在 文章目录 前言 一 二叉树的快速构建 二 二叉树的遍历 2 1 前序遍历 2 2 中序遍历 2
  • 从计数器到分频电路

    一 计数器 1 计数器代码 计数器 顾名思义就是在时钟的节拍下进行计数 一个简单的N位计数器的代码如下所示 这个计数器从0计数到2 N 1 共计数了2 N个数 也就是N位计数器 1 module count parameter N 8 2
  • 微信小程序登录弹框问题

    1 getUserInfo 相信刚接触微信小程序开发的人都在想 官方给出的这个是什么意思 我来解释一下吧 还记得我们在最开始使用微信小程序的时候吗 第一次进一个微信小程序的时候会直接弹出来个框 询问我们是否允许哟用户获取信息 微信官方觉得这
  • C++11 之 std::function & std::bind & lambda 表达式

    文章目录 std function std bind lambda 表达式 总结 c 11新增了 std function std bind lambda 表达式等封装使函数调用更加方便 std function 讲 std functio
  • 生活之你为什么不学习

    最近在别人的空间 我看到了八句话 你有什么理由不学习 感觉说得挺有鸡血的 1 你不能把这个世界让给你所鄙视的人 2 成功的速度一定要超过父母老去的速度 3 可怕的不是别人比你优秀 而是比你优秀的人比你还努力 4 我努力的目的是让我的妈妈买东
  • 【华为OD机试python】按身高和体重排队【2023 B卷

    华为OD机试 真题 点这里 华为OD机试 真题考点分类 点这里 题目描述 某学校举行运动会 学生们按编号 1 2 3 n 进行标识 现需要按照身高由低到高排列 对身高相同的人 按体重由轻到重排列 对于身高体重都相同的人 维持原有的编号顺序关
  • 最全的交叉编译Makefile讲解

    最近正在搞交叉编译 参考很多博客 学习了一下Makefile的编写 记录一下Makefile内代码是什么意思 代码如下 简单的hello ko的makefile ifneq KERNELRELEASE obj m hello o else
  • [CUDA] 快速入门CUDA(1)-基本了解和HelloWorld

    CUDA基础 文章目录 CUDA基础 1 CUDA简介 2 GPU和CPU架构的不同之处 3 查看GPU硬件信息 4 需要建立的基本概念 5 总结 1 CUDA简介 CUDA的全程是Computer Unified Device Archi
  • 树莓派4B(buster)的源更换为北外(清华)国内源

    树莓派4B buster 的源更换为北外 清华 国内源 1 登陆到树莓派 ssh pi your raspi IP 2 备份源文件 sudo cp etc apt sources list etc apt sources list bak
  • GoogLeNet论文详解

    GoogLeNet 1 Introduction 得益于深度学习的优势和更强大的卷积神经网络的出现 图像分类和目标检测的准确率发生了令人意想不到的进步 在2014年的ILSVRC比赛中 GoogLeNet取得了第一名的成绩 所用模型参数不足
  • 详解如何修改Linux文件权限

    参考 详解如何修改Linux文件权限 Linux文件权限详解 在Linux系统中 可以使用chmod命令来修改文件的权限 该命令用于更改文件或目录的读取 r 写入 w 和执行 x 权限 以下是一些详细的说明和示例 使用数字表示权限 r 读取
  • Golang教程:(十六)结构体

    原文 https golangbot com structs 欢迎来到Golang系列教程的第十六篇 什么是结构体 结构体 struct 是用户自定义的类型 它代表若干字段的集合 有些时候将多个数据看做一个整体要比单独使用这些数据更有意义
  • element el-table render-header自定义复选框

    项目中需要对列表数据进行批量处理 表头增加复选框 并关联列表数据 el table提供解决方法 实现多选非常简单 手动添加一个el table column 设type属性为selection即可 尝试后在我的项目中不适用 于是找到另一种r
  • Redis缓存与数据库双写一致性解决方案

    目录 1 冤孽的诞生 1 1 需求起因 1 2 策略之争 2 标准解决方案 2 1 延时双删策略 2 2 异步更新缓存 基于订阅binlog的同步机制 3 基于binlog订阅实现步骤 3 1 准备材料 3 2 代码实现 1 冤孽的诞生 1