高阶源码分析:ConcurrentHashMap

2023-10-27

高阶源码分析-ConcurrentHashMap

一、文章导读:

这部分内容让大家读懂ConcurrentHashMap源码的底层实现从而在工作中合理去使用他并且在面试中能做到游刃有余,主要内容如下:

  • 核心构造方法
  • 核心成员变量
  • put方法过程数据的完整过程
  • get方法的无锁实现

二、文章讲解内容

JDK8中ConcurrentHashMap的结构是:数组+链表+红黑树。

​ 因为在hash冲突严重的情况下,链表的查询效率是O(n),所以jdk8中改成了单个链表的个数大于8时,数组长度小于64就扩容,数组长度大于等于64,则链表会转换为红黑树,这样以空间换时间,查询效率会变O(nlogn)。

​ 红黑树在Node数组内部存储的不是一个TreeNode对象,而是一个TreeBin对象,TreeBin内部维持着一个红黑树。

​ 在JDK8中ConcurrentHashMap最经点的实现是使用CAS+synchronized+volatile 来保证并发安全

一、JDK7中ConcurrentHashMap的实现

​ 在JDK1.7中ConcurrentHashMap是通过定义多个Segment来实现的分段加锁,一个Segment对应一个数组,如果多线程对同一个数组中的元素进行添加那么多个线程会去竞争同一把锁,他锁的是一个数组,有多少个segment那么就有多少把锁,这个力度还是比较粗的。

在这里插入图片描述

​ JDK8的实现是下文要重点探讨的内容,同时下文全部都是JDK8的实现。

