concurrentHashMap解析这篇文章就够了

2023-11-09

实现原理

ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:

从图中可以看到,ConcurrentHashMap内部分为很多个Segment,每一个Segment拥有一把锁,然后每个Segment(继承ReentrantLock)下面包含很多个HashEntry列表数组。对于一个key,需要经过三次(为什么要hash三次下文会详细讲解)hash操作,才能最终定位这个元素的位置,这三次hash分别为:

  • 对于一个key,先进行一次hash操作,得到hash值h1,也即h1 = hash1(key);
  • 将得到的h1的高几位进行第二次hash,得到hash值h2,也即h2 = hash2(h1高几位),通过h2能够确定该元素的放在哪个Segment;
  • 将得到的h1进行第三次hash,得到hash值h3,也即h3 = hash3(h1),通过h3能够确定该元素放置在哪个HashEntry。

初始化

构造函数的源码如下:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

 

public ConcurrentHashMap(int initialCapacity,

float loadFactor, int concurrencyLevel) {

if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)

throw new IllegalArgumentException();

if (concurrencyLevel > MAX_SEGMENTS)

concurrencyLevel = MAX_SEGMENTS;

// Find power-of-two sizes best matching arguments

int sshift = 0;

int ssize = 1;

while (ssize < concurrencyLevel) {

++sshift;

ssize <<= 1;

}

this.segmentShift = 32 - sshift;

this.segmentMask = ssize - 1;

if (initialCapacity > MAXIMUM_CAPACITY)

initialCapacity = MAXIMUM_CAPACITY;

int c = initialCapacity / ssize;

if (c * ssize < initialCapacity)

++c;

int cap = MIN_SEGMENT_TABLE_CAPACITY;

while (cap < c)

cap <<= 1;

// create segments and segments[0]

Segment<K,V> s0 =

new Segment<K,V>(loadFactor, (int)(cap * loadFactor),

(HashEntry<K,V>[])new HashEntry[cap]);

Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]

this.segments = ss;

}

传入的参数有initialCapacity,loadFactor,concurrencyLevel这三个。

  • initialCapacity表示新创建的这个ConcurrentHashMap的初始容量,也就是上面的结构图中的Entry数量。默认值为static final int DEFAULT_INITIAL_CAPACITY = 16;
  • loadFactor表示负载因子,就是当ConcurrentHashMap中的元素个数大于loadFactor * 最大容量时就需要rehash,扩容。默认值为static final float DEFAULT_LOAD_FACTOR = 0.75f;
  • concurrencyLevel表示并发级别,这个值用来确定Segment的个数,Segment的个数是大于等于concurrencyLevel的第一个2的n次方的数。比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想情况下ConcurrentHashMap的真正的并发访问量能够达到concurrencyLevel,因为有concurrencyLevel个Segment,假如有concurrencyLevel个线程需要访问Map,并且需要访问的数据都恰好分别落在不同的Segment中,则这些线程能够无竞争地自由访问(因为他们不需要竞争同一把锁),达到同时访问的效果。这也是为什么这个参数起名为“并发级别”的原因。

初始化的一些动作:

  • 验证参数的合法性,如果不合法,直接抛出异常。
  • concurrencyLevel也就是Segment的个数不能超过规定的最大Segment的个数,默认值为static final int MAX_SEGMENTS = 1 << 16;,如果超过这个值,设置为这个值。
  • 然后使用循环找到大于等于concurrencyLevel的第一个2的n次方的数ssize,这个数就是Segment数组的大小,并记录一共向左按位移动的次数sshift,并令segmentShift = 32 - sshift,并且segmentMask的值等于ssize - 1,segmentMask的各个二进制位都为1,目的是之后可以通过key的hash值与这个值做&运算确定Segment的索引。
  • 检查给的容量值是否大于允许的最大容量值,如果大于该值,设置为该值。最大容量值为static final int MAXIMUM_CAPACITY = 1 << 30;。
  • 然后计算每个Segment平均应该放置多少个元素,这个值c是向上取整的值。比如初始容量为15,Segment个数为4,则每个Segment平均需要放置4个元素。
  • 最后创建一个Segment实例,将其当做Segment数组的第一个元素。

put操作

put操作的源码如下:

 

1

2

3

4

5

6

7

8

9

10

11

 

