[TOC]
存储结构
ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁
(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发
度更高(并发度就是 Segment 的个数)。Segment 继承自 ReentrantLock。
1 | static final class HashEntry<K,V> { |
ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁
(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发
度更高(并发度就是 Segment 的个数)。
Segment 继承自 ReentrantLock。
1 | static final class Segment<K,V> extends ReentrantLock implements Serializable { |
1 | final Segment<K,V>[] segments; |
默认的并发级别为 16,也就是说默认创建 16 个 Segment。
1 | static final int DEFAULT_CONCURRENCY_LEVEL = 16; |
HashEntry跟HashMap差不多的,但是不同点是,他使用volatile去修饰了他的数据Value还有下一个节点next。
并发度高的原因
原理上来说,ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。
不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。
put
concurrentHashMap的put 方法首先定位到 某个Segment,调用segment的put方法在对应的segment做插入操作。segment put实现过程:
- 获取锁,保证put操作的线程安全;
- 定位到HashEntry数组中具体的HashEntry;
然后进行插入操作,第一步判断是否需要对 Segment 里的 HashEntry 数组进行扩容,在扩容的时候,首先创建一个容量是原来容量两倍的数组,将原数组的元素再散列后插入到新的数组里。为了高效,ConcurrentHashMap只对某个Segment进行扩容,不会对整个容器扩容。第二步定位添加元素的位置,然后将其放入数组
1 | //ConcurrentHashMap#put |
- 通过key定位到Segment,之后在对应的Segment中进行具体的put
1 | final V put(K key, int hash, V value, boolean onlyIfAbsent) { |
get
get 逻辑比较简单,只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。
由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值,整个过程都不需要加锁。。
size
每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。
在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。
如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。
扩容
相对于HashMap的resize,ConcurrentHashMap的rehash原理类似,但是Doug Lea为rehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可。
1 | void rehash() { |
1.8他的数据结构
其中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized
来保证并发安全性。
跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next采用了volatile去修饰,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。
① 取消分段锁机制,进一步降低冲突概率。② 引入红黑树结构,同一个哈希槽上的元素个数超过一定阈值后,单向链表改为红黑树结构。③ 使用了更加优化的方式统计集合内的元素数量。具体优化表现在:在 put、resize 和 size 方法中设计元素总数的更新和计算都避免了锁,使用 CAS 代替。
put
①根据 key 计算出 hashcode,判断是否需要进行初始化。 。
②f
即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
③如果都不满足,则利用 synchronized 锁写入数据。
④如果数量大于 TREEIFY_THRESHOLD
则要转换为红黑树。
1 | final V putVal(K key, V value, boolean onlyIfAbsent) { |
get
get
同样不需要同步
1.根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
2.如果是红黑树那就按照树的方式获取值。
3.就不满足那就按照链表的方式遍历获取值。
1 | public V get(Object key) { |
扩容
扩容时候遇到put操作怎么办
get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新 table 进行搜索
ForwardingNode: 是临时节点,这个节点会出现在扩容的时候,不存储实际的数据数据。
这是一个真正的辅助类,该类仅仅只存活在ConcurrentHashMap扩容操作时。只是一个标志节点,并且指向nextTable,它提供find方法而已。该类也是集成Node节点,其hash为-1,key、value、next均为null。如下:
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
如果Hash桶被迁移到新的table中,会在旧的table插入一个ForwardingNode临时节点,内部会指向新的table。
当读操作碰到ForwardingNode,会通过ForwardingNode内部的nextTable找到新的table,继续读。
当写操作碰到ForwadingNode,加入帮助扩容。
1 | // 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了 |