killJava系列 -- ConcurrentHashmap源码分析

前言

私信请在SegmentFault
转发请注明出处

改进一

取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

改进二

将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

1. 内部参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//初始容积为 16 
private static final int DEFAULT_CAPACITY = 16;
//加载因子 0.75
private static final float LOAD_FACTOR = 0.75f;

/**
* 盛装Node元素的数组 它的大小是2的整数次幂
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
/*
* hash表初始化或扩容时的一个控制位标识量。
* 负数代表正在进行初始化或扩容操作
* -1代表正在初始化
* -N 表示有N-1个线程正在进行扩容操作
* 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
*
* **既代表 HashMap 的 threshold**
* 又代表 **进行扩容时的进程数**
*/
private transient volatile int sizeCtl;

// 以下两个是用来控制扩容的时候 单线程进入的变量
// resize校验码
private static int RESIZE_STAMP_BITS = 16;
// resize校验码的位移量。
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;


/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash值是-1,表示这是一个forwardNode节点
static final int TREEBIN = -2; // hash值是-2 表示这时一个TreeBin节点
static final int RESERVED = -3; // hash for transient reservations
//在 spread() 方法中 用来对 hashcode 进行 高位hash 减少可能发生的碰撞。
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

上面的 sizectl 很重要。是解决 concurrenthashmap 扩容的基础

2. 内部类

2.1. Node

HashMap 最大的区别是 加入了对val 与 next 用了volatile关键字修饰
并且 setValue() 方法 直接抛出异常,可以看出,val 是不能直接改变的。
是通过 Unsafe 类的 方法进行全部替换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
//相比于 HashMap ,加入了 volatile 关键字
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

2.2 TreeNode

HashMap 不同的是

  1. 这次 TreeNode 不再是继承自 LinkedHashMap.Entry 而是继承自本类中的 Node.
  2. 并不直接用于红黑树的结点,而是将 结点包装成 TreeNode 后,用下面的 TreeBin 进行二次包装。
  3. 优点是可以使用 Node 类的 next 指针,方便TreeBin 后续 从 链表红黑树 的转换。
    构造函数可以看出,原先对TreeNode 的初始化只是设置了其的后续结点。组成了链表。
1
2
3
4
5
6
7
8
9
10
11
12
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

2.3. TreeBin

特点: 1. 不持有key与val ,指向TreeNode 的 root 与 list。

2. 加入读写锁。方便并发的访问。
1
2
3
4
5
6
7
8
9
10
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
//通过锁的状态 , 判断锁的类型。
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock

构造方法如下
root 代表 TreeNode 的根结点
使用first ,是用于第一次初始化时,因为root的特殊性,所以不便于 this.root = b 因此通过 first代替第一次的初始化过程。
然后在 过程中 用r 代表root ,直到结束 红黑树的初始化后,再 root =r保证root的安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
TreeBin(TreeNode<K,V> b) {  
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}

2.4. ForwardingNode

作用是在 transfer() 过程中,插入到 TreeBin 之间,用作链接作用。

1
2
3
4
5
6
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

3. Unsafe 类 与 常用的操作

3.1. Unsafe 与 静态代码块

Unsafe提供了硬件级别的原子操作。内部的方法均为 native方法 ,可以访问系统底层。
这里用了 CAS 算法(compare and swap) 大大的避免了使用时对性能的消耗,以及保证了使用时的安全性。
注: CAS 算法的核心是 将需要改变的参数,与内存中已经存在的变量的值进行对比,一致就改变,不一致就放弃这次操作。与之相类似的优化操作还有 LL/SC(Load-Linked/Store-Conditional : 加载链接/条件存储) 、 Test-and-Set(测试并设置)
这里额外介绍一下 Unsafe 类的 compareAndSwapInt 方法。

1
2
3
4
5
6
7
8
9
10
/**
* 比较obj的offset处内存位置中的值和期望的值,如果相同则更新。此更新是不可中断的。
*
* @param obj 需要更新的对象
* @param offset obj中整型field的偏移量
* @param expect 希望field中存在的值
* @param update 如果期望值expect与field的当前值相同,设置filed的值为这个新值
* @return 如果field的值被更改返回true
*/
public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);

