ConcurrentHashMap概述

2023-11-02

1. 前言

为什么要使用 ConcurrentHashMap

主要基于两个原因:

  1. 在并发编程中,jdk1.7的情况下使用 HashMap 可能造成死循环,而jdk1.8 中有可能会造成数据丢失
  2. HashTable 效率非常低下

2. ConcurrentHashMap 结构

ConcurrentHashMap 为了提高本身的并发能力,在内部采用了一个叫做 Segment 的结构,一个 Segment 其实就是一个类 Hash Map 的结构,Segment 内部维护了一个链表数组,我们用下面这一幅图来看下 ConcurrentHashMap 的内部结构,从下面的结构我们可以了解到,ConcurrentHashMap 定位一个元素的过程需要进行两次Hash操作,第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是 Hash 的过程要比普通的 HashMap 要长,但是带来的好处是写操作的时候可以只对元素所在的 Segment 进行操作即可,不会影响到其他的 Segment,这样,在最理想的情况下,ConcurrentHashMap 可以最高同时支持 Segment 数量大小的写操作(刚好这些写操作都非常平均地分布在所有的 Segment上),所以,通过这一种结构,ConcurrentHashMap 的并发能力可以大大的提高。我们用下面这一幅图来看下ConcurrentHashMap的内部结构详情图,如下:

不难看出,ConcurrentHashMap采用了二次hash的方式,第一次hash将key映射到对应的segment,而第二次hash则是映射到segment的不同桶(bucket)中。

为什么要用二次hash,主要原因是为了构造分离锁,使得对于map的修改不会锁住整个容器,提高并发能力。当然,没有一种东西是绝对完美的,二次hash带来的问题是整个hash的过程比hashmap单次hash要长,所以,如果不是并发情形,不要使用concurrentHashmap。

JAVA7之前ConcurrentHashMap主要采用锁机制,在对某个Segment进行操作时,将该Segment锁定,不允许对其进行非查询操作,而在JAVA8之后采用CAS无锁算法,这种乐观操作在完成前进行判断,如果符合预期结果才给予执行,对并发操作提供良好的优化.

让我们先看JDK1.7的ConcurrentHashMap的原理分析

2.1 jdk 1.7 中结构

Segment默认是16,按理说最多同时支持16个线程并发读写,初始化时也可以指定Segment数量,每一个Segment都会有一把锁,保证线程安全。

该结构的优劣势
1、坏处是这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长。
2、好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上)。
所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。

构造方法

ConcurrentHashMap(int initialCapacity, float loadFactor,int currencyLevel),initialCapacity为数组的总大小,loadFactor为加载因子,currencyLevel为并发度,可以认为是Segment 数组的长度

假设initialCapacity=32,currencyLevel=16,那么每一个Segment 中的HashEntry 数组的大小为initialCapacity/currencyLevel=2。

2.2 jdk 1.8 中结构

JDK1.8的currentHashMap参考了1.8HashMap的实现方式,采用了数组+链表+红黑树的实现方式,其中大量的使用CAS操作.CAS(compare and swap)的缩写,也就是我们说的比较交换。
CAS是一种基于锁的操作,而且是乐观锁。java的锁中分为乐观锁和悲观锁。悲观锁是指将资源锁住,等待当前占用锁的线程释放掉锁,另一个线程才能够获取线程.乐观锁是通过某种方式不加锁,比如说添加version字段来获取数据。
CAS操作包含三个操作数(内存位置,预期的原值,和新值)。如果内存的值和预期的原值是一致的,那么就转化为新值。CAS是通过不断的循环来获取新值的,如果线程中的值被另一个线程修改了,那么A线程就需要自旋,到下次循环才有可能执行。
JDK8中彻底放弃了Segment转而采用的是Node,其设计思想也不再是JDK1.7中的分段锁思想。
Node:保存keyvaluekeyhash值的数据结构。其中valuenext都用volatile修饰,保证并发的可见性。
在JDK8中ConcurrentHashMap的结构,引入了红黑树,红黑树是一种性能非常好的二叉查找树,其查找性能为O(logN),早期完全采用链表结构时Map的查找时间复杂度为O(N),JDK8中ConcurrentHashMap在链表的长度大于某个阈值的时候会将链表转换成红黑树进一步提高其查找性能。

