AQS是什么?

[TOC]

概念

AQS全称是AbstractQueuedSynchronizer,即抽象同步队列。下面看一下AQS的类图结构:

img

为了方便下面几个关键点的理解,大家先熟悉一下AQS的类图结构

AQS 队列同步器是用来构建锁或其他同步组件的基础框架,它使用一个 volatile int state 变量作为共享资源。如果线程获取资源失败,则进入同步队列等待;如果获取成功就执行临界区代码,释放资源时会通知同步队列中的等待线程。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态进行更改需要使用同步器提供的 3个方法 getStatesetStatecompareAndSetState ,它们保证状态改变是安全的。

state 状态的维护

1
2
在AQS中维持了一个单一的共享状态state,来实现同步器同步。看一下state的相关代码如下:
复制代码

state源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    /**
* The synchronization state.
*/
private volatile int state;

/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}

/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}

/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
复制代码

state 源码设计几个回答要点:

  • state用volatile修饰,保证多线程中的可见性。
  • getState()和setState()方法采用final修饰,限制AQS的子类重写它们两。
  • compareAndSetState()方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写。

CLH队列

谈到CLH队列,我们结合以上state状态,先来看一下AQS原理图

CLH(Craig, Landin, and Hagersten locks) 同步队列 是一个FIFO双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。

AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时,AQS则会调用addWaiter方法,CAS的方式将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程CAS设置失败会采用死循环的方式。当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

Node节点

CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),condition队列的后续节点(nextWaiter)如下图:

img

waitStatus几种状态状态:

img

我们再看一下CLH队列入列以及出列的代码:

入列

CLH队列入列就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。addWaiter方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//构造Node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//多次尝试
enq(node);
return node;
}
复制代码

