1.0 缓存击穿
概念
一些redis的key过期,同时大量数据请求过期的key或者redis不存在的key,导致大量请求打到数据库,导致数据库瘫痪!!!
解决方案
1. 设置热点数据永不过期。
2. 对热点数据加锁(分布式锁)。
代码实现(初始化项目)
商品表
各层级
package com.fei.pojo.entity;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* (Goods)表实体类
*
* @author makejava
* @since 2023-04-13 22:09:35
*/
@SuppressWarnings("serial")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Goods {
//商品id@TableId
private Integer id;
//商品名称
private String name;
//商品价格
private Integer price;
//商品描述
private String describe;
//0-上架 1-下架
private Integer state;
}
service
package com.fei.service.impl;
import com.fei.mapper.GoodMapper;
import com.fei.pojo.entity.Goods;
import com.fei.service.GoodService;
import com.fei.util.RedisCache;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Service("goodService")
public class GoodServiceImpl implements GoodService {
@Resource
GoodMapper goodMapper;
@Resource
RedisCache redisCache;
@Override
public Goods selectGoodById(Integer id) {
//方案一 最初始化的状态
//判断缓存中是否存在
Goods good = (Goods)redisCache.getCacheObject(id+"");
if(Objects.isNull(good)){
Goods goods = goodMapper.selectGoodById(id);
//判断查询商品对象是否为空
if(!Objects.isNull(goods)) {
redisCache.setCacheObject(goods.getId().toString(), goods, 2, TimeUnit.HOURS);
}
}
return goodMapper.selectGoodById(id);
}
@Override
public List<Goods> selectGoods() {
return goodMapper.selectGoods();
}
}
1.1 避免缓存击穿代码实现
判断是否为热点数据-->是热点数据申请锁-->申请成功进行加分布式锁-->去数据库查该热点数据-->将数据设置为永不过期-->释放锁-->申请失败则进行自旋去申请锁
首先分布式锁存在一些问题
1. 锁的内容出现异常导致一直被占用
可以通过设置redis过期时间来解决
2. A锁没有过期,B锁把A锁释放
可以用UUID解决
3. 分布式锁,设置锁,判断锁,删除锁存在原子性问题
可以使用lua脚本解决
在resouce下写lua脚本
if redis.call("get",KEYS[1])==ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
注入到spring中
@Bean
public DefaultRedisScript<Boolean> script(){
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
//lock.lua脚本位于 application.yml同级目录
redisScript.setLocation(new ClassPathResource("lock.lua"));
redisScript.setResultType(Boolean.class);
return redisScript;
}
查询商品解决缓存击穿代码如下
package com.fei.service.impl;
import com.fei.controller.UserController;
import com.fei.mapper.GoodMapper;
import com.fei.pojo.entity.Goods;
import com.fei.service.GoodService;
import com.fei.util.RedisCache;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service
public class GoodServiceImpl implements GoodService {
private Logger logger = LoggerFactory.getLogger(GoodServiceImpl.class);
@Resource
GoodMapper goodMapper;
@Resource
RedisCache redisCache;
@Resource
RedisTemplate redisTemplate;
@Autowired
RedisScript<Boolean> script;
String LOCK_PREFIX = "lock:good:id:";
@Override
public Goods selectGoodById(Integer id) {
// //判断缓存中是否存在
// Goods good = (Goods)redisCache.getCacheObject(id+"");
// if(Objects.isNull(good)){
// Goods goods = goodMapper.selectGoodById(id);
// //判断查询商品对象是否为空
// if(!Objects.isNull(goods)) {
// redisCache.setCacheObject(goods.getId().toString(), goods, 2, TimeUnit.HOURS);
// }
// }
// return goodMapper.selectGoodById(id);
//判断缓存中是否存在
Goods good = (Goods)redisCache.getCacheObject(id.toString());
logger.info("在缓存中取出id=={}的商品信息为{}",id,good);
if(Objects.isNull(good)){
//先判断这个id是不是爆款商品
//这里假设 id = 1 的属于爆款商品
if(id == 1){
//分布式锁的key
String lockKey = LOCK_PREFIX + id;
// 每一把锁都有自己独有的uuid
String uuid = UUID.randomUUID().toString();
//设置分布式锁
ValueOperations valueOperations = redisTemplate.opsForValue();
//对锁进行占位
Boolean isLock = valueOperations.setIfAbsent(lockKey, uuid, 300, TimeUnit.SECONDS);
logger.info("分布式锁的状态{}",isLock);
if(isLock){
Goods goods = goodMapper.selectGoodById(id);
logger.info("获得加锁的key-->{}",valueOperations.get(lockKey));
if(!Objects.isNull(goods)){
valueOperations.set(goods.getId().toString(),goods);
Boolean result=(Boolean)redisTemplate.execute(script, Collections.singletonList(lockKey),uuid);
return goods;
}
return goods;
}else {
logger.info("申请锁失败,进行自旋");
try {
Thread.sleep(1500);
}catch (Exception e){
e.printStackTrace();
}
return selectGoodById(id);
}
}
Goods goods = goodMapper.selectGoodById(id);
//判断查询商品对象是否为空
if(!Objects.isNull(goods)) {
redisCache.setCacheObject(goods.getId().toString(), goods, 2, TimeUnit.HOURS);
}
return goods;
}
return good;
}
@Override
public List<Goods> selectGoods() {
return goodMapper.selectGoods();
}
}
自己语言叙述为:
首先判断缓存中是否存在 --> 不存在再判断是否是热点数据 --> 如果是热点数据则使用 分布式锁 用lua脚本保持原子性 存入缓存返回数据--> 不是热点数据则直接查询数据库,存储缓存!!返回数据
2.0 缓存穿透
描述
当用户发起请求,先去缓存中查找,没找到又去持久层数据库中查找也没找到,这种算是查询失败。在高并发情况下出现大量这种查询失败,导致持久层压力过大,甚至宕机,叫做缓存穿透。
解决方案
1. 布隆过滤器
2. 将查找的不存在的数据进行缓存null(缓存过期时间设置短一点,节省空间)
2.1 缓存穿透代码的实现
2.1.1 导入布隆过滤器所需要的依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
2.1.2 写一个布隆过滤器的工具类
package com.fei.util;
import com.fei.mapper.GoodMapper;
import com.fei.pojo.entity.Goods;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.util.List;
/**
* 布隆过滤器的工具类
*/
public class BloomFilterUtil {
//布隆过滤器的容量
public static final int CATACITY = 1000000;
private static BloomFilter<Integer> bloomFilter=BloomFilter.create(Funnels.integerFunnel(),CATACITY);
public static BloomFilter<Integer> getBloomFilter(){
return bloomFilter;
}
/**
* 判断是否包含key(有可能有误差)
* @param i
* @return
*/
public static boolean mayContains(Integer i){
return bloomFilter.mightContain(i);
}
}
2.1.3
将商品的id在初始化的时候都加载到布隆过滤器中
package com.fei.config;
import com.fei.mapper.GoodMapper;
import com.fei.pojo.entity.Goods;
import com.fei.service.impl.GoodServiceImpl;
import com.fei.util.BloomFilterUtil;
import com.google.common.hash.BloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
@Order(3)
public class InitClass implements ApplicationRunner {
private Logger logger = LoggerFactory.getLogger(InitClass.class);
@Resource
GoodMapper goodMapper;
@Override
public void run(ApplicationArguments args) throws Exception {
List<Goods> goodsList = goodMapper.selectGoods();
logger.info("初始化商品id到布隆过滤器中,商品id---->{}",goodsList);
BloomFilter<Integer> bloomFilter = BloomFilterUtil.getBloomFilter();
for(Goods good : goodsList){
logger.info("初始化商品id到布隆过滤器中,商品id---->{}",good.getId());
bloomFilter.put(good.getId());
}
}
}
2.1.4 改进后的代码
package com.fei.service.impl;
import com.fei.controller.UserController;
import com.fei.mapper.GoodMapper;
import com.fei.pojo.entity.Goods;
import com.fei.service.GoodService;
import com.fei.util.BloomFilterUtil;
import com.fei.util.RedisCache;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service
public class GoodServiceImpl implements GoodService {
private Logger logger = LoggerFactory.getLogger(GoodServiceImpl.class);
@Resource
GoodMapper goodMapper;
@Resource
RedisCache redisCache;
@Resource
RedisTemplate redisTemplate;
@Autowired
RedisScript<Boolean> script;
String LOCK_PREFIX = "lock:good:id:";
@Override
public Goods selectGoodById(Integer id) {
// //判断缓存中是否存在
// Goods good = (Goods)redisCache.getCacheObject(id+"");
// if(Objects.isNull(good)){
// Goods goods = goodMapper.selectGoodById(id);
// //判断查询商品对象是否为空
// if(!Objects.isNull(goods)) {
// redisCache.setCacheObject(goods.getId().toString(), goods, 2, TimeUnit.HOURS);
// }
// }
// return goodMapper.selectGoodById(id);
//判断缓存中是否存在
Goods good = (Goods)redisCache.getCacheObject(id.toString());
//布隆过滤器过滤
logger.info("在缓存中取出id=={}的商品信息为{}",id,good);
boolean isExist = BloomFilterUtil.mayContains(id);
if(isExist) {
logger.info("布隆过滤器是否存在值------>{}", isExist);
if (Objects.isNull(good)) {
//先判断这个id是不是爆款商品
//这里假设 id = 1 的属于爆款商品
if (id == 1) {
//分布式锁的key
String lockKey = LOCK_PREFIX + id;
// 每一把锁都有自己独有的uuid
String uuid = UUID.randomUUID().toString();
//设置分布式锁
ValueOperations valueOperations = redisTemplate.opsForValue();
//对锁进行占位
Boolean isLock = valueOperations.setIfAbsent(lockKey, uuid, 300, TimeUnit.SECONDS);
logger.info("分布式锁的状态{}", isLock);
if (isLock) {
Goods goods = goodMapper.selectGoodById(id);
logger.info("获得加锁的key-->{}", valueOperations.get(lockKey));
if (!Objects.isNull(goods)) {
valueOperations.set(goods.getId().toString(), goods);
Boolean result = (Boolean) redisTemplate.execute(script, Collections.singletonList(lockKey), uuid);
return goods;
}
return goods;
} else {
logger.info("申请锁失败,进行自旋");
try {
Thread.sleep(1500);
} catch (Exception e) {
e.printStackTrace();
}
return selectGoodById(id);
}
}
Goods goods = goodMapper.selectGoodById(id);
//判断查询商品对象是否为空
if (!Objects.isNull(goods)) {
redisCache.setCacheObject(goods.getId().toString(), goods, 2, TimeUnit.HOURS);
} else {
//缓存空对象
//因为布隆过滤器可能有错误值进来,所以这里缓存null
redisCache.setCacheObject(id.toString(), null, 200, TimeUnit.SECONDS);
}
return goods;
}
}else {
//布隆过滤器不存在
//缓存空对象
redisCache.setCacheObject(id.toString(), null, 200, TimeUnit.SECONDS);
}
return good;
}
@Override
public List<Goods> selectGoods() {
return goodMapper.selectGoods();
}
}
3.0 缓存雪崩
概念
缓存层出现错误,不能正确运行,导致所有请求打到存储层,造成存储层挂掉的情况。
解决方案
1. 建立redis集群,达到高可用
2. 加随机因子(根据商品的冷热程度)
3. 限流降级(已经发生的情况下)
4. 加锁排队(同缓存击穿)
3.1 代码的实现
这里主要写2. 加随机因子预防缓存雪崩,主要根据热度的不同加不同随机因子
这里假设id =1 的为爆款 id=2的为除爆款外的第二梯度,id = 3的为第三梯度
实际开发中可以按商品类别分类热门程度等等
实现加随机因子代码如下:
package com.fei.service.impl;
import com.fei.controller.UserController;
import com.fei.mapper.GoodMapper;
import com.fei.pojo.entity.Goods;
import com.fei.service.GoodService;
import com.fei.util.BloomFilterUtil;
import com.fei.util.RedisCache;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service
public class GoodServiceImpl implements GoodService {
private Logger logger = LoggerFactory.getLogger(GoodServiceImpl.class);
@Resource
GoodMapper goodMapper;
@Resource
RedisCache redisCache;
@Resource
RedisTemplate redisTemplate;
@Autowired
RedisScript<Boolean> script;
String LOCK_PREFIX = "lock:good:id:";
@Override
public Goods selectGoodById(Integer id) {
// //判断缓存中是否存在
// Goods good = (Goods)redisCache.getCacheObject(id+"");
// if(Objects.isNull(good)){
// Goods goods = goodMapper.selectGoodById(id);
// //判断查询商品对象是否为空
// if(!Objects.isNull(goods)) {
// redisCache.setCacheObject(goods.getId().toString(), goods, 2, TimeUnit.HOURS);
// }
// }
// return goodMapper.selectGoodById(id);
//判断缓存中是否存在
Goods good = (Goods)redisCache.getCacheObject(id.toString());
//布隆过滤器过滤
logger.info("在缓存中取出id=={}的商品信息为{}",id,good);
boolean isExist = BloomFilterUtil.mayContains(id);
if(isExist) {
logger.info("布隆过滤器是否存在值------>{}", isExist);
if (Objects.isNull(good)) {
//先判断这个id是不是爆款商品
//这里假设 id = 1 的属于爆款商品
if (id == 1) {
//分布式锁的key
String lockKey = LOCK_PREFIX + id;
// 每一把锁都有自己独有的uuid
String uuid = UUID.randomUUID().toString();
//设置分布式锁
ValueOperations valueOperations = redisTemplate.opsForValue();
//对锁进行占位
Boolean isLock = valueOperations.setIfAbsent(lockKey, uuid, 300, TimeUnit.SECONDS);
logger.info("分布式锁的状态{}", isLock);
if (isLock) {
Goods goods = goodMapper.selectGoodById(id);
logger.info("获得加锁的key-->{}", valueOperations.get(lockKey));
if (!Objects.isNull(goods)) {
valueOperations.set(goods.getId().toString(), goods);
Boolean result = (Boolean) redisTemplate.execute(script, Collections.singletonList(lockKey), uuid);
return goods;
}
return goods;
} else {
logger.info("申请锁失败,进行自旋");
try {
Thread.sleep(1500);
} catch (Exception e) {
e.printStackTrace();
}
return selectGoodById(id);
}
}
//非爆款
Goods goods = goodMapper.selectGoodById(id);
Random random = new Random();
int timeOut = 0;
//判断查询商品对象是否为空
if (!Objects.isNull(goods)) {
if(id == 2){
timeOut = 3600+random.nextInt(1800);
redisCache.setCacheObject(goods.getId().toString(), goods, timeOut, TimeUnit.SECONDS);
}
if(id == 3){
timeOut = 1800 + random.nextInt(300);
redisCache.setCacheObject(goods.getId().toString(), goods, timeOut, TimeUnit.SECONDS);
}
} else {
//缓存空对象
//因为布隆过滤器可能有错误值进来,所以这里缓存null
redisCache.setCacheObject(id.toString(), null, 200, TimeUnit.SECONDS);
}
return goods;
}
}else {
//布隆过滤器不存在
//缓存空对象
redisCache.setCacheObject(id.toString(), null, 200, TimeUnit.SECONDS);
}
return good;
}
@Override
public List<Goods> selectGoods() {
return goodMapper.selectGoods();
}
}
4.0 缓存一致性问题
概念
Redis缓存一致性解决方案主要思考的是删除缓存和更新数据库的先后顺序
解决方案
1. 先删除缓存后更新数据库
存在的问题是可能会数据不一致,一般使用延时双删来解决,即先删除缓存,再更新数据库,休眠X秒后再次淘汰缓存。第二次删除可能导致吞吐率降低,可以考虑进行异步删除。
2. 先更新数据库后删除缓存
存在的问题是会可能会更新失败,可以采用延时删除。但由于读比写快,发生这一情况概率较小。
4.1 代码实现
自定义注解
package com.fei.annotation;
import java.lang.annotation.*;
/**
* 延时双删注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target(ElementType.METHOD)
public @interface DelayedDoubleDeletion {
boolean open() default true;
}
切面类
package com.fei.pojo;
import com.fei.annotation.DelayedDoubleDeletion;
import com.fei.util.RedisCache;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.CodeSignature;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Set;
@Aspect
@Component
public class DelayedDoubleDeletionAspect {
private Logger logger = LoggerFactory.getLogger(DelayedDoubleDeletionAspect.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
private RedisCache redisCache;
/**
* 切入点
*切入点,基于注解实现的切入点 加上该注解的都是Aop切面的切入点
*
*/
@Pointcut("@annotation(com.fei.annotation.DelayedDoubleDeletion)")
public void pointCut(){
}
/**
* 环绕通知
* 环绕通知非常强大,可以决定目标方法是否执行,什么时候执行,执行时是否需要替换方法参数,执行完毕是否需要替换返回值。
* 环绕通知第一个参数必须是org.aspectj.lang.ProceedingJoinPoint类型
* @param proceedingJoinPoint
*/
@Around("pointCut()")
public Object aroundAdvice(ProceedingJoinPoint proceedingJoinPoint){
logger.info("----------- 环绕通知 -----------");
logger.info("环绕通知的目标方法名:{}",proceedingJoinPoint.getSignature().getName());
Signature signature1 = proceedingJoinPoint.getSignature();
MethodSignature methodSignature = (MethodSignature)signature1;
//方法对象
Method targetMethod = methodSignature.getMethod();
//反射得到自定义注解的方法对象
DelayedDoubleDeletion annotation =
targetMethod.getAnnotation(DelayedDoubleDeletion.class);
Object[] values = proceedingJoinPoint.getArgs();
String[] names = ((CodeSignature) proceedingJoinPoint.getSignature()).getParameterNames();
//获取自定义注解的方法对象的参数即name
boolean isTure = annotation.open();
if(isTure){
logger.info("获得参数值---->{}",values[0].toString());
//先删除缓存
if(!StringUtils.isEmpty(redisCache.getCacheObject(values[0].toString()))){
redisCache.deleteObject(values[0].toString());
}
}
//执行加入双删注解的改动数据库的业务 即controller中的方法业务
Object proceed = null;
try {
proceed = proceedingJoinPoint.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
//开一个线程 延迟1秒(此处是1秒举例,可以改成自己的业务)
// 在线程中延迟删除 同时将业务代码的结果返回 这样不影响业务代码的执行
new Thread(() -> {
try {
Thread.sleep(1000);
if(!StringUtils.isEmpty(redisCache.getCacheObject(values[0].toString()))){
redisCache.deleteObject(values[0].toString());
}
logger.info("-----------1秒钟后,在线程中延迟删除完毕 -----------");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//返回业务代码的值
return proceed;
}
}
在这里我缓存的是id,所以我延时双删,删除的是缓存的id,通常key为id,value为商品信息,这里不多改变,理解意思即可
@RequestMapping("/updateGood")
@DelayedDoubleDeletion()
public ResponseResult updateGood(Integer id,String goodName){
goodService.updateGood();
return new ResponseResult(200,"更新成功");
}
这里redis缓存相关的问题就结束了!!!
下一篇 准备写一些关于实际项目中线程池的用法实战!!!