public V put(K key, V value) {

Segment<K,V> s;

if (value == null)

throw new NullPointerException();

int hash = hash(key);

int j = (hash >>> segmentShift) & segmentMask;

if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck

(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment

s = ensureSegment(j);

return s.put(key, hash, value, false);

}

操作步骤如下:

  • 判断value是否为null,如果为null,直接抛出异常。
  • key通过一次hash运算得到一个hash值。(这个hash运算下文详说)
  • 将得到hash值向右按位移动segmentShift位,然后再与segmentMask做&运算得到segment的索引j。
    在初始化的时候我们说过segmentShift的值等于32-sshift,例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子:
    0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做&运算,也就是取最后四位的值。这个值确定Segment的索引。
  • 使用Unsafe的方式从Segment数组中获取该索引对应的Segment对象。
  • 向这个Segment对象中put值,这个put操作也基本是一样的步骤(通过&运算获取HashEntry的索引,然后set)。

get操作

get操作的源码如下:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

 

public V get(Object key) {

Segment<K,V> s; // manually integrate access methods to reduce overhead

HashEntry<K,V>[] tab;

int h = hash(key);

long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&

(tab = s.table) != null) {

for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile

(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);

e != null; e = e.next) {

K k;

if ((k = e.key) == key || (e.hash == h && key.equals(k)))

return e.value;

}

}

return null;

}

操作步骤为:

  • 和put操作一样,先通过key进行两次hash确定应该去哪个Segment中取数据。
  • 使用Unsafe获取对应的Segment,然后再进行一次&运算得到HashEntry链表的位置,然后从链表头开始遍历整个链表(因为Hash可能会有碰撞,所以用一个链表保存),如果找到对应的key,则返回对应的value值,如果链表遍历完都没有找到对应的key,则说明Map中不包含该key,返回null。

size操作

size操作与put和get操作最大的区别在于,size操作需要遍历所有的Segment才能算出整个Map的大小,而put和get都只关心一个Segment。假设我们当前遍历的Segment为SA,那么在遍历SA过程中其他的Segment比如SB可能会被修改,于是这一次运算出来的size值可能并不是Map当前的真正大小。所以一个比较简单的办法就是计算Map大小的时候所有的Segment都Lock住,不能更新(包含put,remove等等)数据,计算完之后再Unlock。这是普通人能够想到的方案,但是牛逼的作者还有一个更好的Idea:先给3次机会,不lock所有的Segment,遍历所有Segment,累加各个Segment的大小得到整个Map的大小,如果某相邻的两次计算获取的所有Segment的更新的次数(每个Segment都有一个modCount变量,这个变量在Segment中的Entry被修改时会加一,通过这个值可以得到每个Segment的更新操作的次数)是一样的,说明计算过程中没有更新操作,则直接返回这个值。如果这三次不加锁的计算过程中Map的更新次数有变化,则之后的计算先对所有的Segment加锁,再遍历所有Segment计算Map大小,最后再解锁所有Segment。源代码如下:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

 

public int size() {

// Try a few times to get accurate count. On failure due to

// continuous async changes in table, resort to locking.

final Segment<K,V>[] segments = this.segments;

int size;

boolean overflow; // true if size overflows 32 bits

long sum; // sum of modCounts

long last = 0L; // previous sum

int retries = -1; // first iteration isn't retry

try {

for (;;) {

if (retries++ == RETRIES_BEFORE_LOCK) {

for (int j = 0; j < segments.length; ++j)

ensureSegment(j).lock(); // force creation

}

sum = 0L;

size = 0;

overflow = false;

for (int j = 0; j < segments.length; ++j) {

Segment<K,V> seg = segmentAt(segments, j);

if (seg != null) {

sum += seg.modCount;

int c = seg.count;

if (c < 0 || (size += c) < 0)

overflow = true;

}

}

if (sum == last)

break;

last = sum;

}

} finally {

if (retries > RETRIES_BEFORE_LOCK) {

for (int j = 0; j < segments.length; ++j)

segmentAt(segments, j).unlock();

}

}

return overflow ? Integer.MAX_VALUE : size;

}

举个例子:

一个Map有4个Segment,标记为S1,S2,S3,S4,现在我们要获取Map的size。计算过程是这样的:第一次计算,不对S1,S2,S3,S4加锁,遍历所有的Segment,假设每个Segment的大小分别为1,2,3,4,更新操作次数分别为:2,2,3,1,则这次计算可以得到Map的总大小为1+2+3+4=10,总共更新操作次数为2+2+3+1=8;第二次计算,不对S1,S2,S3,S4加锁,遍历所有Segment,假设这次每个Segment的大小变成了2,2,3,4,更新次数分别为3,2,3,1,因为两次计算得到的Map更新次数不一致(第一次是8,第二次是9)则可以断定这段时间Map数据被更新,则此时应该再试一次;第三次计算,不对S1,S2,S3,S4加锁,遍历所有Segment,假设每个Segment的更新操作次数还是为3,2,3,1,则因为第二次计算和第三次计算得到的Map的更新操作的次数是一致的,就能说明第二次计算和第三次计算这段时间内Map数据没有被更新,此时可以直接返回第三次计算得到的Map的大小。最坏的情况:第三次计算得到的数据更新次数和第二次也不一样,则只能先对所有Segment加锁再计算最后解锁。

