LockSupport中的park与unpark原理

2023-05-16

关于LockSupport

concurrent包的基础

Doug Lea 的神作concurrent包是基于AQS (AbstractQueuedSynchronizer)框架,AQS框架借助于两个类:Unsafe(提供CAS操作)和LockSupport(提供park/unpark操作)。因此,LockSupport可谓构建concurrent包的基础之一。理解concurrent包,就从这里开始。

两个重点

  • 操作对象

归根结底,LockSupport调用的Unsafe中的native代码: 


public native void unpark(Thread jthread); 
public native void park(boolean isAbsolute, long time);   

两个函数声明清楚地说明了操作对象:park函数是将当前Thread阻塞,而unpark函数则是将另一个Thread唤醒。

与Object类的wait/notify机制相比,park/unpark有两个优点:1. 以thread为操作对象更符合阻塞线程的直观定义;2. 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性。

  • 关于许可

在上面的文字中,我使用了阻塞和唤醒,是为了和wait/notify做对比。其实park/unpark的设计原理核心是“许可”。park是等待一个许可。unpark是为某线程提供一个许可。如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。

有一点比较难理解的,是unpark操作可以再park操作之前。也就是说,先提供许可。当某线程调用park时,已经有许可了,它就消费这个许可,然后可以继续运行。这其实是必须的。考虑最简单的生产者(Producer)消费者(Consumer)模型:Consumer需要消费一个资源,于是调用park操作等待;Producer则生产资源,然后调用unpark给予Consumer使用的许可。非常有可能的一种情况是,Producer先生产,这时候Consumer可能还没有构造好(比如线程还没启动,或者还没切换到该线程)。那么等Consumer准备好要消费时,显然这时候资源已经生产好了,可以直接用,那么park操作当然可以直接运行下去。如果没有这个语义,那将非常难以操作。

  • 其它细节 
    理解了以上两点,我觉得应该把握了关键,其它细节就不是那么关键,也容易理解了,不作分析。

LockSupport是用来创建locks的基本线程阻塞基元,比如AQS中实现线程挂起的方法,就是park,对应唤醒就是unpark。JDK中有使用的如下


LockSupport提供的是一个许可,如果存在许可,线程在调用park的时候,会立马返回,此时许可也会被消费掉,如果没有许可,则会阻塞。调用unpark的时候,如果许可本身不可用,则会使得许可可用

 许可只有一个,不可累加

park源码跟踪

park的声明形式有一下两大块

 一部分多了一个Object参数,作为blocker,另外的则没有。blocker的好处在于,在诊断问题的时候能够知道park的原因

推荐使用带有Object的park操作

park函数作用

park用于挂起当前线程,如果许可可用,会立马返回,并消费掉许可。

  • park(Object): 恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情
  • parkNanos(Object blocker, long nanos):恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情;4:过期时间到了
  • parkUntil(Object blocker, long deadline):恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情;4:指定的deadLine已经到了以park的源码为例
public static void park(Object blocker) {
   //获取当前线程
    Thread t = Thread.currentThread();
   //记录当前线程阻塞的原因,底层就是unsafe.putObject,就是把对象存储起来
    setBlocker(t, blocker);
    //执行park
    unsafe.park(false, 0L);
   //线程恢复后,去掉阻塞原因
    setBlocker(t, null);
}

从源码可以看到真实的实现均在 unsafe

unsafe.park

核心实现如下

JavaThread* thread=JavaThread::thread_from_jni_environment(env);
...
thread->parker()->park(isAbsolute != 0, time);

就是获取java线程的parker对象,然后执行它的park方法。Parker的定义如下

class Parker : public os::PlatformParker {
private:
   //表示许可
  volatile int _counter ; 
  Parker * FreeNext ;
  JavaThread * AssociatedWith ; // Current association
public:
  Parker() : PlatformParker() {
    //初始化_counter
    _counter       = 0 ; 
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators  
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

};

它继承了os::PlatformParker,内置了一个volatitle的 _counter。PlatformParker则是在不同的操作系统中有不同的实现,以linux为例

class PlatformParker : public CHeapObj {
  protected:
    //互斥变量类型
    pthread_mutex_t _mutex [1] ; 
   //条件变量类型
    pthread_cond_t  _cond  [1] ;