下面是 ConcurrentHashMap 中有关的应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Unsafe mechanics
private static final sun.misc.Unsafe U;
//对应于 类中的 sizectl
private static final long SIZECTL;
//在 transfer() 方法的使用时,计算索引
private static final long TRANSFERINDEX;
// 用于对 ConcurrentHashMap 的 size 统计。
// 下文 第8点关于 size 会说明。
private static final long BASECOUNT;
// 辅助类 countercell 类中的属性,用于分布式计算
// 是实现 java8 中 londAddr 的基础
private static final long CELLSBUSY;
private static final long CELLVALUE;
// 用来确定在数组中的位置
// 数组中的偏移地址
private static final long ABASE;
// 数组中的增量地址
private static final int ASHIFT;

static {
try {
//通过反射调用 类中的值,从而对 这些变量赋值
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}

3.2 常用方法

在操作过程中,经常会看到以下几个,或者相类似的方法。
其核心是

1
2
3
4
5
6
7
8
9
10
11
12
13
   //获得 i 位置上的 Node 节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//利用CAS算法设置i位置上的Node节点。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//利用volatile方法设置节点位置的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

4. 初始化函数 initTable

调用ConcurrentHashMap的构造方法仅仅是设置了一些参数而已,而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。
当向 map 插入数据的时候 table == null , 则会调用 initTable()方法 。
put 方法 简单展示一下。

1
2
3
4
5
6
7
8
9
10
final V putVal(K key, V value, boolean onlyIfAbsent) {
...
...
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
...
...
}

initTable() 方法展示如下
其中有 sizectl 变量,这里回顾一下
hash表初始化或扩容时的一个控制位标识量。
负数代表正在进行初始化或扩容操作
-1代表正在初始化
-N 表示有N-1个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/** 
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {

//sizeCtl <0 表示有其他线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin

//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//相当于0.75*n 设置一个扩容的阈值
// sc = n - n/4
sc = n - (n >>> 2);
}
} finally {
// 更新 sizectl
sizeCtl = sc;
}
break;
}
}
return tab;
}

5. transfer() 扩容操作

当ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,那真真是极好的了。

整个扩容操作分为两个部分:

1. 第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的,这个地方在后面会有提到;
2. 第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。

先来看一下单线程是如何完成的:
它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素:

1. 如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
2. 如果这个位置是Node节点(fh>=0),就构造两个链表,一个代表高位为 0 , 一个代表高位为 1 。将原来的结点 分别放在nextTable的i和i+n的位置上,并且除了lastRun的位置相对位于链表的底部外,其余元素均为 **反序** 。
3. 如果这个位置是TreeBin节点(fh<0),也做一个处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上

遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。

再看一下多线程是如何完成的:

1
2
3
//如果遍历到ForwardingNode节点  说明这个点已经被处理过了,直接跳过  这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed

这是一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。

下面是源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/**
* 一个过渡的table表 只有在扩容的时候才会使用
*/
private transient volatile Node<K, V>[] nextTable;

/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
int n = tab.length, stride;
// 通过计算 NCPU CPU的核心数与 表的大小的比值,将表进行范围的细分,以方便 并发。
// 感觉上 有点像 segment 分段锁的意思。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
//构造一个nextTable对象 它的容量是原来的两倍。
@SuppressWarnings("unchecked")
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
//原来的 容量限制为 1<<30
//HashMap 在扩容时,会用 resize() 方法,扩大 threshold 的值
//当大于 MAXIMUM_CAPACITY 时,会将 threshold 设置为 Integer.MAX_VALUE
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);//构造一个连节点指针 用于标志位
boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过
boolean finishing = false; // to ensure sweep before committing nextTab

