使用ReentrantLock+Condition模拟PV操作,实现多线程竞争数据库连接池资源、资源耗尽后阻塞等待、归还资源后唤醒阻塞线程的场景(代码中为10个线程竞争5个数据库连接资源)
ConnectionPool.class(连接池)
package demo.lock.db;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
/**
* 模拟数据库连接池
*/
public class ConnectionPool {
/** 连接池最大连接数量 */
private static final int POOL_SIZE = 5;
public static List<Connection> connections;
/** 信号量 */
public static volatile int signal = POOL_SIZE;
/** 锁 */
public static ReentrantLock lock = new ReentrantLock();
/** 等待条件 */
public static Condition condition = lock.newCondition();
/**
* <p>
* 初始化数据库连接池
* </p>
*/
public static void init() {
connections = new ArrayList<>(POOL_SIZE);
for (int i = 0; i < POOL_SIZE; i++) {
connections.add(new Connection("connection-" + i));
}
}
/**
* <p>
* 使用ReentrantLock将获取数据连接逻辑包装为一个P操作原语
* <p>
* 抢占到锁后,如果检测到信号量不大于0,将自身加入Condition对象的阻塞队列中,并释放锁
* </p>
* <p>
* 继续持有锁或被唤醒后重新持有锁时,尝试自旋来获取数据库连接
* </p>
*
* @return 获取到的数据库连接对象
* @throws Exception
*/
public static Connection getConnection() throws Exception {
// 进行P操作,首先获取锁对象
lock.lock();
try {
// 此时抢占到锁对象,但由于信号量小于等于0,也就是没有资源可用,所以阻塞自身,同时释放锁
if (signal <= 0) {
condition.await();
}
// 持有或重新持有锁对象时,进行自旋操作尝试获取数据库连接
while (true) {
if (signal > 0) {
// 从连接池中获取一个空闲连接
List<Connection> freeConnections = connections.stream()
.filter(x -> x.state.equals(ConnectionState.FREE)).collect(Collectors.toList());
Connection currConnection =
CollectionUtils.isEmpty(freeConnections) ? null : freeConnections.get(0);
if (null == currConnection) {
return null;
}
currConnection.state = ConnectionState.BUSY;
// 获取连接成功,信号量自减1
signal--;
System.out.println("当前线程:" + Thread.currentThread().getName() + " 抢到数据库连接:"
+ currConnection.getName() + " 当前连接池中空闲连接数:" + signal);
return currConnection;
}
}
} finally {
// 释放锁对象
lock.unlock();
}
}
/**
* <p>
* 使用ReentrantLock将归还数据连接逻辑包装为一个V原语
* </p>
* <p>
* 在归还连接成功,并将信号量自增之后,唤醒Condition的等待队列中的一个线程
* </p>
*
* @param connection 数据库连接对象
*/
public static void repayConnection(Connection connection) {
if (null == connection) {
return;
}
// 进行V操作,首先获取锁对象
lock.lock();
try {
connections.forEach(x -> x.state = connection.equals(x) ? ConnectionState.FREE : x.state);
// 信号量自增1,表示归还1个资源
signal++;
System.out.println("当前线程:" + Thread.currentThread().getName() + " 归还数据库连接:" + connection.getName()
+ " 当前连接池中空闲连接数:" + signal);
// 唤醒该condition的等待队列中的一个线程
condition.signal();
} finally {
// 释放锁资源
lock.unlock();
}
}
}
Connection.class(连接对象)
package demo.lock.db;
/**
* 数据库连接对象
*/
public class Connection {
/**
* 连接名
*/
String name;
/**
* 当前状态
*/
ConnectionState state = ConnectionState.FREE;
public Connection(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "当前连接:" + ",状态:" + state.state;
}
}
ConnectionState.class(连接状态枚举)
package demo.lock.db;
/**
* 数据库连接当前状态枚举
*/
public enum ConnectionState {
FREE(0, "空闲"), BUSY(1, "忙碌");
/**
* 状态码 0-空闲 1-忙碌
*/
int code;
/**
* 状态名
*/
String state;
ConnectionState(int code, String state) {
this.code = code;
this.state = state;
}
}
Application.class(程序入口)
package demo.lock;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import demo.lock.db.Connection;
import demo.lock.db.ConnectionPool;
public class Application {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(10, 10, 30, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
public void execute() {
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
Connection connection = ConnectionPool.getConnection();
// 随机占用0-1秒时间后归还连接
Thread.sleep((long)(Math.random() * 1000));
ConnectionPool.repayConnection(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
public void shutdown() {
if (null == executor) {
return;
}
executor.shutdown();
}
public static void main(String[] args) throws InterruptedException {
ConnectionPool.init();
Application application = new Application();
application.execute();
Thread.sleep(10000L);
application.shutdown();
}
}
运行结果