  public:        
     ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
     //初始化条件变量,使用    pthread_cond_t之前必须先执行初始化
      status = pthread_cond_init (_cond, NULL);
      assert_status(status == 0, status, "cond_init”);
      // 初始化互斥变量,使用    pthread_mutex_t之前必须先执行初始化
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
    }
} 

上述代码均为POSIX线程接口使用,所以pthread指的也就是posixThread

parker实现如下

void Parker::park(bool isAbsolute, jlong time) {
  if (_counter > 0) {
       //已经有许可了,用掉当前许可
      _counter = 0 ;
     //使用内存屏障,确保 _counter赋值为0(写入操作)能够被内存屏障之后的读操作获取内存屏障事前的结果,也就是能够正确的读到0
      OrderAccess::fence();
     //立即返回
      return ;
  }

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

 if (Thread::is_interrupted(thread, false)) {
 // 线程执行了中断,返回
    return;
  }

  if (time < 0 || (isAbsolute && time == 0) ) { 
    //时间到了,或者是代表绝对时间,同时绝对时间是0(此时也是时间到了),直接返回,java中的parkUtil传的就是绝对时间,其它都不是
   return;
  }
  if (time > 0) {
  //传入了时间参数,将其存入absTime,并解析成absTime->tv_sec(秒)和absTime->tv_nsec(纳秒)存储起来,存的是绝对时间
    unpackTime(&absTime, isAbsolute, time);
  }

 //进入safepoint region,更改线程为阻塞状态
  ThreadBlockInVM tbivm(jt);

 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
  //如果线程被中断,或者是在尝试给互斥变量加锁的过程中,加锁失败,比如被其它线程锁住了,直接返回
    return;
  }
//这里表示线程互斥变量锁成功了
  int status ;
  if (_counter > 0)  {
    // 有许可了,返回
    _counter = 0;
    //对互斥变量解锁
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.  
// (This allows a debugger to break into the running thread.)  
 //debug用
sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
//将java线程所拥有的操作系统线程设置成 CONDVAR_WAIT状态 ,表示在等待某个条件的发生
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
//将java的_suspend_equivalent参数设置为true
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
  if (time == 0) {
    //把调用线程放到等待条件的线程列表上,然后对互斥变量解锁,(这两是原子操作),这个时候线程进入等待,当它返回时,互斥变量再次被锁住。
  //成功返回0,否则返回错误编号
    status = pthread_cond_wait (_cond, _mutex) ;
  } else {
  //同pthread_cond_wait,只是多了一个超时,如果超时还没有条件出现,那么重新获取胡吃两然后返回错误码 ETIMEDOUT
    status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
   //WorkAroundNPTLTimedWaitHang 是JVM的运行参数,默认为1
  //去除初始化
      pthread_cond_destroy (_cond) ;
//重新初始化
      pthread_cond_init    (_cond, NULL);
    }
  }
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
 //等待结束后,许可被消耗,改为0  _counter = 0 ;
//释放互斥量的锁
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // If externally suspended while waiting, re-suspend 
    if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
//加入内存屏障指令
  OrderAccess::fence();
}

从park的实现可以看到

  1. 无论是什么情况返回,park方法本身都不会告知调用方返回的原因,所以调用的时候一般都会去判断返回的场景,根据场景做不同的处理
  2. 线程的等待与挂起、唤醒等等就是使用的POSIX的线程API
  3. park的许可通过原子变量_count实现,当被消耗时,_count为0,只要拥有许可,就会立即返回

OrderAccess::fence();

在linux中实现原理如下

inline void OrderAccess::fence() {
  if (os::is_MP()) {
#ifdef AMD64
  // 没有使用mfence,因为mfence有时候性能差于使用 locked addl
    __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else    __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif  }
}

内存重排序网上的验证

ThreadBlockInVM tbivm(jt)
这属于C++新建变量的语法,也就是调用构造函数新建了一个变量,变量名为tbivm,参数为jt。类的实现为

class ThreadBlockInVM : public ThreadStateTransition {
 public:
  ThreadBlockInVM(JavaThread *thread)
  : ThreadStateTransition(thread) {
    // Once we are blocked vm expects stack to be walkable    
    thread->frame_anchor()->make_walkable(thread);
   //把线程由运行状态转成阻塞状态
    trans_and_fence(_thread_in_vm, _thread_blocked);
  }
  ...
};

_thread_in_vm 表示线程当前在VM中执行,_thread_blocked表示线程当前阻塞了,他们是globalDefinitions.hpp中定义的枚举