3. 实现

3.1 JDK 1.7 中的实现

3.1.1 初始化

ConcurrentHashMap 的初始化是通过位运算来初始化 Segment 的大小的(ssize 表示),通过concurrentLevel 计算得出。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}
复制代码

ssize 用位于运算来计算(ssize <<=1),所以 Segment 的大小取值都是以2的N次方,Segment 的大小 ssize 默认为16.

每一个 Segment 元素下的 HashEntry 的初始化也是按照位于运算来计算,用 cap 来表示

int cap = 1;
while (cap < c)
    cap <<= 1;
复制代码

HashEntry 大小的计算也是2的N次方(cap <<=1), cap 的初始值为1,所以 HashEntry 最小的容量为2.

根据SegmentShift和SegmentMask定位到哪个Segment

其中,concurrencyLevel 一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

3.1.2 get 操作

Segment 的 get 操作实现非常简单和高效,先经过一次再散列,然后使用这个散列值通过散列运算定位到 Segment,再通过散列算法定位到元素。

public V get(Object key){
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key,hash);
}
复制代码

get 操作的高效之处在于整个 get 过程都不需要加锁,除非读到空的值才会加锁重读。原因就是将使用的共享变量定义成 volatile 类型。

transient volatile int count;
volatile V value;
复制代码

3.1.3 put 操作

对于 ConcurrentHashMap 的数据插入,这里要进行两次 Hash 去定位数据的存储位置

 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
     //(1)
        int hash = hash(key);
     //(2)
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
     //(3)
        return s.put(key, hash, value, false);
}

代码(1)计算key的hash值

代码(2)根据hash值,segmentShift,segmentMask定位到哪个Segment。

代码(3)将键值对保存到对应的segment中。

可以看到首先是通过 key 定位到 Segment,之后在对应的 Segment 中进行具体的 put。 Segment 中进行具体的 put的源码如下:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 无论如何,确保获取锁 scanAndLockForPut会去查找是否有key相同Node
    ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        ConcurrentHashMap.HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index);
        for (ConcurrentHashMap.HashEntry<K,V> e = first;;) {
            // 更新已存在的key
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 判断是否需要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

当执行put操作时,会经历两个步骤:

  1. 判断是否需要扩容
  2. 定位到添加元素的位置,将其放入 HashEntry 数组中

插入过程会进行第一次 key 的 hash 来定位 Segment 的位置,如果该 Segment 还没有初始化,即通过 CAS 操作进行赋值,然后进行第二次 hash 操作,找到相应的 HashEntry 的位置,这里会利用继承过来的锁的特性,在将数据插入指定的 HashEntry 位置时(尾插法),会通过继承 ReentrantLock 的 tryLock() 方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用 tryLock() 方法去获取锁,超过指定次数就挂起,等待唤醒。

由于 Segment 对象本身就是一把锁,所以在新增数据的时候,相应的 Segment对象块是被锁住的,其他线程并不能操作这个 Segment 对象,这样就保证了数据的安全性,在扩容时也是这样的,在 JDK1.7 中的 ConcurrentHashMap扩容只是针对 Segment 对象中的 HashEntry 数组进行扩容,又因为 Segment 对象是一把锁,所以在 rehash 的过程中,其他线程无法对 segment 的 hash 表做操作,这就解决了 HashMap 中 put 数据引起的闭环问题

3.1.4 size 操作

先采用不加锁的方式,连续计算元素的个数,最多计算3次:

  • 如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
  • 如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数

其源码实现:

try {
    for (; ; ) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                ensureSegment(j).lock(); // force creation  
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K, V> seg = segmentAt(segments, j);
            if (seg != null) {
                /* 在put、remove、clean方法里操作
                * 元素都会将变量modCount进行加一,
                * 统计也是依靠这个变量的前后变化来进行的 */
                sum += seg.modCount;
                int c = seg.count;
                if (c < 0 || (size += c) < 0) overflow = true;
            }
        }
        if (sum == last)
            break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
    }
}
复制代码

