实现阻塞队列

[TOC]

1. 什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

2. LinkedBlockingQueue(基于链表的简化实现)

下面是一个基于 `LinkedList`、`ReentrantLock` 和两个 `Condition` 的简化实现(代码中省略了 `java.util.LinkedList` 以及 `java.util.concurrent.locks` 包的 import 语句):
/**
 * A minimal bounded blocking FIFO queue backed by a {@link LinkedList}.
 *
 * <p>All access to the backing list is guarded by a single {@link ReentrantLock}.
 * Producers wait on {@code unfull} while the queue is at capacity; consumers wait
 * on {@code unEmpty} while the queue has no elements.
 *
 * @param <E> the type of elements held in this queue
 */
public class LinkedBlockingQueue<E> {
    /** Maximum number of elements the queue may hold; always positive. */
    private final int capacity;

    /** Guards every read and write of {@link #queue}. */
    private final Lock lock = new ReentrantLock();

    /** Signalled by {@link #take()} after an element has been removed. */
    private final Condition unfull = lock.newCondition();

    /** Signalled by {@link #put(Object)} after an element has been added. */
    private final Condition unEmpty = lock.newCondition();

    /** Backing storage; head of the list is the next element to be taken. */
    private final LinkedList<E> queue;

    /**
     * Creates a queue with a capacity of {@link Integer#MAX_VALUE}.
     *
     * @throws InterruptedException declared for source compatibility with existing callers
     */
    public LinkedBlockingQueue() throws InterruptedException {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a queue that holds at most {@code capacity} elements.
     *
     * @param capacity the maximum number of elements; must be greater than zero
     * @throws IllegalArgumentException if {@code capacity} is not positive
     * @throws InterruptedException declared for source compatibility with existing callers
     */
    public LinkedBlockingQueue(int capacity) throws InterruptedException {
        // A capacity of 0 would make put() block forever and a negative capacity
        // would silently disable the bound, so reject both up front.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        this.queue = new LinkedList<E>();
    }

    /**
     * Appends {@code e} to the tail of the queue, waiting while the queue is full.
     *
     * @param e the element to add
     * @throws InterruptedException if the calling thread is interrupted while
     *         acquiring the lock or while waiting for space
     */
    public void put(E e) throws InterruptedException {
        // lockInterruptibly() so a thread stuck waiting for the lock still reacts to interrupts.
        lock.lockInterruptibly();
        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (queue.size() >= capacity) {
                unfull.await();
            }
            queue.addLast(e);
            unEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting while the queue is empty.
     *
     * @return the element that was at the head of the queue
     * @throws InterruptedException if the calling thread is interrupted while
     *         acquiring the lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty()) {
                unEmpty.await();
            }
            E head = queue.removeFirst();
            unfull.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

}

3. ArrayBlockingQueue(基于数组的简化实现,类名为 BlockQueue)

下面是一个基于定长数组(按环形方式复用下标)的简化实现:
 

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A minimal bounded blocking FIFO queue backed by a fixed-size array.
 *
 * <p>The insert and remove positions each wrap back to slot 0 after reaching the
 * end of the array. All state is guarded by a single {@link ReentrantLock}.
 *
 * @param <T> the type of elements held in this queue
 */
public class BlockQueue<T> {

    /** Fixed capacity of the queue; always positive. */
    private final int size;
    /** Backing array; a slot is {@code null} whenever it holds no queued element. */
    private final Object[] queue;

    private final Lock lock = new ReentrantLock();
    /** Awaited by producers while the queue is full; signalled by {@link #pop()}. */
    private final Condition full = lock.newCondition();
    /** Awaited by consumers while the queue is empty; signalled by {@link #push(Object)}. */
    private final Condition empty = lock.newCondition();

    /** Slot that the next {@link #push(Object)} writes to. */
    private int index;
    /** Slot that the next {@link #pop()} reads from. */
    private int removeIndex;
    /** Number of elements currently stored. */
    private int currLen;

    /** Creates a queue with a capacity of 10. */
    public BlockQueue() {
        this(10);
    }

    /**
     * Creates a queue that holds at most {@code size} elements.
     *
     * @param size the capacity; must be greater than zero
     * @throws IllegalArgumentException if {@code size} is not positive
     */
    public BlockQueue(int size) {
        // With size 0 every push() would block forever, so fail fast instead.
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        this.index = 0;
        this.removeIndex = 0;
        this.currLen = 0;
        this.size = size;
        this.queue = new Object[size];
    }

    /**
     * Stores {@code element} at the tail of the queue, waiting while the queue is full.
     *
     * @param element the element to add
     * @throws InterruptedException if the calling thread is interrupted while
     *         acquiring the lock or while waiting for space
     */
    public void push(T element) throws InterruptedException {
        // lockInterruptibly() so a thread stuck waiting for the lock still reacts to interrupts.
        lock.lockInterruptibly();
        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (currLen == size) {
                System.out.println("队列满。。。");
                full.await();
            }
            queue[index] = element;
            index = (index + 1 == size) ? 0 : index + 1;
            currLen++;
            empty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting while the queue is empty.
     *
     * @return the oldest element in the queue
     * @throws InterruptedException if the calling thread is interrupted while
     *         acquiring the lock or while waiting for an element
     */
    public T pop() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (currLen == 0) {
                System.out.println("队列空。。。");
                empty.await();
            }
            @SuppressWarnings("unchecked") // push(T) is the only writer, so every slot holds a T
            T head = (T) queue[removeIndex];
            // Drop the array's reference so the removed element can be garbage collected.
            queue[removeIndex] = null;
            removeIndex = (removeIndex + 1 == size) ? 0 : removeIndex + 1;
            currLen--;
            full.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Single-threaded smoke test: pushes and pops a few values and prints each popped value.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted during a queue operation
     */
    public static void main(String[] args) throws InterruptedException {
        BlockQueue<Integer> blockQueue = new BlockQueue<Integer>(3);
        blockQueue.push(1);
        System.out.println(blockQueue.pop());
        blockQueue.push(2);
        System.out.println(blockQueue.pop());
        blockQueue.push(3);
        System.out.println(blockQueue.pop());

        blockQueue.push(5);
        blockQueue.push(5);
        System.out.println(blockQueue.pop());
    }

}

4. 测试代码

下面的测试使用上文的 `BlockQueue`:先启动 2 个生产者线程和 3 个消费者线程,等待 1 秒后再启动 1 个生产者线程:
/**
 * Multi-threaded demo for {@code BlockQueue}: producer threads push one string each and
 * consumer threads pop one element each, printing a marker after every completed operation.
 */
public class test {
    /** Queue shared by all producer and consumer threads; capacity 2. */
    static BlockQueue<String> blockQueue = new BlockQueue<>(2);

    /** Producer: pushes a single element, then prints {@code product..}. */
    static class Thread1 extends Thread {
        @Override
        public void run() {
            try {
                blockQueue.push("product..");
                System.out.print("product..");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Consumer: pops a single element, then prints {@code consume..}. */
    static class Thread2 extends Thread {
        @Override
        public void run() {
            try {
                blockQueue.pop();
                System.out.print("consume..");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts 2 producers and 3 consumers, sleeps for one second, then starts 1 more producer.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 2; i++) {
            new Thread1().start();
        }
        // One consumer more than there are producers so far; the extra pop is only
        // satisfied by the producer started after the sleep below.
        for (int i = 0; i < 3; i++) {
            new Thread2().start();
        }
        Thread.sleep(1000);
        new Thread1().start();
    }

}