由以上代码可得,addWaiter设置尾节点失败的话,调用enq(Node node)方法设置尾节点,enq方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
   private Node enq(final Node node) {
//死循环尝试,知道成功为止
for (;;) {
Node t = tail;
//tail 不存在,设置为首节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码

出列

首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。可以看一下以下两段源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
复制代码
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

复制代码

CLH核心几个回答要点

  • 双向链表入列出列
  • CAS算法设置尾节点+死循环自旋。

CAS算法,可以看一下我工作实战中仿造CAS算法解决并发问题的实现 juejin.im/post/5d0616…

ConditionObject

ConditionObject简介

我们都知道,synchronized控制同步的时候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以实现等待/通知模式。而Lock呢?它提供了条件Condition接口,配合await(),signal(),signalAll() 等方法也可以实现等待/通知机制。ConditionObject实现了Condition接口,给AQS提供条件变量的支持

Condition队列与CLH队列的那些事

我们先来看一下图:

img

ConditionObject队列与CLH队列的爱恨情仇:

  • 调用了await()方法的线程,会被加入到conditionObject等待队列中,并且唤醒CLH队列中head节点的下一个节点。
  • 线程在某个ConditionObject对象上调用了singnal()方法后,等待队列中的firstWaiter会被加入到AQS的CLH队列中,等待被唤醒。
  • 当线程调用unLock()方法释放锁时,CLH队列中的head节点的下一个节点(在本例中是firtWaiter),会被唤醒。

区别:

  • ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node。

独占与共享模式。

AQS支持两种同步模式:独占式和共享式。

  • Exclusive

    (独占):只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁:

    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  • Share(共享):多个线程可同时执行,如 Semaphore/CountDownLatch。Semaphore、CountDownLatch、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。

独占模式表示锁只会被一个线程占用,其他线程必须等到持有锁的线程释放锁后才能获取锁,同一时间只能有一个线程获取到锁。

共享模式表示多个线程获取同一个锁有可能成功

独占模式通过 acquire 和 release 方法获取和释放锁,共享模式通过 acquireShared 和 releaseShared

img

img

独占式

获取同步状态时,调用 acquire 方法,维护一个同步队列,使用 tryAcquire 方法安全地获取线程同步状态,获取失败的线程会被构造同步节点并通过 addWaiter 方法加入到同步队列的尾部,在队列中自旋。之后调用 acquireQueued 方法使得该节点以死循环的方式获取同步状态,如果获取不到则阻塞,被阻塞线程的唤醒主要依靠前驱节点的出队或被中断实现,移出队列或停止自旋的条件是前驱节点是头结点且成功获取了同步状态。

释放同步状态时,同步器调用 tryRelease 方法释放同步状态,然后调用 unparkSuccessor 方法唤醒头节点的后继节点,使后继节点重新尝试获取同步状态。

同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。

公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。

非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。

acquire(int arg)是独占式获取同步状态的方法,我们来看一下源码:

  • acquire(long arg)方法
1
2
3
4
5
6
  public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
  • addWaiter方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//构造Node
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//多次尝试
enq(node);
return node;
}
复制代码
  • acquireQueued(final Node node, long arg)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码
  • selfInterrupt()方法
1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

结合源代码,可得acquire(int arg)方法流程图,如下:

img

共享式

多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物。

获取同步状态时,调用 acquireShared 方法,该方法调用 tryAcquireShared 方法尝试获取同步状态,返回值为 int 类型,返回值不小于于 0 表示能获取同步状态。因此在共享式获取锁的自旋过程中,成功获取同步状态并退出自旋的条件就是该方法的返回值不小于0。

释放同步状态时,调用 releaseShared 方法,释放后会唤醒后续处于等待状态的节点。它和独占式的区别在于 tryReleaseShared 方法必须确保同步状态安全释放,通过循环 CAS 保证,因为释放同步状态的操作会同时来自多个线程。

acquireShared(long arg)是共享式获取同步状态的方法,可以看一下源码:

1
2
3
4
5
  public final void acquireShared(long arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
复制代码

由上可得,先调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败,调用doAcquireShared(int arg)自旋方式获取同步状态,方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 private void doAcquireShared(long arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
复制代码

https://www.javadoop.com/

模板模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:

1
2
3
4
5
6

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。

再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown()一次,state 会 CAS(Compare and Swap)减 1。等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,然后主调用线程就会从 await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

自定义同步器。

基于以上分析,我们都知道state,CLH队列,ConditionObject队列 等这些关键点,你要实现自定义锁的话,首先需要确定你要实现的是独占锁还是共享锁,定义原子变量state的含义,再定义一个内部类去继承AQS,重写对应的模板方法

我们来看一下基于 AQS 实现的不可重入的独占锁的demo,来自《Java并发编程之美》:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class NonReentrantLock implements Lock,Serializable{

//内部类,自定义同步器
static class Sync extends AbstractQueuedSynchronizer {
//是否锁已经被持有
public boolean isHeldExclusively() {
return getState() == 1;
}
//如果state为0 则尝试获取锁
public boolean tryAcquire(int arg) {
assert arg== 1 ;
//CAS设置状态,能保证操作的原子性,当前为状态为0,操作成功状态改为1
if(compareAndSetState(0, 1)){
//设置当前独占的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁,设置state为0
public boolean tryRelease(int arg) {
assert arg ==1;
//如果同步器同步器状态等于0,则抛出监视器非法状态异常
if(getState() == 0)
throw new IllegalMonitorStateException();
//设置独占锁的线程为null
setExclusiveOwnerThread(null);
//设置同步状态为0
setState(0);
return true;
}
//返回Condition,每个Condition都包含了一个Condition队列
Condition newCondition(){
return new ConditionObject();
}
}
//创建一个Sync来做具体的工作
private final Sync sync= new Sync ();

@Override
public void lock() {
sync.acquire(1);
}

public boolean isLocked() {
return sync.isHeldExclusively();
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
sync.release(1);
}


@Override
public Condition newCondition() {
return sync.newCondition();
}
}
复制代码

NonReentrantLockDemoTest:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NonReentrantLockDemoTest {

private static NonReentrantLock nonReentrantLock = new NonReentrantLock();

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
nonReentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
nonReentrantLock.unlock();
}
});
thread.start();
}
}
}

复制代码

运行结果:

img

作者:Jay_huaxiao
链接:https://juejin.im/post/5d34502cf265da1baf7d27aa
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class test {

public static void main(String[] args) {
Lock lock=new ReentrantLock();
Condition a=lock.newCondition();
Thread t1=new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
System.out.println("wo shi t1 aaa");

a.signal();
} finally {
lock.unlock();
}
}
});
Thread t2=new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
System.out.println("wo shi t2 ");
a.await();
System.out.println("wo shi t2 aaaa");