3.2 JDK 1.8 中的实现

3.2.1 基本属性及概念

看一下基本属性

//node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//forwarding nodes的hash值
static final int MOVED = -1;
//树根节点的hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
    *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
    *当为0时:代表当时的table还没有被初始化
    *当为正数时:表示初始化或者下一次进行扩容的大小
    */
private transient volatile int sizeCtl;
复制代码

重要概念:

  1. table: 默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方
  2. nextTable: 默认为null,扩容时新生成的数组,其大小为原数组的两倍
  3. Node :保存 key,value 及 key 的 hash 值的数据结构。
class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    //省略部分代码
}
复制代码

其中 key 字段被 final 修饰,说明在生命周期内,key 是不可变的,value 和 next 都用 volatile 修饰,保证并发的可见性。

  1. ForwardingNode: 一个特殊的 Node 节点,hash 值为 -1,其中存储 nextTable 的引用。
final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}
复制代码

只有table发生扩容的时候,ForwardingNode 才会发挥作用,作为一个占位符放在table中表示当前节点为 null 或则已经被移动。

  1. TreeNode类和TreeBin类:  TreeNode类表示的是红黑树上的每个节点。当一个链表上的节点数量超过了指定的值,会将这个链表变为红黑树,当然每个节点就转换为TreeNode。不像HashMap,ConcurrentHashMap在桶里面直接存储的不是TreeNode,而是一个TreeBin,在TreeBin内部维护一个红黑树,也就是说TreeNode在TreeBin内部使用的。

3.2.2 初始化

实例化 ConcurrentHashMap 时带参数时,会根据参数调整 table 的大小,假设参数为 100,最终会调整成 256,确保 table 的大小总是2的幂次方.

table 初始化

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
        if ((sc = sizeCtl) < 0) 
            //其他线程cas失败,再次进入循环时,发现SIZECTL为-1,谦让出cpu
            Thread.yield(); 
        //cas将SIZECTL设置为-1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

3.2.3 put 操作

假设 table 已经初始化完成,put 操作采用 CAS + synchronized 实现并发插入或更新操作。

public V put(K key, V value) {  
        return putVal(key, value, false);  
}  

/** Implementation for put and putIfAbsent */  
final V putVal(K key, V value, boolean onlyIfAbsent) {  
        if (key == null || value == null) throw new NullPointerException();  
        int hash = spread(key.hashCode()); //两次hash,减少hash冲突,可以均匀分布  
        int binCount = 0;  
        for (Node<K,V>[] tab = table;;) { //对这个table进行迭代  
            Node<K,V> f; int n, i, fh;  
            //这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化  
            if (tab == null || (n = tab.length) == 0)  
                tab = initTable();  
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置没有数据,就直接无锁插入  
                if (casTabAt(tab, i, null,  
                             new Node<K,V>(hash, key, value, null)))  
                    break;                   // no lock when adding to empty bin  
            }  
            else if ((fh = f.hash) == MOVED)//如果在进行扩容,则先进行扩容操作  
                tab = helpTransfer(tab, f);  
            else {  
                V oldVal = null;  
                //如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点  
                synchronized (f) {  
                    if (tabAt(tab, i) == f) {  
                        if (fh >= 0) { //表示该节点是链表结构  
                            binCount = 1;  
                            for (Node<K,V> e = f;; ++binCount) {  
                                K ek;  
                                //这里涉及到相同的key进行put就会覆盖原先的value  
                                if (e.hash == hash &&  
                                    ((ek = e.key) == key ||  
                                     (ek != null && key.equals(ek)))) {  
                                    oldVal = e.val;  
                                    if (!onlyIfAbsent)  
                                        e.val = value;  
                                    break;  
                                }  
                                Node<K,V> pred = e;  
                                if ((e = e.next) == null) {  //插入链表尾部  
                                    pred.next = new Node<K,V>(hash, key,  
                                                              value, null);  
                                    break;  
                                }  
                            }  
                        }  
                        else if (f instanceof TreeBin) {//红黑树结构  
                            Node<K,V> p;  
                            binCount = 2;  
                            //红黑树结构旋转插入  
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,  
                                                           value)) != null) {  
                                oldVal = p.val;  
                                if (!onlyIfAbsent)  
                                    p.val = value;  
                            }  
                        }  
                    }  
                }  
                if (binCount != 0) { //如果链表的长度大于8时就会进行红黑树的转换  
                    if (binCount >= TREEIFY_THRESHOLD)  
                        treeifyBin(tab, i);  
                    if (oldVal != null)  
                        return oldVal;  
                    break;  
                }  
            }  
        }  
        addCount(1L, binCount);//统计size,并且检查是否需要扩容  
        return null;  
}

