【中】ConcurrentHashMap源码阅读
https://www.bilibili.com/video/BV163YyzbE9g
一、添加元素
ConcurrentHashMap
之所以是安全的map
就是因为它在put
的时候进行了锁处理,下面是整个put的过程,基本上都写了注释,看完之后可以帮助你更好的理解它的原理。
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许null值和nullkey
if (key == null || value == null) throw new NullPointerException();
// 使用key计算出hash值
// (h ^ (h >>> 16)) & HASH_BITS (HASH_BITS = 0x7fffffff)
int hash = spread(key.hashCode());
int binCount = 0;
// 死循环,直到 break 跳出
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组为空,就初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 通过计算((n - 1) & hash)去判断当位置是否已经有值了
// tabAt方法:安全地读取数组中指定位置的 Node 节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
/*
* casTabAt: 原子性地更新数组中指定位置的 Node 节点
*
* 底层是调用了native的方法:compareAndSwapObject(obj,i,expect,update)
* 如果 obj的 i位置的值等于 expect,那么就更新为 update
*
* 因为这个地方是把数据添加到 null 位置,所以不需要加锁(可以看到 expect为null)
*/
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // no lock when adding to empty bin
break;
}
// MOVED = -1,表示当前节点已经被迁移了,helpTransfer判断当前线程是否需要协助扩容,下面讲什么是协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 如果添加的key已经存在,就锁住进行修改
synchronized (f) {
// 双重检查: 再次判断下当前的值是否被改动过
if (tabAt(tab, i) == f) {
// 当数据量很大的时候可能得出的hashcode为负数
if (fh >= 0) {
binCount = 1;
// 循环当前链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果当前节点key的hashcode和新增key的相同,并且key值相同,则覆盖
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果已经循环到最后一个节点还是没有找到相同的key,就添加到链表尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 判断是否是树结构(红黑树)
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
/*
* putTreeVal: 添加或者找到该节点,如果是添加就返回 null
*
* 这个方法的意思是如果找到了该节点,就替换它的 value
*/
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 判断达到了最大长度,是就转化成树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 记录数量并检查是否需要扩容
addCount(1L, binCount);
return null;
}
总结:ConcurrentHashMap 添加元素的过程
- 先把当前key的hash进行扰乱,再用
(n - 1) & hash
计算当前应该添加到哪个位置。使用安全的方式获取当前位置的节点 (tabAt
,底层调用 Unsafe),如果当前节点是 NULL,就用乐观锁设置当前数据(casTabAt
) - 如果获取到当前位置的节点
hash==-1
,就说明正在进行扩容,当前线程也会去借助进行扩容,并完成元素的添加 - 如果是常规的添加元素,先会用当前位置的头节点进行加锁,再使用 tabAt方法双重检查元素。循环当前节点的链表把元素插入到对应的位置(当前链表可能是普通链表也可能是红黑树)
- 元素插入完成之后,会调用addCount方法,记录数量并检查是否需要扩容
二、扩容
在开始之前需要了解一个重要的变量
/*
* 当为负数时:表示正在初始化或扩容
* -1: 正在初始化
* 其他负值: -(1 + 正在扩容的线程数)
* 当为0时:使用默认容量
* 当为正数时:下次扩容的阈值
*/
private transient volatile int sizeCtl;
2-1、首次扩容
从上面可知,元素添加完之后,就会调用这个 addCount
方法
private final void addCount(long x, int check) {
// ... 省略其它
// 统计Map中的元素
s = sumCount();
// check 表示当前插入元素的节点位置,肯定大于 0
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// s > sizeCtl 表示需要进行扩容了
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
// 为当前容量n生成唯一的扩容标识戳
int rs = resizeStamp(n);
// 看上面 sizeCtl 为负数(不为-1)的时候,表示正在扩容
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 记录当前的扩容线程 +1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// sc 大于0,表示现在是首次进行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
transfer 方法就是扩容的方法
2-2、协助扩容
在以往的认知里,扩容就是一下子的事。但 ConcurrentHashMap 是多线程的,扩容的过程中还会有其它线程来操作这个Map,为了更好的并发性,扩容的时候并会直接对整个Map进行加锁
ConcurrentHashMap 扩容是把数组中的每一个链表,一个个进行复制到新的链表里面去,每完成一个链表的复制就把原本的链表头设置为 ForwardingNode
(它的hash写死了为 -1)
上面在添加元素的过程中得知,每次加锁都是锁某一个节点。假设A线程正在扩容,B线程要操作的节点是 「节点B」
- 「节点B」= 普通节点,且没有当前节点没有被锁住, 可以继续完成插入
- 「节点B」= ForwardingNode,线程B 加入扩容
- 「节点B」= 普通节点,A线程正在对这个节点进行复制,线程A会锁住这个节点。 B线程等待,等A线程释放锁,B得到锁之后,双重检查的发现已经不是之前的节点了,再次循环,可能出现如下情况
- 扩容已经完成了,B线程正常对新的节点加锁,添加数据
- 扩容还在进行,节点已经是 ForwardingNode,线程B 加入扩容,扩容完之后再添加元素到新的链表
下面对三种情况进行代码说明
情况1:「节点B」= 普通节点,可以继续完成插入
// Thread-B访问slot[i],发现是普通Node
Node<K,V> f = tabAt(tab, i);
if (f != null && f.hash >= 0) { // 普通节点
synchronized (f) {
if (tabAt(tab, i) == f) { // 双重检查通过
// 正常插入到链表或红黑树
// Thread-A的扩容会跳过这个slot,等Thread-B完成
}
}
}
情况2:「节点B」= ForwardingNode,线程B加入扩容
// Thread-B访问slot[i],发现是ForwardingNode
else if ((fh = f.hash) == MOVED) {
// 该节点已被迁移,协助扩容
tab = helpTransfer(tab, f);
// 扩容完成后,在新数组上重新尝试插入
}
情况3:竞争同一个节点的详细过程
// 时刻T1:Thread-A扩容到slot[i]
Thread-A: f = tabAt(tab, i); // f = 普通Node
Thread-A: synchronized(f) { // 获得锁,开始迁移
// 时刻T2:Thread-B要插入到同一个slot[i]
Thread-B: f = tabAt(tab, i); // f = 同一个普通Node
Thread-B: synchronized(f) { // 尝试获得锁,被阻塞 ⭐
// 时刻T3:Thread-A完成迁移
Thread-A: // 数据迁移到新数组...
Thread-A: setTabAt(tab, i, forwardingNode); // 设置ForwardingNode
Thread-A: } // 释放锁
// 时刻T4:Thread-B获得锁
Thread-B: synchronized(f) {
Thread-B: if (tabAt(tab, i) == f) { // ⭐ 双重检查失败!
Thread-B: // tabAt(tab, i) 现在是ForwardingNode
Thread-B: // 但f还是原来的普通Node
Thread-B: // 条件为false,不执行插入
Thread-B: }
Thread-B: } // 释放锁,重新循环
情况3-1:扩容已完成,正常插入新数组
// Thread-B重新循环,table已经指向新数组了
Thread-B: for (Node<K,V>[] tab = table;;) { // 重新开始
Thread-B: // table现在指向新数组
Thread-B: f = tabAt(tab, newIndex); // 在新数组中计算位置
Thread-B: // 正常插入流程...
情况3-2:扩容仍在进行,协助扩容。扩容完之后再添加元素到新的链表
// Thread-B重新循环
Thread-B: f = tabAt(tab, i); // 现在是ForwardingNode
Thread-B: if (f.hash == MOVED) {
Thread-B: tab = helpTransfer(tab, f); // 协助扩容
Thread-B: // 扩容完成后继续在新数组操作
Thread-B: }
双重检查的重要性
synchronized (f) {
if (tabAt(tab, i) == f) { // ⭐ 这个检查至关重要
// 只有节点没有被替换时才执行操作
}
}
扩容的简洁版
// 时间点1:线程A插入第13个元素,触发扩容
Thread-A: addCount() -> sizeCtl从12变为负数 -> transfer()开始
// 时间点2:线程B要插入元素到slot[10](未迁移)
Thread-B: putVal() -> slot[10]是普通节点 -> synchronized(f) -> 正常插入
// 时间点3:线程C要插入元素到slot[15](已迁移)
Thread-C: putVal() -> slot[15]是ForwardingNode -> helpTransfer() -> 协助扩容
// 时间点4:线程D要插入元素到slot[3](正在迁移)
Thread-D: putVal() -> slot[3]可能是Node或ForwardingNode,根据情况处理
// 时间点5:扩容完成
All-Threads: table指向新数组,sizeCtl恢复为正数(24),后续操作在新数组上进行
2-3、扩容流程
transfer 方法很长,这里只给出一些关键点
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length;
// 【关键节点1】计算每个线程的工作量(步长)
int stride = (NCPU > 1) ? (n >>> 3) / NCPU : n;
if (stride < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // 最小16
// 【关键节点2】第一个线程创建新数组(容量翻倍)
if (nextTab == null) { // 只有第一个线程会进入
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 2倍容量
nextTab = nt;
nextTable = nextTab; // 设置全局引用
transferIndex = n; // 从数组末尾开始分配任务
}
// 【关键节点3】创建ForwardingNode占位符
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 【关键节点4】CAS任务分配机制
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) {
advance = false; // 当前线程有任务或收尾中
}
else if ((nextIndex = transferIndex) <= 0) {
i = -1; advance = false; // 所有任务已分配完
}
// CAS获取任务:[nextBound, nextIndex)范围
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 【关键节点5】线程完成检查和退出机制
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 扩容完全结束
nextTable = null;
table = nextTab; // 切换数组
sizeCtl = (n << 1) - (n >>> 1); // 新阈值
return;
}
// 当前线程完成,CAS减少线程计数
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断是否为最后一个线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true; // 最后一个线程做最终检查
i = n;
}
}
// 【关键节点6】空节点处理
else if ((f = tabAt(tab, i)) == null) {
advance = casTabAt(tab, i, null, fwd); // 直接放ForwardingNode
}
// 【关键节点7】已迁移节点跳过
else if ((fh = f.hash) == MOVED) {
advance = true;
}
// 【关键节点8】实际数据迁移(最核心)
else {
synchronized (f) { // 锁定头节点
if (tabAt(tab, i) == f) { // 双重检查
Node<K,V> ln, hn; // 低位和高位链表
if (fh >= 0) { // 普通链表
// 数据迁移....
// 设置ForwardingNode
setTabAt(tab, i, fwd);
}
else if (f instanceof TreeBin) {
// 【红黑树迁移】类似链表,但处理TreeNode...
// 设置ForwardingNode
setTabAt(tab, i, fwd);
}
}
}
}
}
}
三、一些问题
什么是红黑树(红黑树的特点)
- 每个节点要么是红色,要么是黑色
- 根节点是黑色的
- 每个叶节点(空节点)是黑色的
- 红色节点的子节点必须是黑色的(即不能有两个连续的红色节点)
- 从任意节点到其所有叶节点的路径上,必须包含相同数量的黑色节点
ConcurrentHashMap默认支持16个并发,怎么支持多点的并发呢?
上面得知,每次加锁是对个节点槽加锁,可以把容量扩大,来支持多并发
ConcurrentHashMap 设置一个值要Hash几次
2次,第一次调用 spread方法扰乱,第二次 (n-1) & hash
计算节点的位置
ConcurrentHashMap 怎么实现线程安全
如上