手把手教你构建源码级组件——Java指定共享线程数目的共享锁

2023-05-16

文章目录

  • 构造同步组件的步骤
    • 1. 定义内部类Syn
    • 2. 继承同步器,重写指定方法
    • 3. 调用同步器方法
  • 指定共享线程数目的共享锁实现
    • 代码实现
    • 测试Demo
    • 运行结果
    • 结果分析

构造同步组件的步骤

之前的学习中我们学习了AQS的原理,其中有许多构建锁与同步器的相关概念我们需要了解到:

  • 首先同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义;
  • 锁是面向使用者的,提供锁交互的实现;
  • 同步器是面向锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程排队、等待/唤醒等底层操作。

从代码层面,同步器是基于模板模式实现的,可以通过AQS可重写的方法进行子类具体功能实现:

例如下面是AQS中tryAcquire模板方法的源码(如果子类没实现会怕抛出异常)

/**
* 模板方法:
*  protected关键字
*  没有任何实现
* @param arg
* @return
*/
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

那么我们在构建同步组件的时候也就是需要实现以下几步:

1. 定义内部类Syn

随后将同步器组合在自定义同步组件的实现中,即定义内部类Syn继承AQS

public class XXX implements Lock {
    public class Sync extends AbstractQueuedSynchronizer{
	}
}

2. 继承同步器,重写指定方法

之后在Syn中重写AQS方法,根据同步器需求如下挑选实现不同方法

  • tryAcquire(int arg):独占式获取同步状态;
  • tryRelease(int arg):独占式释放同步状态;
  • tryAcquireShared(int arg):共享式获取同步状态,返回大于0的值表示获取成功,否则失败
  • tryReleaseShared(int arg):共享式释放锁
  • isHeldExclusively():当前线程是否在独占模式下被线程占用,一般该方法表示是否被当前线程占用

例如不可重入同步器:

public class XXX implements Lock {
	public class Sync extends AbstractQueuedSynchronizer{
	
        @Override
        protected boolean tryAcquire(int arg) {
            final Thread current = Thread.currentThread();
            if (compareAndSetState(0, 1)) {
                // 获取成功之后,当前线程是该锁的持有者,不需要再可重入数
                setExclusiveOwnerThread(current);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
              return getState() == 1;
        }
        // 返回Condition,每个Condition都包含了一个队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }
}

3. 调用同步器方法

最后调用同步器提供的模板方法,即同步组件类实现Lock方法之后,在lock/unlock方法中调用内部类Syn的方法acquire(int arg)等方法

public class XXX implements Lock {
    