hash算法

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}
复制代码

table 中定位索引位置,n 是 table 的大小

int index = (n - 1) & hash

获取 table 中对应索引的元素f

Unsafe.getObjectVolatile 可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

如果 f 为 null,说明 table 中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject 方法插入 Node 节点。

如果 CAS 成功,说明 Node 节点已经插入,随后 addCount(1L, binCount) 方法会检查当前容量是否需要进行扩容。

如果 CAS 失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。

如果f的 hash 值为 -1,说明当前 f 是 ForwardingNode 节点,意味有其它线程正在扩容,则一起进行扩容操作。

其余情况把新的 Node 节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发,代码如下:

synchronized (f) {
    if (tabAt(tab, i) == f) {
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                if (e.hash == hash &&
                    ((ek = e.key) == key ||
                     (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                                              value, null);
                    break;
                }
            }
        }
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                           value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}
复制代码

在节点 f 上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改。

如果 f.hash >= 0,说明 f 是链表结构的头结点,遍历链表,如果找到对应的 node 节点,则修改 value,否则在链表尾部加入节点。 如果 f 是 TreeBin 类型节点,说明 f 是红黑树根节点,则在树结构上遍历元素,更新或增加节点。 如果链表中节点数 binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。

table扩容

当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl,需要对 table 进行扩容。

整个扩容分为两部分:

构建一个 nextTable,大小为 table 的两倍。 把 table 的数据复制到 nextTable 中。

这两个过程在单线程下实现很简单,但是 ConcurrentHashMap 是支持并发插入的,扩容操作自然也会有并发的出现,这种情况下,第二步可以支持节点的并发复制,这样性能自然提升不少,但实现的复杂度也上升了一个台阶。

先看第一步,构建nextTable,毫无疑问,这个过程只能只有单个线程进行 nextTable 的初始化,具体实现如下:

private final void addCount(long x, int check) {
    ... 省略部分代码
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
复制代码

通过 Unsafe.compareAndSwapInt 修改 sizeCtl 值,保证只有一个线程能够初始化 nextTable,扩容后的数组长度为原来的两倍,但是容量是原来的 1.5

节点从 table 移动到 nextTable,大体思想是遍历、复制的过程。

首先根据运算得到需要遍历的次数i,然后利用 tabAt 方法获得 i 位置的元素 f,初始化一个 forwardNode 实例 fwd。

如果 f == null,则在 table 中的 i 位置放入 fwd,这个过程是采用 Unsafe.compareAndSwapObjectf 方法实现的,很巧妙的实现了节点的并发移动。

如果 f 是链表的头节点,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。 如果 f 是 TreeBin 节点,也做一个反序处理,并判断是否需要 untreeify,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,同样采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。 遍历过所有的节点以后就完成了复制工作,把 table 指向 nextTable,并更新 sizeCtl 为新数组大小的 0.75 倍 ,扩容完成。

红黑树构造

注意:如果链表结构中元素超过 TREEIFY_THRESHOLD 阈值,默认为 8 个,则把链表转化为红黑树,提高遍历查询效率。

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}
复制代码

接下来我们看看如何构造树结构,代码如下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
复制代码

可以看出,生成树节点的代码块是同步的,进入同步代码块之后,再次验证 table 中 index 位置元素是否被修改过。

  1. 根据 table 中 index 位置 Node 链表,重新生成一个 hd 为头结点的 TreeNode 链表。
  2. 根据 hd 头结点,生成 TreeBin 树结构,并把树结构的root节点写到 table 的 index 位置的内存中,具体实现如下:
TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        next = (TreeNode<K,V>)x.next;
        x.left = x.right = null;
        if (r == null) {
            x.parent = null;
            x.red = false;
            r = x;
        }
        else {
            K k = x.key;
            int h = x.hash;
            Class<?> kc = null;
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                    TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);
}
复制代码

主要根据 Node 节点的 hash 值大小构建二叉树。

3.2.4 get 操作

get操作和put操作相比,显得简单了许多。

public V get(Object key) {
    Node<K,V>[] tab; 
    Node<K,V> e, p;
    int n, eh; 
    K ek;
    //spread方法能确保返回结果是正数
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //如果头结点已经是要查找的key
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //hash为负数,表示该bin在扩容中或是treebin(红黑树),这时调用find方法来查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //正常遍历链表,用equals比较
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
  1. 判断table是否为空,如果为空,直接返回null。
  2. 计算key的hash值,并获取指定table中指定位置的Node节点,通过遍历链表或则树结构找到对应的节点,返回value值。

3.2.4 size 操作

size计算实际发生在put,remove改变集合元素的操作之中

  • 没有竞争发生时,向baseCount累加计数
  • 有竞争时,新建counterCells,向其中的一个cell累加计数

counterCells初始有两个cell

如果计数竞争比较激烈,会创建新的cell来累加计数

JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。我们直接看 size() 代码:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
           (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}

最大值是 Integer 类型的最大值,但是 Map 的 size 可能超过 MAX_VALUE, 所以还有一个方法 mappingCount(),JDK 的建议使用 mappingCount() 而不是size()mappingCount() 的代码如下:

public long mappingCount() {
    long n = sumCount();
    return (n < 0L) ? 0L : n; // ignore transient negative values
}

以上可以看出,无论是 size() 还是 mappingCount(), 计算大小的核心方法都是 sumCount()sumCount() 的代码如下:

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    //将baseCount计数和所有的cell计数累加
    long sum = baseCount;
    if (as != null) {
       for (int i = 0; i < as.length; ++i) {
           if ((a = as[i]) != null)
               sum += a.value;
           }
       }
    return sum;
}

分析一下 sumCount() 代码。ConcurrentHashMap 提供了 baseCount、counterCells 两个辅助变量和一个 CounterCell 辅助内部类。sumCount() 就是迭代 counterCells 来统计 sum 的过程。 put 操作时,肯定会影响 size(),在 put() 方法最后会调用 addCount() 方法。

总结

  • JDK1.7 和 JDK1.8 对 size 的计算是不一样的。 1.7 中是先不加锁计算三次,如果三次结果不一样在加锁。
  • JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。
  • JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。

4. JDK 1.8 中为什么要摒弃分段锁

很多人不明白为什么Doug Lea在JDK1.8为什么要做这么大变动,使用重级锁synchronized,性能反而更高,原因如下:

  • jdk1.8中锁的粒度更细了

jdk1.7中ConcurrentHashMap 中的并发度是和segment数组大小保持一致的,segment数组初始化后,不会进行扩容了,并发数是固定的。jdk1.8中的并发度是和node数组大小保持一致的,每次扩容,并发度会扩大一倍

  • 红黑树的引入

链表过长,转红黑树,这个优化使得 hash 冲突时的 put 和 get 效率更高

  • 获得JVM的支持

ReentrantLock 毕竟是 API 这个级别的,后续的性能优化空间很小。 synchronized 则是 JVM 直接支持的, JVM 能够在运行时作出相应的优化措施:锁粗化、锁消除、锁自旋等等。这就使得 synchronized 能够随着 JDK 版本的升级而不改动代码的前提下获得性能上的提升。

5. 小结&参考资料

小结

可以看出 JDK1.8 版本的 ConcurrentHashMap 的数据结构已经非常接近 HashMap,相对而言,ConcurrentHashMap 只是增加了同步的操作来控制并发,从 JDK1.7 版本的 ReentrantLock+Segment+HashEntry,到 JDK1.8 版本中synchronized+CAS+Node+红黑树,优化确实很大。

