ConcurrentHashMap解析

[TOC]

存储结构

ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁
(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发
度更高(并发度就是 Segment 的个数)。Segment 继承自 ReentrantLock。

1
2
3
4
5
6
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}

ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁
(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发
度更高(并发度就是 Segment 的个数)。
Segment 继承自 ReentrantLock。

1
2
3
4
5
6
7
8
9
10
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
}
1
final Segment<K,V>[] segments;

默认的并发级别为 16,也就是说默认创建 16 个 Segment。

1
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

HashEntry跟HashMap差不多的,但是不同点是,他使用volatile去修饰了他的数据Value还有下一个节点next。

并发度高的原因

原理上来说,ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。

不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。

put

concurrentHashMap的put 方法首先定位到 某个Segment,调用segment的put方法在对应的segment做插入操作。segment put实现过程:

  1. 获取锁,保证put操作的线程安全;
  2. 定位到HashEntry数组中具体的HashEntry;

然后进行插入操作,第一步判断是否需要对 Segment 里的 HashEntry 数组进行扩容,在扩容的时候,首先创建一个容量是原来容量两倍的数组,将原数组的元素再散列后插入到新的数组里。为了高效,ConcurrentHashMap只对某个Segment进行扩容,不会对整个容器扩容。第二步定位添加元素的位置,然后将其放入数组

1
2
3
4
5
6
7
8
9
10
11
12
//ConcurrentHashMap#put
public V put(K key, V value) {
  Segment<K,V> s;
  if (value == null)
    throw new NullPointerException();
  int hash = hash(key);//根据散列函数,计算出key值的散列值
  int j = (hash >>> segmentShift) & segmentMask;//这个操作就是定位Segment的数组下标,jdk1.7之前是segmentFor返回Segment,1.7之后直接就取消了这个方法,直接计算数组下标,然后通过偏移量底层操作获取Segment
  if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
      (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
    s = ensureSegment(j);//通过便宜量定位不到就调用ensureSegment方法定位Segment
  return s.put(key, hash, value, false);
}
  • 通过key定位到Segment,之后在对应的Segment中进行具体的put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

get

get 逻辑比较简单,只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。

由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值,整个过程都不需要加锁。。

size

每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。

在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。
如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。

扩容

相对于HashMap的resize,ConcurrentHashMap的rehash原理类似,但是Doug Lea为rehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY)
return;

/*
* Reclassify nodes in each list to new Map. Because we are
* using power-of-two expansion, the elements from each bin
* must either stay at same index, or move with a power of two
* offset. We eliminate unnecessary node creation by catching
* cases where old nodes can be reused because their next
* fields won't change. Statistically, at the default
* threshold, only about one-sixth of them need cloning when
* a table doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by any
* reader thread that may be in the midst of traversing table
* right now.
*/
/*
* 其实这个注释已经解释的很清楚了,主要就是因为扩展是按照2的幂次方
* 进行扩展的,所以扩展前在同一个桶中的元素,现在要么还是在原来的
* 序号的桶里,或者就是原来的序号再加上一个2的幂次方,就这两种选择。
* 所以原桶里的元素只有一部分需要移动,其余的都不要移动。该函数为了
* 提高效率,就是找到最后一个不在原桶序号的元素,那么连接到该元素后面
* 的子链表中的元素的序号都是与找到的这个不在原序号的元素的序号是一样的
* 那么就只需要把最后一个不在原序号的元素移到新桶里,那么后面跟的一串
* 子元素自然也就连接上了,而且序号还是相同的。在找到的最后一个不在
* 原桶序号的元素之前的元素就需要逐个的去遍历,加到和原桶序号相同的新桶上
* 或者加到偏移2的幂次方的序号的新桶上。这个都是新创建的元素,因为
* 只能在表头插入元素。这个原因可以参考
* 《探索 ConcurrentHashMap 高并发性的实现机制》中的讲解
*/

HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
threshold = (int)(newTable.length * loadFactor);
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) {
// We need to guarantee that any existing reads of old Map can
// proceed. So we cannot yet null out each bin.
HashEntry<K,V> e = oldTable[i];

if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;

// Single node on list
if (next == null)
newTable[idx] = e;

else {
// Reuse trailing consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
// 这里就是遍历找到最后一个不在原桶序号处的元素
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 把最后一个不在原桶序号处的元素赋值到新桶中
// 由于链表本身的特性,那么该元素后面的元素也都能连接过来
// 并且能保证后面的这些元素在新桶中的序号都是和该元素是相等的
// 因为上面的遍历就是确保了该元素后面的元素的序号都是和这个元素
// 的序号是相等的。不然遍历中还会重新赋值lastIdx
newTable[lastIdx] = lastRun;

// Clone all remaining nodes
// 这个就是把上面找到的最后一个不在原桶序号处的元素之前的元素赋值到
// 新桶上,注意都是把元素添加到新桶的表头处
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable;
}

1.8他的数据结构

其中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。

跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next采用了volatile去修饰,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。

① 取消分段锁机制,进一步降低冲突概率。② 引入红黑树结构,同一个哈希槽上的元素个数超过一定阈值后,单向链表改为红黑树结构。③ 使用了更加优化的方式统计集合内的元素数量。具体优化表现在:在 put、resize 和 size 方法中设计元素总数的更新和计算都避免了锁,使用 CAS 代替。

put

①根据 key 计算出 hashcode,判断是否需要进行初始化。 。

f 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。

③如果都不满足,则利用 synchronized 锁写入数据。

④如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

get

get 同样不需要同步

1.根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。

2.如果是红黑树那就按照树的方式获取值。

3.就不满足那就按照链表的方式遍历获取值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

扩容

扩容时候遇到put操作怎么办

get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新 table 进行搜索

ForwardingNode: 是临时节点,这个节点会出现在扩容的时候,不存储实际的数据数据。

这是一个真正的辅助类,该类仅仅只存活在ConcurrentHashMap扩容操作时。只是一个标志节点,并且指向nextTable,它提供find方法而已。该类也是集成Node节点,其hash为-1,key、value、next均为null。如下:

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
    Node<K,V> find(int h, Object k) {
    // loop to avoid arbitrarily deep recursion on forwarding nodes
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        for (;;) {
            int eh; K ek;
            if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            if (eh < 0) {
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    return e.find(h, k);
            }
            if ((e = e.next) == null)
                return null;
        }
    }
}
}

如果Hash桶被迁移到新的table中,会在旧的table插入一个ForwardingNode临时节点,内部会指向新的table。

当读操作碰到ForwardingNode,会通过ForwardingNode内部的nextTable找到新的table,继续读。

当写操作碰到ForwadingNode,加入帮助扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) {
// c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 这个 if 分支和之前说的初始化数组的代码基本上是一样的
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 0.75 * n
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法
// 此时 nextTab 不为 null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)
// 调用 transfer 方法,此时 nextTab 参数为 null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}