   ........
   	private final Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    ........

}

具体请看下面的实验部分

具体请看下面的实验部分

指定共享线程数目的共享锁实现

代码实现

package com.yyl.threadtest.utils;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class MyShareLock implements Lock {

    // 可以看到共享等待队列中的线程
    public Collection<Thread> getSharedQueuedThreads(){
        return syn.getSharedQueuedThreads();
    }
    private final Syn syn = new Syn(3);

    private static final class Syn extends AbstractQueuedSynchronizer {
        int newShareCount=0;
        Syn(int shareCount){
            if (shareCount <= 0) {
                throw new IllegalArgumentException("share count must large than zero");
            }
            // 设置初始共享同步状态
            setState(shareCount);
        }

        /**
         * 共享锁指定数目
         * @param reduceShareCount
         * @return
         */
        @Override
        protected int tryAcquireShared(int reduceShareCount) {

            for (;;){
                int currentShareCount = getState();
                newShareCount = currentShareCount- reduceShareCount;
                if (newShareCount < 0 ||
                        compareAndSetState(currentShareCount,newShareCount)) {
                    // newShareCount大于等于0才说明获取锁成功
                    if (newShareCount >= 0) {
//                        System.out.println(Thread.currentThread().getName()+" hold lock, current share count is "+newShareCount+", "+new Date());
                    }
                    // newShareCount小于0表示获取失败所以需要返回
                    // compareAndSetState(currentShareCount,newShareCount)为true自然表示成功需要返回
                    return newShareCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnShareCount) {
            for (;;){
                int currentShareCount = getState();
                newShareCount = currentShareCount + returnShareCount;
                if (compareAndSetState(currentShareCount,newShareCount)) {
                // System.out.println(Thread.currentThread().getName() +" release lock, current share count is "+newShareCount+", "+new Date());
                    return true;
                }
            }
        }
        protected int getShareCount(){
            return getState();
        }
    }

    /**
     * 调用内部同步器Syn的acquireShare方法
     */
    @Override
    public void lock() {
        syn.acquireShared(1);
    }
    /**
     * 调用内部同步器Syn的releaseShared方法
     */
    @Override
    public void unlock() {
        syn.releaseShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new IllegalStateException();
        }
        syn.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

测试Demo

package com.yyl.threadtest.utils;

import java.util.Date;

public class ShareLockTest {
    public static void main(String[] args) {
        final MyShareLock lock = new MyShareLock();
        class Worker extends Thread {
            @Override
            public void run() {
                // 一直不停在获取锁
                while (true) {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName() +" hold lock, "+new Date());
                        // System.out.println(lock.getSharedQueuedThreads());
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                        System.out.println(Thread.currentThread().getName() +" release lock, "+new Date());
                    }
                }
            }

        }
        for (int i = 0; i < 10; i++) {
            Worker worker = new Worker();
            // 以守护进程运行,VM退出不影响运行,这里只是为了一个打印效果,去掉注释一直打印
            worker.setDaemon(true);
            worker.start();
        }
        // 每隔一秒换行
        for (int j = 0; j < 10; j++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println();
        }
    }
}

运行结果

Thread-0 hold lock, Thu Feb 23 12:55:28 CST 2023
Thread-1 hold lock, Thu Feb 23 12:55:28 CST 2023
Thread-2 hold lock, Thu Feb 23 12:55:28 CST 2023

Thread-0 release lock, Thu Feb 23 12:55:29 CST 2023
Thread-4 hold lock, Thu Feb 23 12:55:29 CST 2023
Thread-5 hold lock, Thu Feb 23 12:55:29 CST 2023
Thread-1 release lock, Thu Feb 23 12:55:29 CST 2023
Thread-3 hold lock, Thu Feb 23 12:55:29 CST 2023
Thread-2 release lock, Thu Feb 23 12:55:29 CST 2023

Thread-5 release lock, Thu Feb 23 12:55:31 CST 2023
Thread-7 hold lock, Thu Feb 23 12:55:31 CST 2023
Thread-3 release lock, Thu Feb 23 12:55:31 CST 2023
Thread-9 hold lock, Thu Feb 23 12:55:31 CST 2023
Thread-4 release lock, Thu Feb 23 12:55:31 CST 2023
Thread-6 hold lock, Thu Feb 23 12:55:31 CST 2023

Thread-8 hold lock, Thu Feb 23 12:55:32 CST 2023
Thread-6 release lock, Thu Feb 23 12:55:32 CST 2023
Thread-9 release lock, Thu Feb 23 12:55:32 CST 2023
Thread-7 release lock, Thu Feb 23 12:55:32 CST 2023
Thread-1 hold lock, Thu Feb 23 12:55:32 CST 2023
Thread-0 hold lock, Thu Feb 23 12:55:32 CST 2023

Thread-8 release lock, Thu Feb 23 12:55:33 CST 2023
Thread-3 hold lock, Thu Feb 23 12:55:33 CST 2023
Thread-2 hold lock, Thu Feb 23 12:55:33 CST 2023
Thread-1 release lock, Thu Feb 23 12:55:33 CST 2023
Thread-5 hold lock, Thu Feb 23 12:55:33 CST 2023
Thread-0 release lock, Thu Feb 23 12:55:33 CST 2023

Thread-5 release lock, Thu Feb 23 12:55:34 CST 2023
Thread-6 hold lock, Thu Feb 23 12:55:34 CST 2023
Thread-9 hold lock, Thu Feb 23 12:55:34 CST 2023
Thread-4 hold lock, Thu Feb 23 12:55:34 CST 2023
Thread-3 release lock, Thu Feb 23 12:55:34 CST 2023
Thread-2 release lock, Thu Feb 23 12:55:34 CST 2023

Thread-4 release lock, Thu Feb 23 12:55:35 CST 2023
Thread-7 hold lock, Thu Feb 23 12:55:35 CST 2023
Thread-8 hold lock, Thu Feb 23 12:55:35 CST 2023
Thread-9 release lock, Thu Feb 23 12:55:35 CST 2023
Thread-1 hold lock, Thu Feb 23 12:55:35 CST 2023
Thread-6 release lock, Thu Feb 23 12:55:35 CST 2023

Thread-1 release lock, Thu Feb 23 12:55:36 CST 2023
Thread-8 release lock, Thu Feb 23 12:55:36 CST 2023
Thread-3 hold lock, Thu Feb 23 12:55:36 CST 2023
Thread-7 release lock, Thu Feb 23 12:55:36 CST 2023
Thread-5 hold lock, Thu Feb 23 12:55:36 CST 2023
Thread-0 hold lock, Thu Feb 23 12:55:36 CST 2023

Thread-4 hold lock, Thu Feb 23 12:55:37 CST 2023
Thread-5 release lock, Thu Feb 23 12:55:37 CST 2023
Thread-3 release lock, Thu Feb 23 12:55:37 CST 2023
Thread-0 release lock, Thu Feb 23 12:55:37 CST 2023
Thread-2 hold lock, Thu Feb 23 12:55:37 CST 2023
Thread-9 hold lock, Thu Feb 23 12:55:37 CST 2023

Thread-6 hold lock, Thu Feb 23 12:55:38 CST 2023
Thread-2 release lock, Thu Feb 23 12:55:38 CST 2023
Thread-4 release lock, Thu Feb 23 12:55:38 CST 2023
Thread-1 hold lock, Thu Feb 23 12:55:38 CST 2023
Thread-8 hold lock, Thu Feb 23 12:55:38 CST 2023
Thread-9 release lock, Thu Feb 23 12:55:38 CST 2023


Process finished with exit code 0

结果分析

该指定共享线程数量N的共享锁的最终目的就是多个线程可以持有锁(同步状态),达到共享线程数量N(代码中默认为2)时,其它线程将进入Queue等待获取同步结果,同一时刻只能最多有N个线程持有锁

同样地,我们分析开头运行结果:

Thread-0 hold lock, Thu Feb 23 12:55:28 CST 2023
Thread-1 hold lock, Thu Feb 23 12:55:28 CST 2023
Thread-2 hold lock, Thu Feb 23 12:55:28 CST 2023

Thread-0 release lock, Thu Feb 23 12:55:29 CST 2023
Thread-4 hold lock, Thu Feb 23 12:55:29 CST 2023
Thread-5 hold lock, Thu Feb 23 12:55:29 CST 2023
Thread-1 release lock, Thu Feb 23 12:55:29 CST 2023
Thread-3 hold lock, Thu Feb 23 12:55:29 CST 2023
Thread-2 release lock, Thu Feb 23 12:55:29 CST 2023

10个线程不停竞争锁,一开始Thread-0、Thread-1、Thread-2在12:55:28时刻同时获取到了锁,此时已经达到共享数量的最大值,即N,之后持有锁1秒,Thread-0、Thread-1、Thread-2在12:55:29时刻立马释放锁,同时Thread-3、Thread-4、Thread-5立马退出等待队列立马竞争持有锁。

从结果来看,完全是符合ShareLock共享锁功能的:同一时刻最多允许N个线程持有锁,其它线程等待持有线程释放锁

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

手把手教你构建源码级组件——Java指定共享线程数目的共享锁 的相关文章

随机推荐

  • PINN深度学习求解微分方程系列三:求解burger方程逆问题

    下面我将介绍内嵌物理知识神经网络 xff08 PINN xff09 求解微分方程 首先介绍PINN基本方法 xff0c 并基于Pytorch的PINN求解框架实现求解含时间项的一维burger方程逆问题 内嵌物理知识神经网络 xff08 P
  • 深度学习求解微分方程系列五:PINN求解Navier-Stokes方程正逆问题

    下面我将介绍内嵌物理知识神经网络 xff08 PINN xff09 求解微分方程 首先介绍PINN基本方法 xff0c 并基于Pytorch的PINN求解框架实现求解含时间项的二维Navier Stokes方程 内嵌物理知识神经网络 xff
  • Keras解读:使用Model()类构建网络模型的底层原理

    目录 一 前言 xff1a 二 topology py脚本简述 三 继承了Layer类的子类一般要实现 xff08 重写 xff09 以下methods xff1a 四 Node layer tensor在网络构建过程中的关系 建议结合源码
  • 【Paraview教程】第一章安装与基础介绍

    1 Paraview介绍 1 1基本介绍 ParaView是一个开源的 xff0c 跨平台的数据处理和可视化程序 ParaView用户可以迅速的建立起可视化环境利用定量或者是定性的手段去分析数据 利用它的批量处理能力可以在三维空间内在工具栏
  • 一种基于物理信息极限学习机的PDE求解方法

    作者 PINN山里娃 xff0c 作者主页 研究方向 物理信息驱动深度学习 不确定性 人工智能 偏微分方程 极限学习机 该作者聚焦深度学习模型与物理信息结合前沿研究 xff0c 提供了一系列AI for science研究进展报告及代码实现
  • js文件分片上传,断点续传

    前言 文件上传是一个老生常谈的话题了 xff0c 在文件相对比较小的情况下 xff0c 可以直接把文件转化为字节流上传到服务器 xff0c 但在文件比较大的情况下 xff0c 用普通的方式进行上传 xff0c 这可不是一个好的办法 xff0
  • JavaScript 大文件分片上传处理

    一 功能性需求与非功能性需求 要求操作便利 xff0c 一次选择多个文件和文件夹进行上传 xff1b 支持PC端全平台操作系统 xff0c Windows Linux Mac 支持文件和文件夹的批量下载 xff0c 断点续传 刷新页面后继续
  • JS实现浏览器端大文件分片上传

    IE的自带下载功能中没有断点续传功能 xff0c 要实现断点续传功能 xff0c 需要用到HTTP协议中鲜为人知的几个响应头和请求头 一 两个必要响应头Accept Ranges ETag 客户端每次提交下载请求时 xff0c 服务端都要添
  • Lua识别Jwt令牌业务

    文章目录 业务场景业务实现lua resty jwt安装令牌识别令牌测试 业务场景 如果想使用Lua识别用户令牌 xff0c 我们需要引入lua resty jwt模块 xff0c 是用于 ngx lua 和 LuaJIT 的 Lua 实现
  • WebSocket从入门到实战

    文章目录 WebSocketWebSocket 介绍WebSocket APIWebSocket 对象WebSocket属性WebSocket事件 xff1a WebSocket方法 xff1a WebSocket 实例客户端服务端代码链接
  • Error:java: 无效的源发行版: 13

    文章目录 问题原因解决 之前是英文的报错 xff0c 也是一样 xff0c 再发一遍 问题 原因 出现这个错误的原因主要是因为 JDK 版本问题 xff0c 有两个原因 xff0c 一个是编译器版本不匹配 xff0c 一个是当前项目 JDK
  • springboot项目中的bootstrap.yml配置不生效(没有自动提示)

    文章目录 问题原因及解决原因1 xff1a 原因2 xff1a 问题 xff08 1 xff09 新创建一个 springboot项目 xff0c 添加了 bootstrap yml 文件 xff0c 发现文件并没有如预期变成绿色叶子 xf
  • 人工智能——分类器性能指标之ROC曲线、AUC值

    文章目录 ROC曲线ROC曲线概念ROC曲线坐标系ROC曲线重要概念案例 xff1a 画ROC曲线 AUC值为什么使用Roc和Auc评价分类器 二分类模型预测的结果是否足够好 xff0c ROC和AUC是重要指标 ROC曲线 ROC曲线概念
  • Ubuntu18.04美化桌面(主题、图标)

    首先 xff0c 安装主题配置工具Tweaks xff0c 命令如下 xff1a sudo apt get install gnome tweak tool 之后在软件菜单中找到Tweaks图标 xff0c 打开该软件 xff0c 界面如下
  • Redis 事务支持回滚吗?

    文章目录 Redis 事务支持回滚吗 xff1f 官方解释DISCARD 命令取消事务 Redis 事务支持回滚吗 xff1f 首选 xff0c Redis 事务不支持回滚 MySQL 在执行事务时 xff0c 会提供回滚机制 xff0c
  • Docker 安装 mysql 8.0.29

    文章目录 安装拉取镜像启动容器 配置 xff08 可跳过 xff0c 建议弄上 xff09 使用启动控制台登录navicat远程登录 删除 安装 拉取镜像 span class token function docker span pull
  • SpringBoot 启动打印 Banner:佛祖保佑,永无BUG!

    最近新学了一招 打印完了 xff0c 一点bug不出 xff0c 果然 xff0c 程序的尽头是玄学 教程 在Resources目录下新建 banner txt 输入内容如下 span class token comment ooOoo s
  • 一文就懂AQS!

    文章目录 AQS介绍AQS概念AQS模式分类AQS核心思想 AQS源码结构CLH同步队列state同步状态独占式同步状态获取与释放流程图总结 xff1a 共享式同步状态获取与释放 看了很多帖子 xff0c 原理说啥的都有 xff0c 算了还
  • 手把手教你构建源码级组件——Java互斥不可重入锁

    文章目录 构造同步组件的步骤1 定义内部类Syn2 继承同步器 xff0c 重写指定方法3 调用同步器方法 互斥不可重入锁实现代码实现测试Demo运行结果结果分析 构造同步组件的步骤 之前的学习中我们学习了AQS的原理 xff0c 其中有许
  • 手把手教你构建源码级组件——Java指定共享线程数目的共享锁

    文章目录 构造同步组件的步骤1 定义内部类Syn2 继承同步器 xff0c 重写指定方法3 调用同步器方法 指定共享线程数目的共享锁实现代码实现测试Demo运行结果结果分析 构造同步组件的步骤 之前的学习中我们学习了AQS的原理 xff0c