HashMap中的key和value都允许为null,而ConcurrentHashMap 中的key和value都不允许为null

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ConcurrentHashMap概述 的相关文章

  • Java模拟一个简单的双向链表

    Java模拟一个简单的双向链表 1 链表结构 Node实体类代码 public class Node public Object item 存放数据的地方 public Node next 指向下一个结点 public Node pre 指
  • arduino 土壤温湿度传感器_Arduino温度湿度传感器-Moisture Sensor土壤湿度传感器

    外观 概述 这是一个简易的水分传感器可用于检测土壤的水分 当土壤缺水时 传感器输出值将减小 反之将增大 使用这个传感器制作一款自动浇花装置 让您的花园里的植物不用人去管理 传感器表面做了镀金处理 可以延长它的使用寿命 将它插入土壤 然后使用
  • WPF软件导致Win10系统的平板电脑小键盘自动隐藏问题

    在Win10系统下 开发WPF软件的时候 点入TextBox框内的时候 出现小键盘自动隐藏问题 通过检查发现 附加属性里面 触发了以下代码 从而触发小键盘隐藏 InputMethod SetIsInputMethodEnabled Asso
  • 华为OD机试真题-自动曝光 【2023.Q1】

    题目内容 一个图像有n个像素点 存储在一个长度为n的数组img里 每个像素点的取值范围 0 255 的正整数 请你给图像每个像素点值加上一个整数k 可以是负数 得到新图newImg 使得新图newImg的所有像素平均值最接近中位值128 请
  • Onvif协议学习:8、设备校时

    Onvif协议学习 8 设备校时 文章目录 Onvif协议学习 8 设备校时 1 编码流程 2 注意事项 3 示例代码 原文链接 https blog csdn net benkaoya article details 72486511 1
  • 学习ShaderToy第一天: glsl语言内置函数gl_FragCoord

    运行环境 Android opengl es版本 2 0 3D引擎库 Rajawali3D ShaderToy上用的shader语言 为glsl 效果是用webgl跑的 而webgl封装了opengl es 所以ShaderToy上的例子同
  • Pillow库 三分钟带你了解最基础的使用

    努力是为了不平庸 学习的最大理由是想摆脱平庸 早一天就多一份人生的精彩 迟一天就多一天平庸的困扰 目录 一 Pillow库是什么 二 以下是 Pillow 的一些主要作用和使用方法的概述 三 学习使用 Pillow 一个强大的 Python
  • 拦截器反射机制/动态代理(拦截器。通过接口调实现类也是反射实现的?)和代理模式

    反射机制 1 获得Class的实例c 如Class forName 包路径 类名 2 创建对象 1 c newInstance 直接调用无参构造函数创建对象 已过时 2 先获取构造函数再创建对象 getDeclaredConstructor
  • PHP2详细解析,(响应码,url编码,御剑)

    标题1 打开之后各种手段都尝试 抓包 源码为协议等等 结果还是不出现 然后就想到用到最近刚 下载的工具御剑来扫一下目录 发现御剑依靠的是字典进行扫描 其实这个index phps也是自己添加的字典有限 所以还是学习用一下dirsearch把
  • sublime列编辑状态

    从notepad 转到sublime 感觉sublime的列编辑没有notepad 好用 一个是先入为主的问题 notepad 进行列编辑是alt 左键 notepad 进行列选择时 选择的行没有编辑也是可以选中的 sublime只能选中编
  • 量化交易策略干货收集

    量化交易策略 价值投资 成长股内在价值投资 http www joinquant com post 541 三一投资管理公司价值选股法 http www joinquant com post 556 低估价值选股策略 http www jo
  • 网易笔试编程题

    下厨房 题目描述 牛牛想尝试一些新的料理 每个料理需要一些不同的材料 问完成所有的料理需要准备多少种不同的材料 输入描述 每个输入包含 1 个测试用例 每个测试用例的第 i 行 表示完成第 i 件料理需要哪些材料 各个材料用空格隔开 输入只
  • jenkins -- send files or execute commands over ssh

    新建一个任务free 我之前已经建好 其余的保持默认 保存或应用 回到任务 开始构建 查看日志 传输成功 在服务器上 查看对应的路径 a js 确实已经传送成功
  • signature=8dfeb54e036883a518e97630c0013eed,unit3.lfm · BLumia/BLumiaTimidityShell - Gitee.com

    object Form3 TForm3 Left 357 Height 211 Top 100 Width 578 BorderStyle bsDialog Caption About ClientHeight 211 ClientWidt
  • RelativeLayout相对布局

    相对布局 RelativeLayout 允许子元素指定它们相对于其父元素或兄弟元素的位置 这是实际布局中最常用的布局方式之一 它灵活性大很多 当然属性也多 操RelativeLayout相对布局 相对布局 RelativeLayout 允许
  • 「软件测试」最全面试问题和回答,全文背熟不拿下offer算我输

    一般要应聘关于测试的工作 面试题会不会很难 下面小编整理了软件测试面试题及答案 欢迎参考 一 引言 1 1 文档目的 本次文档是为了收集在面试中遇到的一问题与常见的一些答案并不是唯一答案 二 职业规划 2 1 简单的自我介绍下 面试宫 您好
  • 对注意力机制(Attention)的一些理解附上Bi-LSTM和seq2seq加上注意力机制的代码

    注意力机制 简单来说就是让模型能够学会具体问题具体分析吧 比如问你是个水果大师 别人问你这个苹果怎么样 那总得盯着苹果端详吧 而不是去看那个西瓜 理解的应该差不太多吧 这个是从b站看的一个手推注意力机制的视频 照着画了一遍 感觉大概也是明白
  • Java可视化界面设计(登录界面设计)

    1 界面居中显示 frame setResizable false fame setLocationRelativeTo null frame setVisible true 2 全屏操作 Dimension screenSize Tool
  • 联想微型计算机C470拆装,联想C470一体机一键U盘重装系统教程图解

    联想C470一体机造型小巧 外观唯美时尚 易于摆放并能脱离冗杂线缆的束缚 该机是一款非常时尚的家用一体电脑 采用21 5英寸触控屏幕 全高清显示相当精细 无论是学习办公 还是家庭娱乐都能够满足用户的需求 下面给大家介绍联想C470一体机一键