a.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
});
Thread t3=new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
// Thread.sleep(2000);
System.out.println("wo shi t3");
a.await();
System.out.println("wo shi t3 aaaa");
a.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
});
t1.start();
t2.start();
t3.start();

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class MyLock implements Lock {

private Helper helper=new Helper();

private class Helper extends AbstractQueuedSynchronizer{
//获取锁
@Override
protected boolean tryAcquire(int arg) {
int state=getState();
if(state==0){
//利用CAS原理修改state
if(compareAndSetState(0,arg)){
//设置当前线程占有资源
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}else if(getExclusiveOwnerThread()==Thread.currentThread()){
setState(getState()+arg);
return true;
}
return false;
}

//释放锁
@Override
protected boolean tryRelease(int arg) {
int state=getState()-arg;
boolean flag=false;
//判断释放后是否为0
if(state==0){
setExclusiveOwnerThread(null);
setState(state);
return true;
}
setState(state);//存在线程安全吗?重入性的问题,当前已经独占了资源()state
return false;
}

public Condition newConditionObjecct(){
return new ConditionObject();
}
}
@Override
public void lock() {
helper.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
helper.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return helper.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return helper.tryAcquireNanos(1,unit.toNanos(time));
}

@Override
public void unlock() {
helper.release(1);
}

@Override
public Condition newCondition() {
return helper.newConditionObjecct();
}
}

Lock Support

LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。java锁和同步器框架的核心 AQS: AbstractQueuedSynchronizer,就是通过调用 LockSupport .park()和 LockSupport .unpark()实现线程的阻塞和解除阻塞的。LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。

LockSupport类是Java6(JSR166-JUC)引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

  • park:阻塞当前线程(Block current thread),字面理解park,就算占住,停车的时候不就把这个车位给占住了么?起这个名字还是很形象的。
  • unpark: 使给定的线程停止阻塞(Unblock the given thread blocked )。

两个函数声明清楚地说明了操作对象:park函数是将当前Thread阻塞,而unpark函数则是将指定线程Thread唤醒

与Object类的wait/notify机制相比,park/unpark有两个优点:

\1. 以thread为操作对象更符合阻塞线程的直观定义;

\2. 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性。

Unsafe.park和Unsafe.unpark的底层实现原理

在Linux系统下,是用的Posix线程库pthread中的mutex(互斥量),condition(条件变量)来实现的。
mutex和condition保护了一个_counter的变量,当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。

在Linux系统下,是用的Posix线程库pthread中的mutex(互斥量),condition(条件变量)来实现的。且在 park unpark 过程中,保护了一个_counter的变量。

当调用park时,先尝试能否直接拿到“许可”,即判断_counter>0时,如果成功,则把_counter设置为0,并返回。否则进入等待。

当调用 unpark 时,直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程。
https://blog.csdn.net/hugo_lei/article/details/105813614

源码:
每个Java线程都有一个Parker实例,Parker类是这样定义的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Parker : public os::PlatformParker {  
private:
volatile int _counter ;
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
...
}

复制代码

可以看到Parker类实际上用Posix的mutex,condition来实现的。
在Parker类里的_counter字段,就是用来记录“许可”的。

  • park 过程

当调用park时,先尝试能否直接拿到“许可”,即_counter>0时,如果成功,则把_counter设置为0,并返回:

复制代码

1
2
3
4
5
6
7
8
9
10
11
void Parker::park(bool isAbsolute, jlong time) {  

// Ideally we'd do something useful while spinning, such
// as calling unpackTime().

// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.

if (Atomic::xchg(0, &_counter) > 0) return;

复制代码

如果不成功,则构造一个ThreadBlockInVM,然后检查_counter是不是>0,如果是,则把_counter设置为0,unlock mutex并返回:

1
2
3
4
ThreadBlockInVM tbivm(jt);  
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);

否则,再判断等待的时间,然后再调用pthread_cond_wait函数等待,如果等待返回,则把_counter设置为0,unlock mutex并返回:

复制代码

1
2
3
4
5
6
7
if (time == 0) {  
status = pthread_cond_wait (_cond, _mutex) ;
}
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
OrderAccess::fence();

复制代码

  • unpark 过程

当unpark时,则简单多了,直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程:

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void Parker::unpark() {  
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}

复制代码