关于hash

术语 英文 解释
哈希算法 hash algorithm 是一种将任意内容的输入转换成相同长度输出的加密方式,其输出被称为哈希值。
哈希表 hash table 根据设定的哈希函数H(key)和处理冲突方法将一组关键字映象到一个有限的地址区间上,并以关键字在地址区间中的象作为记录在表中的存储位置,这种表称为哈希表或散列,所得存储位置称为哈希地址或散列地址。

hash的源代码:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

 

private int hash(Object k) {

int h = hashSeed;

if ((0 != h) && (k instanceof String)) {

return sun.misc.Hashing.stringHash32((String) k);

}

h ^= k.hashCode();

// Spread bits to regularize both segment and index locations,

// using variant of single-word Wang/Jenkins hash.

h += (h << 15) ^ 0xffffcd7d;

h ^= (h >>> 10);

h += (h << 3);

h ^= (h >>> 6);

h += (h << 2) + (h << 14);

return h ^ (h >>> 16);

}

这里用到了Wang/Jenkins hash算法的变种,主要的目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。

举个简单的例子:

 

1

2

3

4

 

System.out.println(Integer.parseInt("0001111", 2) & 15);

System.out.println(Integer.parseInt("0011111", 2) & 15);

System.out.println(Integer.parseInt("0111111", 2) & 15);

System.out.println(Integer.parseInt("1111111", 2) & 15);

这些数字得到的hash值都是一样的,全是15,所以如果不进行第一次预hash,发生冲突的几率还是很大的,但是如果我们先把上例中的二进制数字使用hash()函数先进行一次预hash,得到的结果是这样的:

 

1

2

3

4

 

0100|0111|0110|0111|1101|1010|0100|1110

1111|0111|0100|0011|0000|0001|1011|1000

0111|0111|0110|1001|0100|0110|0011|1110

1000|0011|0000|0000|1100|1000|0001|1010

可以看到每一位的数据都散开了,并且ConcurrentHashMap中是使用预hash值的高位参与运算的。比如之前说的先将hash值向右按位移动28位,再与15做&运算,得到的结果都别为:4,15,7,8,没有冲突!

注意事项

ConcurrentHashMap中的key和value值都不能为null。
ConcurrentHashMap的整个操作过程中大量使用了Unsafe类来获取Segment/HashEntry,这里Unsafe的主要作用是提供原子操作。Unsafe这个类比较恐怖,破坏力极强,一般场景不建议使用,如果有兴趣可以到这里做详细的了解Java中鲜为人知的特性
ConcurrentHashMap是线程安全的类并不能保证使用了ConcurrentHashMap的操作都是线程安全的!

参考

深入分析ConcurrentHashMap

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

concurrentHashMap解析这篇文章就够了 的相关文章