for (int i = 0, bound = 0; ; ) {
Node<K, V> f;
int fh;
//这个while循环体的作用就是在控制i递减 通过i可以依次遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
} else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍历到的节点为空 则放入ForwardingNode指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点 说明这个点已经被处理过了,直接跳过 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//节点上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K, V> ln, hn;
//如果fh>=0 证明这是一个Node节点
if (fh >= 0) {
// runBit 代表正在 运行的 Node 节点的 分类
// 因此链表根据高位为0或者1分为两个子链表,高位为0的节点桶位置没有发生变化,高位为1的节点桶位置增加了n,
// 所以有setTabAt(nextTab, i, ln);和 setTabAt(nextTab, i + n, hn);
// n = 2的幂 。 二进制 0001000
// fh & n = 1. 1000
// 2. 0000 所以划分出两个链表。
int runBit = fh & n;
// lastRun 是正在运行的节点
Node<K, V> lastRun = f;
//以下的部分在完成的工作是构造两个链表 一个是高位为 0 的链表 另一个是高位为 1 的链表
// 找出最后一个 与后面的结点不同的 结点
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 将最后一个 结点保存起来
if (runBit == 0) {
ln = lastRun;
hn = null;
} else {
hn = lastRun;
ln = null;
}

for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
//这个链表是从低层向上构建
// ln 或 hn = lastRun, 构建一个 node 结点
// 其下一个结点为 lastRun 。
if ((ph & n) == 0) // 构建低位链表
ln = new Node<K, V>(ph, pk, pv, ln);
else // 构建高位链表
hn = new Node<K, V>(ph, pk, pv, hn);
}

//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行 --i 操作
advance = true;
}
//对TreeBin对象进行处理 与上面的过程类似
else if (f instanceof TreeBin) {
TreeBin<K, V> t = (TreeBin<K, V>) f;
TreeNode<K, V> lo = null, loTail = null;
TreeNode<K, V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//构造高位和低位两个链表
for (Node<K, V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K, V> p = new TreeNode<K, V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K, V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K, V>(hi) : t;
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行 --i 操作
advance = true;
}
}
}
}
}
}

6. put 方法

put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i。

注:

1. hash = spread(key.hashCode())
2. spread(int h) --> return (h ^ (h >>> 16)) & HASH_BITS;  --> 通过hashCode()的高16位异或低16位优化高位运算的算法
3. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {     
    if (casTabAt(tab, i, null,      
        new Node<K,V>(hash, key, value, null)))    
            break;                   // no lock when adding to empty bin    
}

如果i位置是空的,直接放进去,否则进行判断,
如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾
不同点:ConcurrentHashMap不允许key或value为null值。