二、核心构造方法
    /**
     * Creates a new, empty map with the default initial table size (16).
     	无参构造函数,new一个默认table容量为16的ConcurrentHashMap
     */
    public ConcurrentHashMap() {
    }
    /**
     * Creates a new, empty map with an initial table size
     * accommodating the specified number of elements without the need
     * to dynamically resize.
     *
     * @param initialCapacity The implementation performs internal
     * sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative
     做了3件事:
     1.如果入参<0,抛出异常。
	 2.如果入参大于最大容量,则使用最大容量(是2的30次方)
	 3.tableSizeFor方法得到一个大于负载因子入参且最接近2的N次方的数作为容量
	 4.设置sizeCtl的值等于初始化容量。未对table进行初始化,table的初始化要在第一次put的时候进行。
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

特别注意:未对table进行初始化,table的初始化要在第一次put的时候进行

三、核心成员变量
// 该字段控制table(也被称作hash桶数组)的初始化和扩容
private transient volatile int sizeCtl;
// table最大容量是2的30次方
private static final int MAXIMUM_CAPACITY = 1 << 30;
// table的默认初始化容量-扩容总是2的n次方
private static final int DEFAULT_CAPACITY = 16;
// table的负载因子。当前已使用容量 >= 负载因子*总容量的时候,进行resize扩容
private static final float LOAD_FACTOR = 0.75f;
// 存储数据的数组-注意添加了volatile
transient volatile Node<K,V>[] table;
// 当桶内链表长度>=8时,会将链表转成红黑树-注意需要和MIN_TREEIFY_CAPACITY结合起来用 
static final int TREEIFY_THRESHOLD = 8;
// table的总容量,要大于64,桶内链表才转换为树形结构,否则当桶内链表长度>=8时会扩容
static final int MIN_TREEIFY_CAPACITY = 64;
// 当桶内node小于6时,红黑树会转成链表。
static final int UNTREEIFY_THRESHOLD = 6;
四、put存储数据

​ 先从JDK的源码中复制一段上头的代码,如下代码就完成了数据的添加,在添加的时候还完成了数组的初始化、扩容、CAS修改、分段锁的实现,大家大体的对该代码有一个认识即可我们后面会详细的画图分析该过程

	public V put(K key, V value) {
        return putVal(key, value, false);
    }
 
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 如果key或者value为空,抛出异常
        if (key == null || value == null) throw new NullPointerException();
        // 得到hash值
        int hash = spread(key.hashCode());
        // 用来记录所在table数组中的桶的中链表的个数,后面会用于判断是否链表过长需要转红黑树
        int binCount = 0;
        // for循环,直到put成功插入数据才会跳出
        for (Node<K,V>[] tab = table;;) { 
            // f=桶头节点  n=table的长度  i=在数组中的哪个下标  fh=头节点的hash值
            Node<K,V> f; int n, i, fh; 
            // 如果table没有初始化
            if (tab == null || (n = tab.length) == 0)               
                // 初始化table
                tab = initTable(); 
            // 根据数组长度减1再对hash值取余得到在node数组中位于哪个下标
            // 用tabAt获取数组中该下标的元素
            // 如果该元素为空
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 
                // 直接将put的值包装成Node用tabAt方法放入数组内这个下标的位置中
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            } 
            // 如果头结点hash值为-1,则为ForwardingNode结点,说明正再扩容,
            else if ((fh = f.hash) == MOVED)   
                // 调用hlepTransfer帮助扩容
                tab = helpTransfer(tab, f);
            // 否则锁住槽的头节点
            else {
                V oldVal = null;
                // 锁桶的头节点
                synchronized (f) {  
                    // 双重锁检测,看在加锁之前,该桶的头节点是不是被改过了
                    if (tabAt(tab, i) == f) {
                        // 如果桶的头节点的hash值大于0
                        if (fh >= 0) {
                            binCount = 1;
                            // 遍历链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 如果遇到节点hash值相同,key相同,看是否需要更新value
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //如果到链表尾部都没有遇到相同的
                                if ((e = e.next) == null) {
                                    // 就生成Node挂在链表尾部,该Node成为一个新的链尾。
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果桶的头节点是个TreeBin
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            // 用红黑树的形式添加节点或者更新相同hash、key的值。
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }   
                if (binCount != 0) {
                    // 如果链表长度>需要树化的阈值
                    if (binCount >= TREEIFY_THRESHOLD) 
                        //调用treeifyBin方法将链表转换为红黑树,而这个方法中会判断数组值是否大于64,如果没有大于64则只扩容
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        // 如果是修改,不是新增,则返回被修改的原值
                        return oldVal;
                    break;
                }
            }
        }
        // 计数器加1,完成新增后,table扩容,就是这里面触发
        addCount(1L, binCount);
        // 新增后,返回Null
        return null;
    }
4.1 高效的hash算法
int hash = spread(key.hashCode());
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

​ 该算法其实在HashMap中我们就已经重点说过了,他既保存了key值得hash的高16位也保存了低16位,从而让key值在不同的数组中尽可能的散列,从而避免hash冲突。

4.2 数组的初始化
for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    if (tab == null || (n = tab.length) == 0)
		tab = initTable();

​ 注意是在循环中判断的table是否为空如果为空则会调用initTable完成数组的默认初始化。

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;

以上代码做了如下事情:

1.如果table为空,进入while准备开始初始化。

2.将sizeCtl赋值给sc。如果sizeCtl<0,线程等待,小于零时表示有其他线程在执行初始化。

3.如果能将sizeCtl设置为-1,则开始进行初始化操作

4.用户有指定初始化容量,就用用户指定的,否则用默认的16.

5.生成一个长度为16的Node数组,把引用给table。

6.重新设置sizeCtl=数组长度 - (数组长度 >>>2) 这是忽略符号的移位运算符,可以理解成 n - (n / 2)。

在这里插入图片描述

4.3 不冲突的数据添加过程
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
     new Node<K,V>(hash, key, value, null)))
        break;       // no lock when adding to empty bin
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

高效的寻址算法: (n - 1) & hash这个在我们的HashMap中也是讲过的了就不在赘述。

​ 通过高效的寻址算法得到一个索引并获取该索引的数据如果不为空则调用casTabAt方法进行CAS的原子修改,CAS在Java层面是无锁的所以速度会很快,但是他在硬件层面是有锁的,他会在硬件的拉链散列表中加锁。

​ 如果有多个线程同时hash到了该索引我们的CAS也能保证只有一个线程能添加成功其他的线程其实是走分段加锁的过程。

在这里插入图片描述

4.4 分段加锁策略
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }

​ 需要大家特别注意的是synchronized (f)锁了一个f对象,那么这个f对象是什么呢?其实就是我们高效寻址算法的到的一个下标中存储的对象,注意他锁的是我们寻址到的这个对象并没有锁这个数组,所以我们当前的锁一共有多少把呢?就看你的size有多少了默认是16那么就会有16把锁可以来并发的修改,这个其实就是JDK1.8的分段锁拉,他比1.7锁的粒度更细那么并发的效果就会更好。

在这里插入图片描述

五、无锁获取
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

​ 以上代码中的内容几乎我们都讲过了,所以就不在赘述,但是值得一说的是获取的过程中并没有对数组或者某一个Node元素添加锁,获取是无锁的所有性能高。

​ 还有一个问题需要注意的是获取是无锁的那么他如果出现多线程修改或者写入的时候他就有可能会出现可见性的问题,因为每一个线程都有自己的工作内存,那么ConcurrentHashMap是如何解决可见性的问题呢?

transient volatile Node<K,V>[] table;

​ 从变量的申明中我们可以看到存储数据的table其实是添加了volatile关键字的,所以其他线程修改了以后我们要求其他的线程把数据刷新主内存从而保证数据的可见性。

五、总结:

  1. JDK1.7 使用分段锁实现
  2. JDK1.8 使用CAS+synchronized+volatile 的具体实现
  3. put方法的复杂实现过程
  4. get方法的无锁实现尤其是volatile关键字的使用

有一个问题需要注意的是获取是无锁的那么他如果出现多线程修改或者写入的时候他就有可能会出现可见性的问题,因为每一个线程都有自己的工作内存,那么ConcurrentHashMap是如何解决可见性的问题呢?

transient volatile Node<K,V>[] table;

​ 从变量的申明中我们可以看到存储数据的table其实是添加了volatile关键字的,所以其他线程修改了以后我们要求其他的线程把数据刷新主内存从而保证数据的可见性。

五、总结:

  1. JDK1.7 使用分段锁实现
  2. JDK1.8 使用CAS+synchronized+volatile 的具体实现
  3. put方法的复杂实现过程
  4. get方法的无锁实现尤其是volatile关键字的使用
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

高阶源码分析:ConcurrentHashMap 的相关文章

随机推荐

  • 硬盘的几点真相

    因为想自己修下硬盘 所以先找了远古时代的笔记本硬盘拆了看看结构 结果发现几个很有意思的地方 首先就是那个 do not cover 的小孔 相信很多人都注意到了硬盘上有个很小的小孔 从外面看进去好像是个微型可调电容 周围一圈文字 do no
  • Matlab 随机森林工具箱的配置使用

    总结一下Matlab随机森林工具箱的配置和使用 配置环境Matlab2019a windows10 vs2017 1 下载Matlab随机森林工具箱 下载地址http www buaapress com cn mzs file detail
  • java持久层框架数据源加密

    学习目标 java持久层框架数据源加密 文章目录 学习目标 1 mybatis框架数据源加密 https www cnblogs com melovemingming p 10699613 html 这里是引用https www cnblo
  • 基于HAL库的stm32的OLED显示屏显示(模拟I2C,四脚,0.96寸)

    参考视频 江科大oled程序移植stm32hal库 freertos学习 cpu使用率 哔哩哔哩 bilibili STM32入门教程 2023持续更新中 哔哩哔哩 bilibili 第一步 STM32CubeMX配置 一 时钟树配置 高速
  • vs2012配置python_Visual Studio 2012 Ultimate 上安装 Python 开发插件 PTVS

    1 我的环境 操作系统 32位 Win7 旗舰版 Service Pack 1 VS版本 Microsoft Visual Studio Ultimate 2012 版本 11 0 50727 1 RTMREL Python解释器版本 Py
  • CAD颗粒密堆积2D插件 球体重力堆积 离散元建模 分子热运动

    插件简介 CAD颗粒密堆积2D插件可用于生成二维状态下重力堆积的随机颗粒 插件可指定投放区域 颗粒的粒径范围 颗粒间的间距 颗粒个数等信息 同时可模拟颗粒弹性及摩擦摩擦系数 插件采用物理引擎对颗粒行为进行模拟 可实现颗粒在力场作用下的堆积
  • c语言小游戏 精简_C语言实现消消乐小游戏

    本文实例为大家分享了C语言实现消消乐小游戏的具体代码 供大家参考 具体内容如下 代码 include include include include include include include include using namespa
  • 线性代数-坐标系变换

    问题描述 已知一个全局坐标系 还有若干局部坐标系 如何将局部坐标系的坐标转成全局坐标系的坐标 反过来又如何进行 这里的坐标系都是直角坐标系 本文通过下面几个方面的研究来回答上面的问题 1 简单示例 2 求解过程 3 nodejs编程验证 简
  • 【数据结构与算法】数据结构知识点总结

    文章目录 前言 一 数组 一 知识点 二 常用操作代码示例 1 声明数组 2 初始化数组 3 访问数组元素 4 修改数组元素 5 遍历数组 6 数组作为函数参数 二 链表 一 知识点 二 常用操作代码示例 1 定义链表节点结构体 2 创建链
  • 从零开始到设计Python+Selenium自动化测试框架-如何开始

    如何开始学习web ui自动化测试 如何选择一门脚本语言 选择什么自动化测试工具 本人已经做测试快5年 很惭愧 感觉积累不够 很多测试都不会 三年多功能测试 最近两年才开始接触和学习自动化测试 打算写一个系列文章 关于如何从零开始到会设计和
  • 服务器修改编码格式,设置服务器编码格式

    设置服务器编码格式 内容精选 换一换 在异构计算架构中 昇腾AI处理器与CPU通过PCIe总线连接在一起来协同工作 Host CPU所在位置称为主机端 Host 是指与昇腾AI处理器所在硬件设备相连接的X86服务器 ARM服务器或者Wind
  • java基础之集合

    集合 基础里的重头戏来喽 一 集合概述 集合和数组的区别 概述 集合是java中用来存放多个 引用数据类型 数据的容器 它是解决了数组的一些弊端的一个多数据容器 他有 的操作集合的方法 比如增加和删除方法 java中集合的体系使用接口和类进
  • RAC重建OCR/Voting disk遇到的一些故障

    author skate time 2010 05 09 我的测试环境 母系统 win2003虚拟软件 vmware3 2 1guest系统 centos4 7oracle db oracle10 2 1 以下是我在重建rac的ocr vo
  • 全球分布式云大会:AntDB超融合流式实时数仓,打造分布式数据库新纪元

    日前 全球分布式云大会北京站在北京金茂万丽酒店举办 亚信科技AntDB数据库受邀参会 会上技术负责人北陌发表以 AntDB超融合流式实时数仓 打造分布式数据库新纪元 为主题的演讲 通过分享AntDB在数据库前沿技术的研发实践 与参会嘉宾一起
  • springboot的安全性

    如何实现 Spring Boot 应用程序的安全性 为了实现 Spring Boot 的安全性 我们使用 spring boot starter security 依赖项 并且必须添加安全配置 它只需要很少的代码 配置类将必须扩展WebSe
  • BM8 链表中倒数最后k个结点

    struct ListNode int val struct ListNode next ListNode int x val x next nullptr class Solution public 代码中的类名 方法名 参数名已经指定
  • 代码块11

    import numpy as np rot matrix 0 5 np asarray 1 1 1 1 1 1 1 1 np sqrt 2 np sqrt 2 0 0 0 0 np sqrt 2 np sqrt 2 判断矩阵是否正交 pr
  • flash开发ipa踩的坑

    1 xcode中找到libclang rt ios a添加到AIRSDK lib aot lib中 2 ios 打包ane 要添加 如果是 tbd则当dylib一样处理 去AIRSDK lib aot stub里面看有没有 dylib需要在
  • C++类和动态内存分配

    1 动态内存和类 静态数据成员在类声明中声明 在包含类方法的文件中初始化 初始化时使用作用域运算符来指出静态成员所属类 如果静态成员是整形或枚举型const 则可以在类声明中初始化 在构造函数中使用new来分配内存时 必须在相应的析构函数中
  • 高阶源码分析:ConcurrentHashMap

    高阶源码分析 ConcurrentHashMap 一 文章导读 这部分内容让大家读懂ConcurrentHashMap源码的底层实现从而在工作中合理去使用他并且在面试中能做到游刃有余 主要内容如下 核心构造方法 核心成员变量 put方法过程