随机推荐

  • vscode使用ssh远程连接服务器时出现timeout情况的解决方法汇总

    背景 mobaxterm通过ssh能正常连接服务器 而在vscode里远程连接服务器时则提示连接超时 解决方法一 win R打开运行命令框 输入cmd打开命令提示符 在下图红圈位置右键然后选择属性 选择取消 使用旧控制台 U 解决方法二 右
  • Visual Studio 2022 include和lib路径问题

    最近安装了Visual Studio 2022 想试下opengl 首先是用cmake尝试编译 结果编译不过 一直报错 LINK fatal error LNK1104 无法打开文件 ucrtd lib 然后我新建了一个工程 导入了glfw
  • Xshell 6安装和使用教程

    作者 翁松秀 Xshell 6安装和使用教程 工欲善其事必先利其器 Xshell 6安装和使用教程 一 XShell 6的安装 二 XShell 6的使用 创建链接 快捷键 主题 三 XFTP的下载和安装 一 XShell 6的安装 到官网
  • 数据挖掘---分类算法之支持向量机SVM

    上篇已经简单的说了下支持向量机的理论 里面有不少公式 需要肯学习的你一步步的推导试一试 说实在的还是挺能考验数学能力的 当年俺老孙就是一步步的推导过 只有这样你才能对这个过程有清晰的认识 才能对这个算法有所体会 下面来举个例子 说说用支持向
  • call、apply和bind的区别

    首先说说call apply和bind的作用吧 它们的作用都是相同的 都是动态的修改当前函数内部环境对象this的指向 call apply和bind区别 相同点 作用相同 都是动态修改this指向 都不会修改原先函数的this指向 异同点
  • 使用学生邮箱注册jetbrains

    1 到 https www jetbrains com zh student 点击申请按钮开始申请 2 在表单上方选择申请方式填写相关信息后提交申请 使用校园邮箱 就是广外邮箱 格式是学号加 gdufs edu cn 例如201810023
  • 二分查找 递归

    碎碎念念 假设我们要在一个升序排序的整型数组中查找某个特定的整数 如果找到了 返回该整数在数组中的索引号 如果没有找到 则返回 1 我们首先看要找的数和数组中间的数的大小关系 如果相等 那么说明找到了 如果要找的数小于数组中间的数 那么我们
  • 由集成运放构成的基本运算电路

    一 比例运算放大器 1 同相比例运算放大器 2 反相比例运算放大器 二 相加器 1 反向相加器 2 同向相加器 3 相减器 根据叠加原理 如上相减器可拆分为如下两个相加器 第一图 第二图 三 积分器 1 简单积分器 2 差动积分器 四 微分
  • 模拟上传文件至服务器(解决socket中read阻塞问题)

    客户端 package cn dali4 code03ex import java io FileInputStream import java io IOException import java io InputStream impor
  • python一行写不下,变多行

    python里一行写不下 拆成多行 和 两种方法 在一行末尾 加上 也就是空格加上 a sdfaf test 注意两个对象都要独立 字符串必须都用双引号引起 如果是if and 后加 其实用括号也可以 比如 a sdfaf test 或者
  • c语言实现学生管理系统,C语言学生管理系统源代码

    C语言学生管理系统源代码 由会员分享 可在线阅读 更多相关 C语言学生管理系统源代码 12页珍藏版 请在人人文库网上搜索 1 C语言学生成绩管理系统源代码 保证能用 include malloc h include stdio h incl
  • 【LVGL 学习】LVGL 在 arduino 环境的安装

    1 前提条件 使用 arduino IDE开发 使用 ESP32 作为主控 屏幕使用 ST7789 驱动 240 240像素TFT屏幕 注意 屏幕驱动部分不再这个赘述 以后开贴另行发布 2 安装 LVGL 库 打开 arduino 菜单栏中
  • 【Python】熵值法计算权重

    Python 熵值法计算权重 将分步骤基于python实现熵值法计算权重 代码在pycharm中执行 文章目录 Python 熵值法计算权重 1 引入库 2 读取数据 3 熵值法主体 4 打印出各指标的权重 5 将结果存储至csv文件中 总
  • 数据库 SQL 遍历父子关系表(二叉树)获得所有子节点 所有父节点

    数据库 SQL 遍历父子关系表 二叉树 获得所有子节点 所有父节点 创建表 Create Table A IDInt fatherIDInt NameVarchar 10 Insert A Select 1 NULL tt Union Al
  • 使用Vector模拟实现STL中的stack

    stack 介绍 栈是一种容器适配器 特别为后入先出而设计的一种 LIFO 那种数据被插入 然后再容器末端取出 栈实现了容器适配器 这是用了一个封装了的类作为他的特定容器 提供了一组成员函数去访问他的元素 元素从特定的容器 也就是堆栈的头取
  • python之requests模块详解

    目录 requests使用 requests请求方法 requests响应对象属性 Requests模块是一个用于网络请求的模块 主要用来模拟浏览器发请求 其实类似的模块有很多 比如urllib urllib2 httplib httpli
  • 微信公众号【OpenID详解】

    只知道 openID 是微信号加密后得到的 不同的公众号获取得微信号openID不同 但 UnionID 是一样的 微信openid由用户id和公众号id加密而来 同一用户相对同一公众账号的openid是不变的 对于不同公众号 同一用户的o
  • Learning OpenStack Keystone

    Author 海峰 http weibo com 344736086 http yanheven github io http blog csdn net yanheven1 这周重新学习整理了OpenStack Keystone里面的知识
  • CentOS没有了用什么?Rocky Linux 8.6安装体验

    2020 年 12 月 8 日 CentOS 项目宣布 CentOS 8 将于 2021 年底结束 而 CentOS 7 将在其生命周期结束后停止维护 CentOS 7 9 和 CentOS 8 5 将是最后的2个CentOS 版本 官方解
  • concurrentHashMap解析这篇文章就够了

    实现原理 ConcurrentHashMap使用分段锁技术 将数据分成一段一段的存储 然后给每一段数据配一把锁 当一个线程占用锁访问其中一个段数据的时候 其他段的数据也能被其他线程访问 能够实现真正的并发访问 如下图是ConcurrentH