//这个枚举是用来追踪线程在代码的那一块执行,用来给 safepoint code使用,有4种重要的类型,_thread_new/_thread_in_native/_thread_in_vm/_thread_in_Java。形如xxx_trans的状态都是中间状态,表示线程正在由一种状态变成另一种状态,这种方式使得 safepoint code在处理线程状态时,不需要对线程进行挂起,使得safe point code运行更快,而给定一个状态,通过+1就可以得到他的转换状态
enum JavaThreadState {
  _thread_uninitialized     =  0, // should never happen (missing initialization) 
_thread_new               =  2, // just starting up, i.e., in process of being initialized 
_thread_new_trans         =  3, // corresponding transition state (not used, included for completeness)  
_thread_in_native         =  4, // running in native code  . This is a safepoint region, since all oops will be in jobject handles
_thread_in_native_trans   =  5, // corresponding transition state  
_thread_in_vm             =  6, // running in VM 
_thread_in_vm_trans       =  7, // corresponding transition state 
_thread_in_Java           =  8, //  Executing either interpreted or compiled Java code running in Java or in stub code  
_thread_in_Java_trans     =  9, // corresponding transition state (not used, included for completeness) 
_thread_blocked           = 10, // blocked in vm 
_thread_blocked_trans     = 11, // corresponding transition state 
_thread_max_state         = 12  // maximum thread state+1 - used for statistics allocation
};

父类ThreadStateTransition中定义trans_and_fence如下

void trans_and_fence(JavaThreadState from, JavaThreadState to) { transition_and_fence(_thread, from, to);} //_thread即构造函数传进来de thread
// transition_and_fence must be used on any thread state transition
// where there might not be a Java call stub on the stack, in
// particular on Windows where the Structured Exception Handler is
// set up in the call stub. os::write_memory_serialize_page() can
// fault and we can't recover from it on Windows without a SEH in
// place.
//transition_and_fence方法必须在任何线程状态转换的时候使用
static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) {
  assert(thread->thread_state() == from, "coming from wrong thread state");
  assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states");
//标识线程转换中
    thread->set_thread_state((JavaThreadState)(from + 1));

  // 设置内存屏障,确保新的状态能够被VM 线程看到
if (os::is_MP()) {
    if (UseMembar) {
      // Force a fence between the write above and read below     
        OrderAccess::fence();
    } else {
      // Must use this rather than serialization page in particular on Windows      
        InterfaceSupport::serialize_memory(thread);
    }
  }

  if (SafepointSynchronize::do_call_back()) {
    SafepointSynchronize::block(thread);
  }
//线程状态转换成最终的状态,对待这里的场景就是阻塞
  thread->set_thread_state(to);

  CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();)
}


操作系统线程状态的一般取值
在osThread中给定了操作系统线程状态的大致取值,它本身是依据平台而定

enum ThreadState {
 ALLOCATED,                    // Memory has been allocated but not initialized  
INITIALIZED,                  // The thread has been initialized but yet started 
RUNNABLE,                     // Has been started and is runnable, but not necessarily running  
MONITOR_WAIT,                 // Waiting on a contended monitor lock  
CONDVAR_WAIT,                 // Waiting on a condition variable  
OBJECT_WAIT,                  // Waiting on an Object.wait() call  
BREAKPOINTED,                 // Suspended at breakpoint  
SLEEPING,                     // Thread.sleep()  
ZOMBIE                        // All done, but not reclaimed yet
};

unpark 源码追踪
实现如下