多线程的情况下:

  1. 如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容; –> helpTransfer() 方法。
  2. 如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。
    1. 首先判断这个节点的类型。如果是链表节点(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。
    2. 如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。
    3. 如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允许 key或value为null
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
//计算该链表 节点的数量
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 第一次 put 操作的时候初始化,如果table为空的话,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();

//根据hash值计算出在table里面的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 根据对应的key hash 到具体的索引,如果该索引对应的 Node 为 null,则采用 CAS 操作更新整个 table
// 如果这个位置没有值 ,直接放进去,不需要加锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//当遇到表连接点时,需要进行整合表的操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 结点上锁,只是对链表头结点作锁操作
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh > 0 说明这个节点是一个链表的节点 不是树的节点
if (fh >= 0) {
binCount = 1;
//在这里遍历链表所有的结点
//并且计算链表里结点的数量
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值和key值相同 则修改对应结点的value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部
if ((e = e.next) == null) {
// 插入到链表尾
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果这个节点是树节点,就按照树的方式插入值
else if (f instanceof TreeBin) {
// 如果是红黑树结点,按照红黑树的插入
Node<K,V> p;
// 如果为树节点, binCount一直为2,不会引发扩容。
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果这个链表结点达到了临界值8,那么把这个链表转换成红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的元素数量+1,table的扩容是在这里发生的
addCount(1L, binCount);
return null;
}

6.1 helpTransfer() 方法

出现于 put 方法 如下地点

1
2
3
//当遇到表连接点时,需要进行整合表的操作  
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);

helpTransfer() 方法的源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {  
Node<K,V>[] nextTab; int sc;
// 当前 table 不为 null , 且 f 为 forwardingNode 结点 , 且存在下一张表
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);//计算一个扩容校验码
// 当 sizeCtl < 0 时,表示有线程在 transfer().
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//正常情况下 sc >>> RESIZE_STAMP_SHIFT == resizeStamp(tab.length);
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//将 扩容的线程先行减一,表示,这是来辅助 transfer,而非进行 transfer的线程。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

6.2 treeifyBin() 方法

涉及变量 MIN_TREEIFY_CAPACITY = 64;
如果数组长度n小于阈值MIN_TREEIFY_CAPACITY,默认是64,则会调用tryPresize方法把数组长度扩大到原来的两倍,并触发transfer方法,重新调整节点的位置。
出现于 put 方法 如下地点

1
2
3
4
5
6
7
8
if (binCount != 0) {
// TREEIFY_THRESHOLD 默认为 8.
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}

其中源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 将原来的数组扩大为原来的两倍
tryPresize(n << 1);

else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

6.3 tableSizeFor 方法

这里讲一个 JDK8 中设计的非常巧妙的算法。看了好久才看懂。
出自 tryPresize 方法中的以下位置

1
2
3
4
5
//数组的最大容积为 1<<30 。如果数组大小超过 1<<29 ,则将最大大小设置为 MAXIMUM_CAPACITY
//否则,设置为原来的两倍。
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);

下面让我们来分析一下,tableSizeFor()
这个算法的目的,是得出相比较于给定参数,返回一个刚好比参数大的 2次幂 整数。

1
2
3
4
5
6
7
8
9
static final int tableSizeFor(int cap) {
int n = cap - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

先来分析有关n位操作部分:先来假设n的二进制为01xxx…xxx。接着
对n右移1位:001xx…xxx,再位或:011xx…xxx
对n右移2为:00011…xxx,再位或:01111…xxx
此时前面已经有四个1了,再右移4位且位或可得8个1
同理,有8个1,右移8位肯定会让后八位也为1。
综上可得,该算法让最高位的1后面的位全变为1。

最后再让结果n+1,即得到了2的整数次幂的值了。

现在回来看看第一条语句:
int n = cap - 1;
  让cap-1再赋值给n的目的是另找到的目标值大于或等于原值。例如二进制1000,十进制数值为8。如果不对它减1而直接操作,将得到答案10000,即16。显然不是结果。减1后二进制为111,再进行操作则会得到原来的数值1000,即8。

引用自(http://www.cnblogs.com/loading4/p/6239441.html)

7. get 方法

通过 key值 搜索 value 值。
并且要 通过分辨 结点的种类,进行不同形式的寻找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public V get(Object key) {  
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//根据hash值确定节点位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果eh<0 说明这个节点在树上 直接寻找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

8. Size相关

《并发编程实战》中有提到,size返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许size返回一个近似值,而不是一个精确值。

8.1 CounterCell 类

从注释中可以看出,这是从 LongAdder 类中的思想,拷贝过来的一个类。
LongAdder 类 是 JDK 1.8 新引进的类,其思想:

多个线程持有自己的加数(cell),线程个数增加时,会自动提供新的加数。
当所有工作做完后,再提供新的加数。

有时间写一篇相关的源码分析.

不过,这里一样不能精确统计,这里的 CounterCell 等同于 LongAdder.Cell sumCount() 等同于 LongAdder.sum()方法。
执行逻辑是一样的。
就 LongAdder 类中的 sum 方法所说, 当有线程在运行时,一样只是估计值,只有当所有线程执行完毕,才是实际值。
而统计 Size ,不能够像垃圾清除一样,有 Safe point 或 Safe region ,所以,这个假设不成立。。。

其相关的源码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    /**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

//执行逻辑
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

8.2 mappingCount 方法

就官方文档中所说, mappingCount 方法,应该取代 size 方法,
但这个方法得出的值一样在线程运行的时候,只是一个估计的值。
从源码中就可以看出,使用的是上文分析的 sumCount() 方法。

1
2
3
4
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}

8.3 addCount 方法

出自于 put 方法的如下位置

1
2
3
4
    //将当前ConcurrentHashMap的元素数量+1  
addCount(1L, binCount);
return null;
}

统计上:
这里用到 CounterCell类,并且统计的值的计算一样是采用的 sumCount() 方法。
所以缺点如上,不再阐述。
扩容上:
逻辑与 helpTransfer() 类似,都是判断是否有多个线程在执行扩容,然后判断是否需要辅助 transfer();
源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private final void addCount(long x, int check) {  
//用到了 CounterCell 类
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//如果check值大于等于0 则需要检验是否需要进行扩容操作
//下面的逻辑与 helpTransfer() 类似,可以与 helpTransfer() 一起参考。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//如果已经有其他线程在执行扩容操作
if (sc < 0) {
//校验失效,直接退出。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;

if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

结语

喜欢的小伙伴可以点个关注,不定期干货更新
加油加油

0%