随机推荐

  • 数的分解

    题目描述 本题为填空题 只需要算出结果后 在代码中使用输出语句将所填结果输出即可 把 20192019 分解成 33 个各不相同的正整数之和 并且要求每个正整数都不包含数字 22 和 44 一共有多少种不同的分解方法 注意交换 33 个整数
  • 七牛产品概览

    七牛云产品概览 服务对象 个人开发者 创业团队 企业用户 对象存储 Kodo 简介 七牛云提供的数据存储服务主要是针对静态资源文件 image js css 音频 视频 文档 PDF txt json xml yml apk 等等 提供存储
  • Java中string的null和“”对比

    Java中字符串的比对用string equals object 来做 但与空字符串比对的时候要注意 如果是 话 用string equals 如果是null的时候 string equals null 会报错 应该使用string nul
  • SYSAUX表空间清理之WRH$_ACTIVE_SESSION_HISTORY

    查看sysaux表空间使用率高 对于sysaux表空间之前有文章讨论过 本次直入正题 1 检查sysaux表空间占用空间较大的segments SQL gt select from select owner segment name seg
  • mysql while bug_案例分享:MySQL BUG处理

    近一个月处理历史数据问题时 居然连续遇到了2个MySQL BUG 分享给大家一下 也欢迎指正是否有问题 BUG1 数据库版本 MySQL5 7 25 28 操作系统 Centos 7 7 不重要 数据库架构 主 从 级联从 数据库参数 in
  • Java 冒泡排序示例

    以下是 Java 语言实现冒泡排序的示例代码 public class BubbleSort public static void main String args int arr 5 2 8 3 9 1 bubbleSort arr Sy
  • 64、3D Neural Scene Representations for Visuomotor Control

    简介 主页 https 3d representation learning github io nerf dy 机器人操作模型学习的核心问题之一是如何确定 dynamics model 的状态表示 理想的表示应该易于捕捉环境动态 展示对场
  • CDN是做什么用的,怎么一直有人在推荐使用?

    CDN 内容分发网络 的作用与不断的推荐使用背后有着深刻的原因 这是因为CDN在互联网领域发挥着重要且多方面的作用 为许多网站和在线业务提供了显著的优势 首先 让我们来了解CDN的作用是什么 CDN是一种网络架构 旨在将网站的静态资源 如图
  • 100级小号搬砖地图_DNF100版本搬砖地图最高收益攻略(利润化透明)

    DNF搬砖那个地图收益最高 100版本搬砖攻略 大家都知道DNF是一款经典的手游 到现在已经运营十年了 同时也是一款氪金的 当然对于神豪来说都是小问题 但是对于一般的玩家 想玩DNF建议还是先去医院检查一下肝 肝不好建议别入坑 今天小编就给
  • 数学的科普文

    20210105 这次又是整理自己的书签 然后发现了这个文章 这个文章应该是很久之前的时候我看到的 觉得很有趣 应该是当时学习最小二乘法的时候看到的 所以这篇文章就来记录一些平时看到的不错的科普文 以前很多文章都错过了 挺可惜的 正态分布的
  • React Hook的useCallback,memo,usememo的使用

    1 useCallback 每当组件重新渲染的时候 我们之前定义的函数就会被重新声明一次 即使这个函数不需要做出改变 这时可以使用useCallback useCallback主要用于缓存一个函数 useCallback接收两个参数 第一个
  • Apriori算法完整代码

    文章目录 apriori py apriori py usr bin env python coding utf 8 from numpy import 加载数据集 def loadDataSet return 1 3 4 2 3 5 1
  • 深度学习与计算机视觉系列(5)_反向传播与它的直观理解

    作者 寒小阳 龙心尘 时间 2015年12月 出处 http blog csdn net han xiaoyang article details 50321873 声明 版权所有 转载请联系作者并注明出处 1 引言 其实一开始要讲这部分内
  • javaScript:宏任务与微任务的运行顺序

    在写代码的时候 我使用Element组件中的表单重置方法 和vue中子传父方法 这里我想要通过 async 和 await 的特点 await 下一行代码作为微任务执行 来规定代码的执行顺序 让重置方法 滞后 于数据传递 执行 确定按钮 子
  • Java底层原理——HashMap面试问题

    什么时候会用到HashMap 他有什么特点 是基于Map接口实现的 存储键值对时 可以接收null的键值 是非同步的 HashMap存储着Entry hash key value next 对象 你知道HashMap的工作原理吗 通过has
  • Linux驱动

    一 前言 设备树是每一个Linux驱动工程师都必须掌握的一个知识点 有很多之前做单片机的朋友刚接触Linux驱动时 会一脸懵 其实设备树的使用并没有大家想像的那么复杂 对于大部分工程师来说 只要会修改即可 很多粉丝留言说 希望彭老师提供一个
  • 记录CTF命令执行练习中遇到的几道题(一些PHP命令过滤的绕过方法)

    题目1 if isset GET Command GET Command command GET Command if preg match f a g flag cat tac more ls system exec popen pass
  • 在SQL中寻找唯一记录的3种终极方法

    在SQL中寻找唯一记录的3种终极方法 停止使用DISTINCT 开始使用这些快速替代方法 以避免混淆 照片 Luis Cortes on Unsplash 不使用DISTINCT关键字就能获得唯一记录 在你的数据分析项目中 只要你需要从数据
  • Windows server 2008 R2远程桌面3389端口号修改及远程桌面窗口大小调整

    转自 https help aliyun com document detail 51644 html spm 5176 doc51644 6 784 4iAHWH 修改 Windows 服务器默认远程端口 操作步骤 远程连接并登录到 Wi
  • ConcurrentHashMap概述

    1 前言 为什么要使用 ConcurrentHashMap 主要基于两个原因 在并发编程中 jdk1 7的情况下使用 HashMap 可能造成死循环 而jdk1 8 中有可能会造成数据丢失 HashTable 效率非常低下 2 Concur