关于Semaphore信号量的源码解读

2023-11-19

Semaphore的简单使用

利用Semaphore可以实现对线程数量的控制。比如如下的代码

class SemaphoreTest{
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);

        for (int i = 1; i <= 9; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    TimeUnit.SECONDS.sleep(2);

                    System.out.println("线程"+Thread.currentThread().getName()+"抢到了车位。");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }

            },""+i).start();

        }
    }
}

首先看一下Semaphore的构造函数,这里可以看到Semaphore的构造函数调用了另外一个构造函数。

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

继续看一下这个被调用的构造函数,可以看到这个是Semaphore里面的一个静态内部类NonfairSync。从名字可以看出应该是一个非公平的东西,继承了Sync。

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
		#这里就是被调用的构造函数
        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

我们继续看一下这个Sync是什么,可以看到Sync继承了AbstractQueuedSynchronizer ,也就是我们常说的AQS,抽象队列同步器。而上面NonfairSync 调用了super的构造函数,也就是Sync类的构造函数,下面的setState(permits);

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
		#这里
        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

继续点进入setState(permits);的方法,这里调用的就是AQS的setState的方法,所以说Semaphore底层是基于AQS(抽象队列同步器实现的)。

protected final void setState(int newState) {
        state = newState;
    }

我们继续研究semaphore.acquire();方法具体是怎么实现的

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

上面方法调用了AQS的方法

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
            #这里返回false
        if (Thread.interrupted())
            throw new InterruptedException();
            #直接执行了这里的方法
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

这里要注意了,我们直接点进入tryAcquireShared(arg)方法是下面这样子的,这个是AQS的默认实现,千万不要以为我们的代码直接执行了这个函数。

protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

实际上执行的是下面这个函数,这个是Semaphore的静态内部类

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }
		//注意了,执行的是这个函数
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

这个函数跳转到了Semaphore的静态内部类Sync里面执行下面这个函数

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
            	//这个地方会获取state的值
                int available = getState();
                int remaining = available - acquires;
                //这个时候这个remaining如果是一个负数,直接返回,不是负数说明可以直接获取到锁,然后CAS直接更新state的状态
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

返回之后退出到刚才上面的这个方法,这部分可以得出,这个方法的流程

  • 获取同步状态值
  • 每个线程进来就减去请求的值,此处请求的值是1.然后用可用同步状态值减去请求的值得到同步状态剩余的值。
  • 如果请求的值大于可用的值或者CAS操作把可用值改为剩余可用的值那么就返回剩下可用的值。
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            //在这个判断如果小于0则直接对请求的线程进行一个入队的操作,我们主要分析 一下这个函数
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

这就是semphore处理锁的核心逻辑,我们在看一下sync调用的acquireSharedInterruptibly的方法。此方法主要的目的就是处理那些没有获取到锁的线程在队列中的一个处理。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //addWaiter是把当前节点设置为共享模式然后添加到AQS维护的双向队列的尾部。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                /*通过for循环不断的进行自旋操作,去判断当前节点的前继节点是不是头节点,如果前继节点
                是头节点那么那么就去挣抢锁,如果争抢锁成功那么就把当前节点设置为头节点,同时唤醒队
                列中的所有节点,一块在去争夺锁。*/
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                /*如果线程中断或者阻塞那么就抛出异常。最后如果方法中抛出了异常那么就把当前节点先设
                置为取消状态然后在清除该节点。*/
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

最后看一下主函数里面的semaphore.release();方法。

