1.分布式并发问题
提交订单:商品超卖问题
2.如何解决分布式并发问题呢 ?
使⽤
redis
实现分布式锁
3.使⽤Redis实现分布式锁-代码实现
@Transactional
public Map<String,String> addOrder(String cids,Orders order) throws
SQLException {
logger.info("add order begin...");
Map<String, String> map = null;
//1.校验库存:根据cids查询当前订单中关联的购物⻋记录详情(包括库存)
String[] arr = cids.split(",");
List<Integer> cidsList = new ArrayList<>();
for (int i = 0; i < arr.length; i++) {
cidsList.add(Integer.parseInt(arr[i]));
}
//根据⽤户在购物⻋列表中选择的购物⻋记录的id 查询到对应的购物⻋记录
List<ShoppingCartVO> list =
shoppingCartMapper.selectShopcartByCids(cidsList);
//从购物⻋信息中获取到要购买的 skuId(商品ID) 以skuId为key写到redis中: 1
2 3
boolean isLock = true;
String[] skuIds = new String[list.size()]; //记录已经锁定的商品的ID
for (int i = 0; i <list.size() ; i++) {
String skuId = list.get(i).getSkuId(); //订单中可能包含多个商品,
每个skuId表示⼀个商品
Boolean ifAbsent =
stringRedisTemplate.boundValueOps(skuId).setIfAbsent("fmmall");
if(ifAbsent){
skuIds[i] = skuId;
}
isLock = isLock && ifAbsent;
}
//如果isLock为true,表示“加锁”成功
if(isLock){
try{
//1.⽐较库存: 当第⼀次查询购物⻋记录之后,在加锁成功之前,可能被其他
的并发线程修改库存
List<ShoppingCartVO> list =
shoppingCartMapper.selectShopcartByCids(cidsList);
boolean f = true;
String untitled = "";
for (ShoppingCartVO sc : list) {
if (Integer.parseInt(sc.getCartNum()) >
sc.getSkuStock()) {
f = false;
}
untitled = untitled + sc.getProductName() + ",";
}
if (f) {
//2.添加订单
//3.保存快照
//4.修改库存
//5.删除购物⻋
map = new HashMap<>();
logger.info("add order finished...");
map.put("orderId", orderId);
map.put("productNames", untitled);
}
}catch(Exception e){
e.printStackTrance();
}finally{
//释放锁
for (int m = 0; m < skuIds.length ; m++) {
String skuId = skuIds[m];
if(skuId!=null && !"".equals(skuId)){
stringRedisTemplate.delete(skuId);
}
}
}
return map;
}else{
//表示加锁失败,订单添加失败
// 当加锁失败时,有可能对部分商品已经锁定,要释放锁定的部分商品
for (int i = 0; i < skuIds.length ; i++) {
String skuId = skuIds[i];
if(skuId!=null && !"".equals(skuId)){
stringRedisTemplate.delete(skuId);
}
}
return null;
}
}
问题:
1.
如果订单中部分商品加锁成功,但是某⼀个加锁失败,导致最终加锁状态失败
——
需要对
已经锁定的部分商品释放锁
2.
在成功加锁之前,我们根据购物⻋记录的
id
查询了购物⻋记录(包含商品库存),能够直接
使⽤这个库存进⾏库存校验?
——
不能,因为在查询之后加锁之前可能被并发的线程修改了库存;因此在进⾏库存⽐较之
前需要重新查询库存。
3.
当当前线程加锁成功之后,执⾏添加订单的过程中,如果当前线程出现异常导致⽆法释放
锁,这个问题⼜该如何解决呢?
4.解决因线程异常导致⽆法释放锁的问题
解决⽅案:在对商品进⾏加锁时,设置过期时间,这样⼀来及时线程出现故障⽆法释放
锁,在过期时间结束时也会⾃动
“
释放锁
”
问题:当给锁设置了过期时间之后,如果当前线程
t1
因为特殊原因,在锁过期前没有完成业
务执⾏,将会释放锁,同时其他线程(
t2
)就可以成功加锁了,当
t2
加锁成功之后,
t1
执⾏结
束释放锁就会释放
t2
的锁
,
就会导致
t2
在⽆锁状态下执⾏业务。
5.解决因t1过期释放t2锁的问题
-
在释放锁的时候,先获取当前商品在redis中对应的value,如果获取的值与当前value相 同,则释放锁
问题:当释放锁的时候,在查询并判断
“
这个锁是当前线程加的锁
”
成功之后,正要进⾏删除时
锁过期了,并且被其他线程成功加锁,⼀样会导致当前线程删除其他线程的锁。
-
Redis的操作都是原⼦性的
-
要解决如上问题,必须保证查询操作和删除操作的原⼦性——使⽤lua脚本
使⽤
lua
脚本
-
在resources⽬录下创建unlock.lua,编辑脚本:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
@Bean
public DefaultRedisScript<List> defaultRedisScript(){
DefaultRedisScript<List> defaultRedisScript = new
DefaultRedisScript<>();
defaultRedisScript.setResultType(List.class);
defaultRedisScript.setScriptSource(new ResourceScriptSource(new
ClassPathResource("unlock.lua")));
return defaultRedisScript; }
@AutoWired
private DefaultRedisScript defaultRedisScript;
//执⾏lua脚本
List<String> keys = new ArrayList<>();
keys.add(skuId);
List rs = stringRedisTemplate.execute(defaultRedisScript,keys ,
values.get(skuId));
System.out.println(rs.get(0));
6.看⻔狗机制
看⻔⼝线程:⽤于给当前
key
延⻓过期时间,保证业务线程正常执⾏的过程中,锁不会过期。
7.分布式锁框架-Redisson
基于
Redis+
看⻔狗机制的分布式锁框架
7.1 Redisson介绍
Redisson
在基于
NIO
的
Netty
框架上,充分的利⽤了
Redis
键值数据库提供的⼀系列优势,在
Java
实⽤⼯具包中常⽤接⼝的基础上,为使⽤者提供了⼀系列具有分布式特性的常⽤⼯具
类。使得原本作为协调单机多线程并发程序的⼯具包获得了协调分布式多机多线程并发系统
的能⼒,⼤⼤降低了设计和研发⼤规模分布式系统的难度。同时结合各富特⾊的分布式服
务,更进⼀步简化了分布式环境中程序相互之间的协作
7.2 在SpringBoot应⽤中使⽤Redisson
<dependency>
<groupId>
org.redisson
</groupId>
<artifactId>
redisson
</artifactId>
<version>
3.12.0
</version>
</dependency>
redisson
:
addr
:
singleAddr
:
host
:
redis
:
//47.96.11.185
:
6370
password
:
12345678
database
:
0
@Configuration
public class RedissonConfig {
@Value("${redisson.addr.singleAddr.host}")
private String host;
@Value("${redisson.addr.singleAddr.password}")
private String password;
@Value("${redisson.addr.singleAddr.database}")
private int database;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress(host).setPassword(password).se
tDatabase(database);
return Redisson.create(config);
}
}
-
在秒杀业务实现中注⼊RedissonClient对象
7.3 Redisson⼯作原理
“
看⻔狗
”
Redisson
⼯作原理图
7.4 Redisson使⽤扩展
7.4.1 Redisson单机连接
redisson
:
addr
:
singleAddr
:
host
:
redis
:
//47.96.11.185
:
6370
password
:
12345678
database
:
0
@Configuration
public class RedissonConfig {
@Value("${redisson.addr.singleAddr.host}")
private String host;
@Value("${redisson.addr.singleAddr.password}")
private String password;
@Value("${redisson.addr.singleAddr.database}")
private int database;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress(host).setPassword(password).se
tDatabase(database);
return Redisson.create(config);
}
}
7.4.2 Redisson集群连接
redisson
:
addr
:
cluster
:
hosts
:
redis
:
//47.96.11.185
:
6370,...,redis
:
//47.96.11.185
:
6373
password
:
12345678
-
RedissonConfig——RedissonClient对象
@Configuration
public class RedissonConfig {
@Value("${redisson.addr.cluster.hosts}")
private String hosts;
@Value("${redisson.addr.cluster.password}")
private String password;
/**
* 集群模式
* @return
*/
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useClusterServers().addNodeAddress(hosts.split("
[,]"))
.setPassword(password)
.setScanInterval(2000)
.setMasterConnectionPoolSize(10000)
.setSlaveConnectionPoolSize(10000);
return Redisson.create(config);
}
}
7.4.3 Redisson主从连接
redisson
:
addr
:
masterAndSlave
:
masterhost
:
redis
:
//47.96.11.185
:
6370
slavehosts
:
redis
:
//47.96.11.185
:
6371,redis
:
//47.96.11.185
:
6372
password
:
12345678
database
:
0
- RedissonConfig --- RedissonClient
@Configuration
public class RedissonConfig3 {
@Value("${redisson.addr.masterAndSlave.masterhost}")
private String masterhost;
@Value("${redisson.addr.masterAndSlave.slavehosts}")
private String slavehosts;
@Value("${redisson.addr.masterAndSlave.password}")
private String password;
@Value("${redisson.addr.masterAndSlave.database}")
private int database;
/**
* 主从模式
* @return
*/
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useMasterSlaveServers()
.setMasterAddress(masterhost)
.addSlaveAddress(slavehosts.split("[,]"))
.setPassword(password)
.setDatabase(database)
.setMasterConnectionPoolSize(10000)
.setSlaveConnectionPoolSize(10000);
return Redisson.create(config);
}
}
7.5 分布式锁总结
7.5.1 分布式锁特点
1
、互斥性
和我们本地锁⼀样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
2
、可重⼊性
同⼀个节点上的同⼀个线程如果获取了锁之后那么也可以再次获取这个锁。
3
、锁超时
和本地锁⼀样⽀持锁超时,加锁成功之后设置超时时间,以防⽌线程故障导致不释放锁,防
⽌死锁。
4
、⾼效,⾼可⽤
加锁和解锁需要⾼效,同时也需要保证⾼可⽤防⽌分布式锁失效,可以增加降级。
redission
是基于
redis
的,
redis
的故障就会导致
redission
锁的故障,因此
redission
⽀持单节
点
redis
、
reids
主从、
reids
集群
5
、⽀持阻塞和⾮阻塞
和
ReentrantLock
⼀样⽀持
lock
和
trylock
以及
tryLock(long timeOut)
。
7.5.2 锁的分类
1
、乐观锁与悲观锁
乐观锁
悲观锁
2
、可重⼊锁和⾮可重⼊锁
可重⼊锁:当在⼀个线程中第⼀次成功获取锁之后,在此线程中就可以再次获取
⾮可重⼊锁
3
、公平锁和⾮公平锁
公平锁:按照线程的先后顺序获取锁
⾮公平锁:多个线程随机获取锁
4
、阻塞锁和⾮阻塞锁
阻塞锁:不断尝试获取锁,直到获取到锁为⽌
⾮阻塞锁:如果获取不到锁就放弃,但可以⽀持在⼀定时间段内的重试
——
在⼀段时间内如果没有获取到锁就放弃
7.5.3 Redission的使⽤
1
、获取锁
——
公平锁和⾮公平锁
//
获取公平锁
RLock lock
=
redissonClient
.
getFairLock
(
skuId
);
//
获取⾮公平锁
RLock lock
=
redissonClient
.
getLock
(
skuId
);
2
、加锁
——
阻塞锁和⾮阻塞锁
//
阻塞锁(如果加锁成功之后,超时时间为
30s
;加锁成功开启看⻔狗,剩
5s
延⻓过期时间)
lock
.
lock
();
//
阻塞锁(如果加锁成功之后,设置⾃定义
20s
的超时时间)
lock
.
lock
(
20
,
TimeUnit
.
SECONDS
);
//
⾮阻塞锁(设置等待时间为
3s
;如果加锁成功默认超时间为
30s
)
boolean
b
=
lock
.
tryLock
(
3
,
TimeUnit
.
SECONDS
);
//
⾮阻塞锁(设置等待时间为
3s
;如果加锁成功设置⾃定义超时间为
20s
)
boolean
b
=
lock
.
tryLock
(
3
,
20
,
TimeUnit
.
SECONDS
);
3
、释放锁
lock
.
unlock
();
4
、应⽤示例
//
公平⾮阻塞锁
RLock lock
=
redissonClient
.
getFairLock
(
skuId
);
boolean
b
=
lock
.
tryLock
(
3
,
20
,
TimeUnit
.
SECONDS
);
8.
分布式锁释放锁代码优化
HashMap map
=
null
;
加锁
try
{
if
(
isLock
){
校验库存
if
(
库存充⾜
){
保存订单
保存快照
修改库存
删除购物⻋
map
=
new
HashMap
();
...
}
}
}
catch
(
Exception e
){
e
.
printStackTrace
();
}
finally
{
释放锁
}
return
map
;
/**
* 保存订单业务
*/
@Transactional
public Map<String, String> addOrder(String cids, Orders order)
throws SQLException {
logger.info("add order begin...");
Map<String, String> map = null;
//1.校验库存:根据cids查询当前订单中关联的购物⻋记录详情(包括库存)
String[] arr = cids.split(",");
List<Integer> cidsList = new ArrayList<>();
for (int i = 0; i < arr.length; i++) {
cidsList.add(Integer.parseInt(arr[i]));
}
//根据⽤户在购物⻋列表中选择的购物⻋记录的id 查询到对应的购物⻋记录
List<ShoppingCartVO> list =
shoppingCartMapper.selectShopcartByCids(cidsList);
//加锁
boolean isLock = true;
String[] skuIds = new String[list.size()];
Map<String, RLock> locks = new HashMap<>(); //⽤于存放当前订单的锁
for (int i = 0; i < list.size(); i++) {
String skuId = list.get(i).getSkuId();
boolean b = false;
try {
RLock lock = redissonClient.getLock(skuId);
b = lock.tryLock(10, 3, TimeUnit.SECONDS);
if (b) {
skuIds[i] = skuId;
locks.put(skuId, lock);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
isLock = isLock & b;
}
//如果isLock为true,表示“加锁”成功
try {
if (isLock){
//1.检验库存
boolean f = true;
String untitled = "";
list =
shoppingCartMapper.selectShopcartByCids(cidsList);
for (ShoppingCartVO sc : list) {
if (Integer.parseInt(sc.getCartNum()) >
sc.getSkuStock()) {
f = false;
}
untitled = untitled + sc.getProductName() + ",";
}
if (f) {
//如果库存充⾜,则进⾏下订单操作
logger.info("product stock is OK...");
//2.保存订单
order.setUntitled(untitled);
order.setCreateTime(new Date());
order.setStatus("1");
//⽣成订单编号
String orderId =
UUID.randomUUID().toString().replace("-", "");
order.setOrderId(orderId);
int i = ordersMapper.insert(order);
//3.⽣成商品快照
for (ShoppingCartVO sc : list) {
int cnum = Integer.parseInt(sc.getCartNum());
String itemId = System.currentTimeMillis() +
"" + (new Random().nextInt(89999) + 10000);
OrderItem orderItem = new OrderItem(itemId,
orderId, sc.getProductId(), sc.getProductName(),
sc.getProductImg(), sc.getSkuId(), sc.getSkuName(), new
BigDecimal(sc.getSellPrice()), cnum, new
BigDecimal(sc.getSellPrice() * cnum), new Date(), new Date(), 0);
orderItemMapper.insert(orderItem);
//增加商品销量
}
//4.扣减库存:根据套餐ID修改套餐库存量
for (ShoppingCartVO sc : list) {
String skuId = sc.getSkuId();
int newStock = sc.getSkuStock() -
Integer.parseInt(sc.getCartNum());
ProductSku productSku = new ProductSku();
productSku.setSkuId(skuId);
productSku.setStock(newStock);
productSkuMapper.updateByPrimaryKeySelective(productSku);
//5.删除购物⻋:当购物⻋中的记录购买成功之后,购物⻋中对应
做删除操作
for (int cid : cidsList) {
shoppingCartMapper.deleteByPrimaryKey(cid);
}
map = new HashMap<>();
logger.info("add order finished...");
map.put("orderId", orderId);
map.put("productNames", untitled);
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
//释放锁
for (int i = 0; i < skuIds.length; i++) {
String skuId = skuIds[i];
if (skuId != null && !"".equals(skuId)) {
locks.get(skuId).unlock();
System.out.println("-----------------------
unlock");
}
}
}
return map; }