void Parker::unpark() {
  int s, status ;
 //给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁
//park进入wait时,_mutex会被释放
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ; 
  //存储旧的_counter
  s = _counter; 
//许可改为1,每次调用都设置成发放许可
  _counter = 1;
  if (s < 1) {
     //之前没有许可
     if (WorkAroundNPTLTimedWaitHang) {
      //默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程
        status = pthread_cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
        //释放锁
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
     } else {
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
        status = pthread_cond_signal (_cond) ;
        assert (status == 0, "invariant") ;
     }
  } else {
   //一直有许可,释放掉自己加的锁,有许可park本身就返回了
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

从源码可知unpark本身就是发放许可,并通知等待的线程,已经可以结束等待了。

总结

park/unpark能够精准的对线程进行唤醒和等待。
linux上的实现是通过POSIX的线程API的等待、唤醒、互斥、条件来进行实现的
park在执行过程中首选看是否有许可,有许可就立马返回,而每次unpark都会给许可设置成有,这意味着,可以先执行unpark,给予许可,再执行park立马自行,适用于producer快,而consumer还未完成的场景参考地址。
 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

LockSupport中的park与unpark原理 的相关文章

  • C#操作Word Aspose.Words组件介绍及使用 基本介绍与DOM概述

    1 基本介绍 Aspose Words是一个商业 NET类库 xff0c 可以使得应用程序处理大量的文件任务 Aspose Words支持Doc xff0c Docx xff0c RTF xff0c HTML xff0c OpenDocum
  • C# Aspose.Word 操作word文档(利用模板)

    上一篇我们介绍了用书签的方式来填充word中数据 xff0c 今天介绍第二种方法操作word xff01 依旧是先自己建好word模板 xff0c 然后这里就不需要插入书签了 1 建立模板 能看到红色标示的区域依旧用到了书签 xff0c 没
  • C#使用Aspose.Words操作word文档(利用模板2)

    最近接到个需求 xff0c 由于客服这边要导出大量有一定规则的word文件 xff0c 里面的内容希望系统自动填充 xff0c 例如 这里我使用Aspose Words dll这个类库 xff0c 1 首先 xff0c 我们需要创建模板文件
  • aspose 生成word 简单的文档操作

    1 使用Aspose Words 插件 这个插件的好处是 xff0c 发布网站的服务不需要安装office xff0c 也可以进行数据生成word文档 在生成word问当前需要我们先做好一个word模板 xff0c 需要在 xff1a 后边
  • Aspose.Word 的常见使用(不用模板创建)

    起因 因项目需要 xff0c 而且使用html转Word的时候 xff0c 样式不兼容问题 xff0c 于是只能使用Aspose Word通过代码生成 下面是通过DocumentBuilder来设计Word的 xff0c 但是和使用模型拼接
  • FileAlterationListenerAdaptor监听文件和文件夹

    背景 项目中有需要监听文件 文件夹的需求 xff0c 以便在文件 文件夹发生变化时出发相应的业务流程 这里使用Spring Boot 43 Apache Commons IO方案 另外 xff0c Apache Commons IO涉及到多
  • C# 设置word文档页面大小

    我们知道 xff0c 在MS word中 xff0c 默认的页面大小是letter 8 5 x11 xff0c 除此之外 xff0c word还提供了其他一些预定义的页面大小 xff0c 如Legal 5 4 x14 xff0c A3 11
  • C# 设置Word文档中图片的大小

    在创建Word文档时 xff0c 我们经常需要向文档中插入图片 xff0c 但插入图片的大小有时候可能会太大或太小 xff0c 这时候我们就需要对图片的大小进行调整 xff0c 使得图片与文章更加协调 美观 这篇文章将介绍如何使用Free
  • C#无损高质量压缩图片实现代码

    最近 xff0c 项目上涉及到了图像压缩 xff0c 发现原有的图像压缩功能 xff0c 虽然保证了图像的大小300K以内 xff0c 但是压缩后的图像看的不在清晰 xff0c 并且 xff0c 限定了图片的Height或者是Width 在
  • c# Bitmap byte[] Stream 文件相互转换

    byte 转图片 public static Bitmap BytesToBitmap byte Bytes MemoryStream stream 61 null try stream 61 new MemoryStream Bytes
  • bitmap与memoryStream转换bug

    image Save PicMainMs System Drawing Imaging ImageFormat Png Bitmap RawFormat获取格式会有异常情况encode为空的bug xff0c 所以设为System Draw
  • C#类的属性遍历及属性值获取

    1 定义一个类 public class Person public string Name get set public int ID get set 2 获取属性 方法一 定义一个类的对象获取 Person p 61 new Perso
  • c# 遍历对象属性给对象赋值

    using System using System Collections Generic using System Linq using System Web using System Web UI using System Web UI
  • C#中5步完成word文档打印的方法

    在日常工作中 xff0c 我们可能常常需要打印各种文件资料 xff0c 比如word文档 对于编程员 xff0c 应用程序中文档的打印是一项非常重要的功能 xff0c 也一直是一个非常复杂的工作 特别是提到Web打印 xff0c 这的确会很
  • 保存文件对话框实例

    string savePath 61 34 34 SaveFileDialog sfd 61 new SaveFileDialog sfd Filter 61 34 txt files docx docx All files 34 sfd
  • 项目移植,项目环境问题

    1 调用windows组件时遇到的问题 提示Interop Microsoft Office Core等找不到接口 xff0c 可在引用处将其dll的嵌入式互操作属性改为false 2 配置框架问题 确认 netframework版本是否合
  • javabean拷贝,list拷贝,工具类

    可以实现单个对象拷贝 xff0c List拷贝 xff0c 源和目标类属性字段不一致时也可以拷贝 回调类 xff1a 用于处理在拷贝过程中源和目标类字段不一致的情况 lt p gt Title BeanCopierUtilCallBack
  • c++:json字符串拼接,json对象组装

    c 43 43 算法使用json输出最终结果给java使用 xff0c 于是 xff0c 开始了json对象的组装之旅 首先 xff0c 对不同数据类型 xff0c 封装不同的数据组装函数 拼接int std string getKeyVa
  • C/C++由字符串转JSON/JSON转字符串/数组解析/数组添加

    字符串转成JSON xff08 其中str为字符串 xff09 1 2 3 4 5 Json Reader Reader Json Value DevJson Reader parse str DevJson int dev id 61 D

随机推荐