public void release() {
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared方法也是AQS模版方法中的一个,它会调用Semaphore重写的方法,我们看一下tryReleaseShared释放方法在Semaphore中是怎么实现的

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

下面这个代码,就是唤醒阻塞队列中的线程,一旦唤醒线程,在同步队列中排队的队首(不是头结点)线程就会获取许可证,获取成功后,就执行相应的代码。

private void doReleaseShared() {
    for (;;) {  // 死循环
        Node h = head;  
        if (h != null && h != tail) {  // 至少存在两个节点
            int ws = h.waitStatus;  // 获取状态值
            if (ws == Node.SIGNAL) {  // h若是SIGNAL状态,那么其后继节点即为要唤醒的节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);  // 唤醒头节点的后继节点
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)  // 头结点未变,直接退出,头结点被改变的唯一条件是有其他的线程修改了头结点
            break;
    }
}

最后介绍一下公平锁和非公平锁

公平锁,就是只有队首的节点线程,可以获取许可证,有其他的线程在获取许可证时,会被加入到队尾,等待获取锁。
非公平锁,因为在对列中排队的线程,只有头结点的后继节点有资格可以获取锁,而在获取许可证时,有其他的线程(不是同步对列中的线程)进入,尝试去获取许可证,这两个线程都有可能获取到许可证,这就是非公平锁的特点。

从上面的分析中可以看到,多态的特性,很多方法在执行的时候并不是我们直接点的进去的,而是根据实际的类型觉得调用哪些方法,所以分析的时候千万不要只是点进去某个方法,不然运行的时候可能不是这个方法。

参考文章

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

关于Semaphore信号量的源码解读 的相关文章

  • 在 Kotlin 中声明静态属性?

    My Java code public class Common public static ModelPengguna currentModelPengguna public class Common companion object v
  • 向Java类库添加函数

    我使用的 Java 类库在很多方面都不完整 有很多类我认为应该内置其他成员函数 但是 我不确定添加这些成员函数的最佳实践 让我们调用不足的基类A class A public A long arbitrary arguments publi
  • 多线程 - 比单线程慢

    当我使用多个线程而不是单线程运行程序时 它会变慢 不是应该更快吗 该程序应该遍历从起始目录开始的所有目录 并查找并打印所有名为 X 的文件 代码如下 while done pthread mutex lock lock if list is
  • 从 Golang 调用 C 函数

    我想在 Golang 中编写控制器逻辑并处理 json 和数据库 同时在 C 中使用我的数学处理模型 在我看来 调用 C 函数的开销必须尽可能低 就像设置寄存器 rcx rdx rsi rdi 一样 执行一些操作fastcall 并获取 r
  • 根据 Active Directory 策略检查密码[重复]

    这个问题在这里已经有答案了 我有一个允许用户更改其 AD 密码的前端 有没有办法获取特定用户及其属性 长度 复杂性 的密码策略 例如细粒度 有没有办法根据此特定策略检查字符串 xyz121 编辑 我不想检查活动目录中存储的当前密码 我想检查
  • Web 文本编辑器中的 RTF 格式

    网络上是否有支持 RTF 格式文档输入的文本编辑器 我知道这对 webdev 来说有点奇怪 但我需要从数据库中读取 RTF 文档 并在基于 Web 的文本编辑器中对其进行编辑 然后将其存储回 RTF 中 在我在转换工具上投入太多资金之前 我
  • 确定相关词的编程方式?

    使用网络服务或软件库 我希望能够识别与词根相关的单词 例如 座位 和 安全带 共享词根 座位 但 西雅图 不会被视为匹配 简单的字符串比较对于这类事情似乎是不可行的 除了定义我自己的字典之外 是否有任何库或 Web 服务不仅可以返回单词定义
  • 如何更改 JAX-WS Web 服务的地址位置

    我们目前已经公开了具有以下 URL 的 JAX RPC Web 服务 http xx xx xx xx myservice MYGatewaySoapHttpPort wsdl http xx xx xx xx myservice MYGa
  • List 或其他类型上的 string.Join

    我想将整数数组或列表转换为逗号分隔的字符串 如下所示 string myFunction List
  • ebean 映射到 BYTEA 的数据类型是什么?

    我有一个游戏 2 0 2 需要在数据库中存储一些文件的应用程序 我们使用 Ebean 作为 ORM 我相信我的数据库中需要一个 BYTEA 列来存储该文件 但我不确定在我的模型中使用什么数据类型 我应该使用某种Blob 或者只是一个byte
  • 删除数组时出现访问冲突异常

    删除分配的内存时 出现 访问冲突读取位置 异常 如下所示 我有一个针对 Visual Studio 2010 工具集 v100 C 编译器编译的本机 dll 我有一个针对它的托管 dll 包装器 它是针对工具集 v90 编译的 因为我想以
  • XCode std::thread C++

    对于学校的一个小项目 我需要创建一个简单的客户端 服务器结构 它将在路由器上运行 使用 openWRT 并且我试图在这个应用程序中使用线程做一些事情 我的 C 技能非常有限 所以我在internet https stackoverflow
  • 按 Enter 继续

    这不起作用 string temp cout lt lt Press Enter to Continue cin gt gt temp cout lt lt Press Enter to Continue cin ignore 或更好 in
  • 假布尔值=真?

    我在一本书中找到了这段代码 并在 Netbeans 中执行了它 boolean b false if b true System out println true else System out println false 我只是不明白为什
  • Axis2 的 wsdl2java 在 RPC/Encoded 样式 Web 服务上失败

    Axis2 有替代方案吗 或者让它工作的方式 例如不同的数据绑定 Retrieving document at Exception in thread main org apache axis2 wsdl codegen CodeGener
  • 包含从代码隐藏 (ASP.NET C#) 到 ASPX 中的图像概述的图像列表 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • Web API 2.0 使用 pascalcase 模型接收驼峰式命名的 JSON 数据

    我正在尝试对我的 Web API 进行 PUT 调用 我在 WebApiConfig cs 中设置了以下内容 以处理以驼峰形式将数据发送回我的 Web 项目 config Formatters JsonFormatter Serialize
  • 在两个点之间创建一条曲线,每个点都具有标准化向量

    因此 我需要一种写入方法来在两点之间创建一条曲线 每个点都有一个指向任意方向的归一化向量 我一直在尝试设计这样一种方法 但一直无法理解数学 在这里 由于一张图片胜过一千个文字 这就是我所需要的 在图中 矢量垂直于红线 我相信向量需要进行相同
  • 如何获取运行或段落的高度

    我找到了Run or Paragraph in FlowDocument现在我需要知道HEIGHT of it i e while navigator CompareTo flowDocViewer Document ContentEnd
  • MyBatis 枚举的使用

    我知道以前有人问过这个问题 但我无法根据迄今为止找到的信息实施解决方案 所以也许有人可以向我解释一下 我有一个表 状态 它有两列 id 和 name id是PK 我不想使用 POJO Status 而是使用枚举 我创建了这样一个枚举 如下所

随机推荐