ReentrantReadWriteLock剖析

[TOC]

一、读写锁简介

现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。

 针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁

读写锁允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞

而读写锁有以下三个重要的特性:

(1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。

(2)重进入:读锁和写锁都支持线程重进入。

(3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

二、源码解读

我们先来看下 ReentrantReadWriteLock 类的整体结构:

1.HoldCounter

Sync类内部存在两个内部类,分别为HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要与读锁配套使用,其中,HoldCounter源码如下。

1
2
3
4
5
6
7
8
// 计数器
static final class HoldCounter {
// 计数
int count = 0;
// Use id, not reference, to avoid garbage retention
// 获取当前线程的TID属性的值
final long tid = getThreadId(Thread.currentThread());
}

说明:HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。ThreadLocalHoldCounter的源码如下

说明:ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。

2.ThreadLocalHoldCounter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 本地线程计数器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 构造函数
Sync() {
// 本地线程计数器
readHolds = new ThreadLocalHoldCounter();
// 设置AQS的状态
setState(getState()); // ensures visibility of readHolds
}

说明:在Sync的构造函数中设置了本地线程计数器和AQS的状态state。

3、读写状态的设计

同步状态在重入锁的实现中是表示被同一个线程重复获取的次数,即一个整形变量来维护,但是之前的那个表示仅仅表示是否锁定,而不用区分是读锁还是写锁。而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。

读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。

http://static.open-open.com/lib/uploadImg/20151031/20151031223319_397.png

4、写锁的获取与释放

看下WriteLock类中的lock和unlock方法:

1
2
3
4
5
6
7
public void lock() {
sync.acquire(1);
}

public void unlock() {
sync.release(1);
}

可以看到就是调用的独占式同步状态的获取与释放,因此真实的实现就是Sync的 tryAcquire和 tryRelease。

写锁的获取,看下tryAcquire:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 1 protected final boolean tryAcquire(int acquires) {
2 //当前线程
3 Thread current = Thread.currentThread();
4 //获取状态
5 int c = getState();
6 //写线程数量(即获取独占锁的重入数)
7 int w = exclusiveCount(c);
8
9 //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁
10 if (c != 0) {
11 // 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;
12 // 如果写锁状态不为0且写锁没有被当前线程持有返回false
13 if (w == 0 || current != getExclusiveOwnerThread())
14 return false;
15
16 //判断同一线程获取写锁是否超过最大次数(65535),支持可重入
17 if (w + exclusiveCount(acquires) > MAX_COUNT)
18 throw new Error("Maximum lock count exceeded");
19 //更新状态
20 //此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。
21 setState(c + acquires);
22 return true;
23 }
24
25 //到这里说明此时c=0,读锁和写锁都没有被获取
26 //writerShouldBlock表示是否阻塞
27 if (writerShouldBlock() ||
28 !compareAndSetState(c, c + acquires))
29 return false;
30
31 //设置锁为当前线程所有
32 setExclusiveOwnerThread(current);
33 return true;
34 }

其中exclusiveCount方法表示占有写锁的线程数量,源码如下:

1
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

说明:直接将状态state和(2^16 - 1)做与运算,其等效于将state模上2^16。写锁数量由state的低十六位表示。

从源代码可以看出,获取写锁的步骤如下:

(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。

(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。

(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。

(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。

(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。

方法流程图如下:

img

写锁的释放,tryRelease方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 1 protected final boolean tryRelease(int releases) {
2 //若锁的持有者不是当前线程,抛出异常
3 if (!isHeldExclusively())
4 throw new IllegalMonitorStateException();
5 //写锁的新线程数
6 int nextc = getState() - releases;
7 //如果独占模式重入数为0了,说明独占模式被释放
8 boolean free = exclusiveCount(nextc) == 0;
9 if (free)
10 //若写锁的新线程数为0,则将锁的持有者设置为null
11 setExclusiveOwnerThread(null);
12 //设置写锁的新线程数
13 //不管独占模式是否被释放,更新独占重入数
14 setState(nextc);
15 return free;
16 }

写锁的释放过程还是相对而言比较简单的:首先查看当前线程是否为写锁的持有者,如果不是抛出异常。然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。

说明:此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异常,否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。其方法流程图如下。

img

5、读锁的获取与释放

类似于写锁,读锁的lock和unlock的实际实现对应Sync的 tryAcquireShared 和 tryReleaseShared方法。

读锁的获取,看下tryAcquireShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
/**
* 写锁状态不为0 并且 当前线程不是写锁的占有者,即写锁由其他
* 线程占有,则获取读锁失败
*/
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
//读锁的同步状态
int r = sharedCount(c);
// 读状态的增加与减少需要对参数进行移位操作
// compareAndSetState(c, c + SHARED_UNIT) 即为c+ 1<<16
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//如果读锁未被获取
if (r == 0) {
//firstThread 是第一个获取读锁的线程
//firstReaderHoldCount 是firstThread持有读锁的数目
firstReader = current;
firstReaderHoldCount = 1;
//如果当前线程是第一个获取读锁的线程,则计数器++
} else if (firstReader == current) {
firstReaderHoldCount++;
//如果读锁被获取了,且当前线程不是第一个获取读锁的线程
} else {
//HoldCounter是每一个线程读锁持有数目的计数器,它包含两个成员变量:count和线程id
//cachedHoldCounter表示上一个成功获取读锁的线程的读锁计数器
HoldCounter rh = cachedHoldCounter;
/**
* rh==null 表示这是第一个获取读锁的线程
* rh.tid != rh.tid != getThreadId(current)) 表示当前线程不是上一个成功获取读锁的线程
* 其实下面的if和else if都是在更新cachedHoldCounter,读锁持有数目的增加在rh.count++
*/
if (rh == null || rh.tid != getThreadId(current))
//获得当前线程的计数器 并将其设为cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

1、获取同步状态c,获取写锁状态w、读锁状态r。

2、如果w!=0且写锁被其他线程获取,则进入等待状态。

3.1、如果读锁未被获取(r==0),则设置当前线程为第一个获取读锁的线程,并设置持有读锁数目为1

3.2、如果当前线程是第一个获取读锁的线程,则持有读锁的数目加1

3.3、如果读锁被获取了,且当前线程不是第一个获取读锁的线程。那么:

3.3.1 如果上一个获取读锁的线程不是当前线程,则拿到当前线程的ThreadLocal变量,并赋予cachedHoldCounter,持有读锁数目加1

3.3.2 如果上一个获取读锁的线程是当前线程,直接持有读锁数目加1

从28行开始,乍看之下不知道在做些什么,其实我们举个获取读锁的例子,然后根据代码一步步操作就会发现很简单。 举个例子:假定此时readHold是的状态如下图,上一个成功获取读锁的线程为1003。

img

​ 此时线程1004来获取读锁(当前线程是1004)。从上面代码28行开始:我们将1003的计数器赋予了rh,由于rh不为null且当前线程不是1003,所以我们获取当前线程的计数器(实际上就是线程id=1004,count=0),并将该计数器赋予rh和cachedHoldCounter;然后进行rh.count++,即读锁计数加1.

img

​ a处黑线代码中没有该操作,我这里是说明1004的计数器由readHolds维护。

​ 同样,如果现在是1002线程来获取读锁,那么获取1002线程的计数器,然后进行相应操作

这里为什么要搞一个firstRead、firstReaderHoldCount呢?而不是直接使用else那段代码?这是为了一个效率问题,firstReader是不会放入到readHolds中的,如果读锁仅有一个的情况下就会避免查找readHolds。可能就看这个代码还不是很理解HoldCounter。我们先看firstReader、firstReaderHoldCount的定义:

img

读锁的释放,tryReleaseShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 1 protected final boolean tryReleaseShared(int unused) {
2 // 获取当前线程
3 Thread current = Thread.currentThread();
4 if (firstReader == current) { // 当前线程为第一个读线程
5 // assert firstReaderHoldCount > 0;
6 if (firstReaderHoldCount == 1) // 读线程占用的资源数为1
7 firstReader = null;
8 else // 减少占用的资源
9 firstReaderHoldCount--;
10 } else { // 当前线程不为第一个读线程
11 // 获取缓存的计数器
12 HoldCounter rh = cachedHoldCounter;
13 if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
14 // 获取当前线程对应的计数器
15 rh = readHolds.get();
16 // 获取计数
17 int count = rh.count;
18 if (count <= 1) { // 计数小于等于1
19 // 移除
20 readHolds.remove();
21 if (count <= 0) // 计数小于等于0,抛出异常
22 throw unmatchedUnlockException();
23 }
24 // 减少计数
25 --rh.count;
26 }
27 for (;;) { // 无限循环
28 // 获取状态
29 int c = getState();
30 // 获取状态
31 int nextc = c - SHARED_UNIT;
32 if (compareAndSetState(c, nextc)) // 比较并进行设置
33 // Releasing the read lock has no effect on readers,
34 // but it may allow waiting writers to proceed if
35 // both read and write locks are now free.
36 return nextc == 0;
37 }
38 }

说明:此方法表示读锁线程释放锁。首先判断当前线程是否为第一个读线程firstReader,若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,若是,则设置第一个读线程firstReader为空,否则,将第一个读线程占有的资源数firstReaderHoldCount减1;若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器,如果计数器的计数count小于等于1,则移除当前线程对应的计数器,如果计数器的计数count小于等于0,则抛出异常,之后再减少计数即可。无论何种情况,都会进入无限循环,该循环可以确保成功设置状态state。其流程图如下。

img

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class hhh {

private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
private ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

public void read(){
try {
readLock.lock();
System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
Thread.sleep(3000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}

public void write(){
try {
writeLock.lock();
System.out.println("当前线程:" + Thread.currentThread().getName() + "进入...");
Thread.sleep(3000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "退出...");
} catch (Exception e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}

public static void main(String[] args) {

final hhh urrw = new hhh();

Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
urrw.read();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
urrw.read();
}
}, "t2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
urrw.write();
}
}, "t3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
urrw.write();
}
}, "t4");
// t1.start();
// t2.start();
// t1.start(); // R
// t3.start(); // W
// t3.start();
// t4.start();

}
}

总结

通过上面的源码分析,我们可以发现一个现象:

在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。

在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。

仔细想想,这个设计是合理的:因为当线程获取读锁的时候,可能有其他线程同时也在持有读锁,因此不能把获取读锁的线程“升级”为写锁;而对于获得写锁的线程,它一定独占了读写锁,因此可以继续让它获取读锁,当它同时获取了写锁和读锁后,还可以先释放写锁继续持有读锁,这样一个写锁就“降级”为了读锁。

综上:

一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁可以“降级”为读锁;读锁不能“升级”为写锁。