并发编程-阻塞队列
BlockingQueue
阻塞队列本身也是队列,但是它是适用于多线程环境下的,基于ReentrantLock实现的。
接口定义如下:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
// 入队,如果队列已满,返回false否则返回true(非阻塞)
boolean offer(E e);
// 入队,如果队列已满,阻塞线程直到能入队为止
void put(E e) throws InterruptedException;
// 入队,如果队列已满,阻塞线程直到能入队或超时、中断为止,入队成功返回true否则false
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 出队,如果队列为空,阻塞线程直到能出队为止
E take() throws InterruptedException;
// 出队,如果队列为空,阻塞线程直到能出队超时、中断为止,出队成功正常返回,否则返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 返回此队列理想情况下(在没有内存或资源限制的情况下)可以不阻塞地入队的数量,如果没有限制,则返回 Integer.MAX_VALUE
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
// 一次性从BlockingQueue中获取所有可用的数据对象(还可以指定获取数据的个数)
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
操作 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞一段时间 |
---|---|---|---|---|
放数据 | add() | offer() | put() | offer(e, time, unit) |
取数据(同时删除数据) | remove() | poll() | take() | poll(time, unit) |
取数据(不删除) | element() | peek() | 不支持 | 不支持 |
BlockingQueue
有5个常见的实现类(应用于不同的场景)。
- ArrayBlockingQueue:基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
- LinkedBlockingQueue:无界带缓冲阻塞队列(没有容量限制,也可以限制容量,也会阻塞,链表实现)。
- SynchronousQueue:无缓冲阻塞队列(相当于没有容量的ArrayBlockingQueue,因此只有阻塞的情况)。
- PriorityBlockingQueue - 是一个支持优先级的阻塞队列,元素的获取顺序按优先级决定;是无界队列。
- DelayQueue - 基于PriorityQueue实现的,是无界队列;它能够实现延迟获取元素,同样支持优先级。
ArrayBlockingQueue
ArrayBlockingQueue
队列具有以下特点:
- ArrayBlockingQueue实现了BlockingQueue接口,提供了四组放数据和读数据的方法,来满足不同的场景。
- ArrayBlockingQueue底层基于数组实现,采用循环数组,提升了数组的空间利用率。
- ArrayBlockingQueue初始化的时候,必须指定队列长度,是有界的阻塞队列,所以要预估好队列长度,保证生产者和消费者速率相匹配。
- ArrayBlockingQueue的方法是线程安全的,使用ReentrantLock在操作前后加锁释放锁来线程安全。
public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 用来存放数据的数组
*/
final Object[] items;
/**
* 下次取数据的数组下标位置
*/
int takeIndex;
/**
* 下次放数据的数组下标位置
*/
int putIndex;
/**
* 元素个数
*/
int count;
/**
* 独占锁,用来保证存取数据安全
*/
final ReentrantLock lock;
/**
* 取数据的条件
*/
private final Condition notEmpty;
/**
* 放数据的条件
*/
private final Condition notFull;
}
ArrayBlockingQueue
底层是基于数组实现的,使用对象数组items存储元素。
为了实现队列特性(一端插入,另一端删除),定义了两个指针,takeIndex表示下次取数据的位置,putIndex表示下次放数据的位置。
另外ArrayBlockingQueue
还使用ReentrantLock
保证线程安全,并且定义了两个条件 notEmpty notFull,当条件满足的时候才允许放数据或者取数据。
构造方法
ArrayBlockingQueue
常用的初始化方法有两个:
- 指定容量大小
- 指定容量大小和是否是公平锁
/**
* 指定容量大小的构造方法
*/
BlockingQueue<Integer> blockingDeque1 = new ArrayBlockingQueue<>(1);
/**
* 指定容量大小、公平锁的构造方法
*/
BlockingQueue<Integer> blockingDeque1 = new ArrayBlockingQueue<>(1, true);
源码实现:
/**
* 指定容量大小的构造方法(默认是非公平锁)
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 指定容量大小、公平锁的构造方法
*
* @param capacity 数组容量
* @param fair 是否是公平锁
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
放数据的方法有四个:
操作 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞一段时间 |
---|---|---|---|---|
放数据 | add() | offer() | put() | offer(e, time, unit) |
put方法
put()方法在数组满的时候,会一直阻塞,直到有其他线程取走数据,空出位置,才能添加成功。
/**
* put方法入口
*
* @param e 元素
*/
public void put(E e) throws InterruptedException {
// 1. 判空,传参不允许为null
checkNotNull(e);
// 2. 加可中断的锁,防止一直阻塞
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 3. 如果队列已满,就一直阻塞,直到被唤醒
while (count == items.length) {
notFull.await();
}
// 4. 如果队列未满,直接入队
enqueue(e);
} finally {
// 5. 释放锁
lock.unlock();
}
}
add方法
/**
* add方法入口
*
* @param e 元素
* @return 是否添加成功
*/
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}
add()方法在数组满的时候,会抛出异常,底层基于offer()实现。
offer方法
/**
* offer方法入口
*
* @param e 元素
* @return 是否插入成功
*/
public boolean offer(E e) {
// 1. 判空,传参不允许为null
checkNotNull(e);
// 2. 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 3. 判断数组是否已满,如果满了就直接返回false结束
if (count == items.length) {
return false;
} else {
// 4. 否则就插入
enqueue(e);
return true;
}
} finally {
// 5. 释放锁
lock.unlock();
}
}
/**
* 入队
*
* @param x 元素
*/
private void enqueue(E x) {
// 1. 获取数组
final Object[] items = this.items;
// 2. 直接放入数组
items[putIndex] = x;
// 3. 移动putIndex位置,如果到达数组的末尾就从头开始
if (++putIndex == items.length) {
putIndex = 0;
}
// 4. 计数
count++;
// 5. 唤醒因为队列为空,等待取数据的线程
notEmpty.signal();
}
offer()在数组满的时候,会返回false,表示添加失败。 为了循环利用数组,添加元素的时候如果已经到了队尾,就从队头重新开始,相当于一个循环队列。
offer(e, time, unit)方法
在数组满的时候, offer(e, time, unit)方法会阻塞一段时间。
/**
* offer方法入口
*
* @param e 元素
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否添加成功
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 1. 判空,传参不允许为null
checkNotNull(e);
// 2. 把超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
// 3. 加可中断的锁,防止一直阻塞
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 4. 循环判断队列是否已满
while (count == items.length) {
if (nanos <= 0) {
// 6. 如果队列已满,且超时时间已过,则返回false
return false;
}
// 5. 如果队列已满,则等待指定时间
nanos = notFull.awaitNanos(nanos);
}
// 7. 如果队列未满,则入队
enqueue(e);
return true;
} finally {
// 8. 释放锁
lock.unlock();
}
}
取数据(取出数据并删除)的方法有四个:
操作 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞一段时间 |
---|---|---|---|---|
取数据(同时删除数据) | remove() | poll() | take() | poll(time, unit) |
poll方法
poll()方法在弹出元素的时候如果数组为空则返回null,表示弹出失败(取数据跟放数据一样都是循环遍历数组)。
/**
* poll方法入口
*/
public E poll() {
// 1. 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 2. 如果数组为空,则返回null,否则返回队列头部元素
return (count == 0) ? null : dequeue();
} finally {
// 3. 释放锁
lock.unlock();
}
}
/**
* 出列
*/
private E dequeue() {
// 1. 取出队列头部元素
final Object[] items = this.items;
E x = (E) items[takeIndex];
// 2. 取出元素后,把该位置置空
items[takeIndex] = null;
// 3. 移动takeIndex位置,如果到达数组的末尾就从头开始
if (++takeIndex == items.length) {
takeIndex = 0;
}
// 4. 元素个数减一
count--;
if (itrs != null) {
itrs.elementDequeued();
}
// 5. 唤醒因为队列已满,等待放数据的线程
notFull.signal();
return x;
}
remove方法
如果数组为空,remove()会抛出异常。
/**
* remove方法入口
*/
public E remove() {
// 1. 直接调用poll方法
E x = poll();
// 2. 如果取到数据,直接返回,否则抛出异常
if (x != null) {
return x;
} else {
throw new NoSuchElementException();
}
}
take方法
如果数组为空,take()方法就一直阻塞,直到被唤醒。
/**
* take方法入口
*/
public E take() throws InterruptedException {
// 1. 加可中断的锁,防止一直阻塞
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 2. 如果数组为空,就一直阻塞,直到被唤醒
while (count == 0) {
notEmpty.await();
}
// 3. 如果数组不为空,就从数组中取数据
return dequeue();
} finally {
// 4. 释放锁
lock.unlock();
}
}
poll(time, unit)方法
在数组满的时候, poll(time, unit)方法会阻塞一段时间。
/**
* poll方法入口
*
* @param timeout 超时时间
* @param unit 时间单位
* @return 元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 1. 把超时时间转换成纳秒
long nanos = unit.toNanos(timeout);
// 2. 加可中断的锁,防止一直阻塞
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 3. 如果数组为空,就开始阻塞
while (count == 0) {
if (nanos <= 0) {
// 5. 如果数组为空,且超时时间已过,则返回null
return null;
}
// 4. 阻塞到到指定时间
nanos = notEmpty.awaitNanos(nanos);
}
// 6. 如果数组不为空,则出列
return dequeue();
} finally {
// 7. 释放锁
lock.unlock();
}
}
取数据的方法
操作 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞一段时间 |
---|---|---|---|---|
取数据(不删除) | element() | peek() | 不支持 | 不支持 |
peek方法
/**
* peek方法入口
*/
public E peek() {
// 1. 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 2. 返回数组头部元素,如果数组为空,则返回null
return itemAt(takeIndex);
} finally {
// 3. 释放锁
lock.unlock();
}
}
/**
* 返回当前位置元素
*/
final E itemAt(int i) {
return (E) items[i];
}
element方法
/**
* element方法入口
*/
public E element() {
// 1. 调用peek方法查询数据
E x = peek();
// 2. 如果查到数据,直接返回
if (x != null) {
return x;
} else {
// 3. 如果没找到,则抛出异常
throw new NoSuchElementException();
}
}
LinkedBlockingQueue
LinkedBlockingQueue
队列具有以下特点:
LinkedBlockingQueue
实现了BlockingQueue
接口,提供了四组放数据和读数据的方法,来满足不同的场景。LinkedBlockingQueue
底层基于链表实现,支持从头部弹出数据,从尾部添加数据。LinkedBlockingQueue
初始化的时候,如果不指定队列长度,默认长度是Integer最大值,有内存溢出风险,建议初始化的时候指定队列长度。LinkedBlockingQueue
的方法是线程安全的,分别使用了读写两把锁,比ArrayBlockingQueue
性能更好。
Java线程池中的固定大小线程池就是基于
LinkedBlockingQueue
实现的。
// 创建固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 底层使用LinkedBlockingQueue队列存储任务
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public class LinkedBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 容量大小
*/
private final int capacity;
/**
* 元素个数
*/
private final AtomicInteger count = new AtomicInteger();
/**
* 头节点
*/
transient Node<E> head;
/**
* 尾节点
*/
private transient Node<E> last;
/**
* 取数据的锁
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 取数据的条件(队列非空)
*/
private final Condition notEmpty = takeLock.newCondition();
/**
* 放数据的锁
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 放数据的条件(队列非满)
*/
private final Condition notFull = putLock.newCondition();
/**
* 链表节点类
*/
static class Node<E> {
/**
* 节点元素
*/
E item;
/**
* 后继节点
*/
Node<E> next;
Node(E x) {
item = x;
}
}
}
LinkedBlockingQueue
底层是基于链表实现的,定义了头节点head和尾节点last,由链表节点类Node可以看出是个单链表。
ArrayBlockingQueue
中只使用了一把锁,入队出队操作共用这把锁。而LinkedBlockingQueue则使用了两把锁,分别是出队锁takeLock和入队锁putLock,为什么要这么设计呢?
LinkedBlockingQueue
把两把锁分开,性能更好,为什么ArrayBlockingQueue
不这样设计呢?
原因是ArrayBlockingQueue
是基于数组实现的,所有数据都存储在同一个数组对象里面,对同一个对象没办法使用两把锁,会有数据可见性的问题。而LinkedBlockingQueue
底层是基于链表实现的,从头节点删除,尾节点插入,头尾节点分别是两个对象,可以分别使用两把锁,提升操作性能。
构造方法
LinkedBlockingQueue常用的初始化方法有两个:
- 无参构造方法
- 指定容量大小的有参构造方法
/**
* 无参构造方法
*/
BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingQueue<>();
/**
* 指定容量大小的构造方法
*/
BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingQueue<>(10);
源码实现:
/**
* 无参构造方法
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 指定容量大小的构造方法
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException();
}
// 设置容量大小,初始化头尾结点
this.capacity = capacity;
last = head = new Node<E>(null);
}
LinkedBlockingQueue的无参构造方法使用的链表容量是Integer的最大值,存储大量数据的时候,会有内存溢出的风险,建议使用有参构造方法,指定容量大小。
有参构造方法还会初始化头尾节点,节点值为null。
LinkedBlockingQueue初始化的时候,不支持指定是否使用公平锁,只能使用非公平锁,而ArrayBlockingQueue
是支持指定的。
操作 | 抛出异常 | 返回特定值 | 一直阻塞 | 阻塞指定时间 |
---|---|---|---|---|
放数据 | add() | offer() | put() | offer(e, time, unit) |
取数据(同时删除数据) | remove() | poll() | take() | poll(time, unit) |
取数据(不删除) | element() | peek() | 不支持 | 不支持 |
这四组方法的区别是:
- 当队列满的时候,再次添加数据,add()会抛出异常,offer()会返回false,put()会一直阻塞,offer(e, time, unit)会阻塞指定时间,然后返回false。
- 当队列为空的时候,再次取数据,remove()会抛出异常,poll()会返回null,take()会一直阻塞,poll(time, unit)会阻塞指定时间,然后返回null。
offer方法
在链表尾部插入。 offer()方法在队列满的时候,会直接返回false,表示插入失败。
追加元素到链表末尾,如果是第一次添加元素,就唤醒因为队列为空而等待取数据的线程。
/**
* offer方法入口
*
* @param e 元素
* @return 是否插入成功
*/
public boolean offer(E e) {
// 1. 判空,传参不允许为null
if (e == null) {
throw new NullPointerException();
}
// 2. 如果队列已满,则直接返回false,表示插入失败
final AtomicInteger count = this.count;
if (count.get() == capacity) {
return false;
}
int c = -1;
Node<E> node = new Node<E>(e);
// 3. 获取put锁,并加锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 4. 加锁后,再次判断队列是否已满,如果未满,则入队
if (count.get() < capacity) {
enqueue(node);
// 5. 队列个数加一
c = count.getAndIncrement();
// 6. 如果队列未满,则唤醒因为队列已满而等待放数据的线程(用来补偿,不加也行)
if (c + 1 < capacity) {
notFull.signal();
}
}
} finally {
// 7. 释放锁
putLock.unlock();
}
// 8. c等于0,表示插入前,队列为空,是第一次插入,需要唤醒因为队列为空而等待取数据的线程
if (c == 0) {
signalNotEmpty();
}
// 9. 返回是否插入成功
return c >= 0;
}
/**
* 入队
*
* @param node 节点
*/
private void enqueue(LinkedBlockingQueue.Node<E> node) {
// 直接追加到链表末尾
last = last.next = node;
}
/**
* 唤醒因为队列为空而等待取数据的线程
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
add方法
add()方法在数组满的时候,会抛出异常,底层基于offer()实现。
/**
* add方法入口
*
* @param e 元素
* @return 是否添加成功
*/
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}
put方法
put()方法在数组满的时候,会一直阻塞,直到有其他线程取走数据,空出位置,才能添加成功。
/**
* put方法入口
*
* @param e 元素
*/
public void put(E e) throws InterruptedException {
// 1. 判空,传参不允许为null
if (e == null) {
throw new NullPointerException();
}
int c = -1;
Node<E> node = new Node<E>(e);
// 2. 加可中断的锁,防止一直阻塞
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
final AtomicInteger count = this.count;
try {
// 3. 如果队列已满,就一直阻塞,直到被唤醒
while (count.get() == capacity) {
notFull.await();
}
// 4. 如果队列未满,则直接入队
enqueue(node);
c = count.getAndIncrement();
// 5. 如果队列未满,则唤醒因为队列已满而等待放数据的线程(用来补偿,不加也行)
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 6. 释放锁
putLock.unlock();
}
// 7. c等于0,表示插入前,队列为空,是第一次插入,需要唤醒因为队列为空而等待取数据的线程
if (c == 0) {
signalNotEmpty();
}
}
offer(e, time, unit)方法
在数组满的时候, offer(e, time, unit)方法会阻塞一段时间。
/**
* offer方法入口
*
* @param e 元素
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否添加成功
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 1. 判空,传参不允许为null
if (e == null) {
throw new NullPointerException();
}
// 2. 把超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
int c = -1;
final AtomicInteger count = this.count;
// 2. 加可中断的锁,防止一直阻塞
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
try {
// 4. 循环判断队列是否已满
while (count.get() == capacity) {
if (nanos <= 0) {
// 6. 如果队列已满,且超时时间已过,则返回false
return false;
}
// 5. 如果队列已满,则等待指定时间
nanos = notFull.awaitNanos(nanos);
}
// 7. 如果队列未满,则入队
enqueue(new Node<E>(e));
// 8. 如果队列未满,则唤醒因为队列已满而等待放数据的线程(用来补偿,不加也行)
c = count.getAndIncrement();
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 9. 释放锁
putLock.unlock();
}
// 10. c等于0,表示插入前,队列为空,是第一次插入,需要唤醒因为队列为空而等待取数据的线程
if (c == 0) {
signalNotEmpty();
}
return true;
}
poll方法
poll()方法取数据都是从链表头部弹出元素。 poll()方法在弹出元素的时候,如果队列为空,直接返回null,表示弹出失败。
/**
* poll方法入口
*/
public E poll() {
// 如果队列为空,则返回null
final AtomicInteger count = this.count;
if (count.get() == 0) {
return null;
}
E x = null;
int c = -1;
// 2. 加锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 3. 如果队列不为空,则取出队头元素
if (count.get() > 0) {
x = dequeue();
// 4. 元素个数减一
c = count.getAndDecrement();
// 5. 如果队列不为空,则唤醒因为队列为空而等待取数据的线程
if (c > 1) {
notEmpty.signal();
}
}
} finally {
// 6. 释放锁
takeLock.unlock();
}
// 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
if (c == capacity) {
signalNotFull();
}
return x;
}
/**
* 取出队头元素
*/
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* 唤醒因为队列已满而等待放数据的线程
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
remove方法
/**
* remove方法入口
*/
public E remove() {
// 1. 直接调用poll方法
E x = poll();
// 2. 如果取到数据,直接返回,否则抛出异常
if (x != null) {
return x;
} else {
throw new NoSuchElementException();
}
}
take方法
如果队列为空,take()方法就一直阻塞,直到被唤醒。
/**
* take方法入口
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 1. 加可中断的锁,防止一直阻塞
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 2. 如果队列为空,就一直阻塞,直到被唤醒
while (count.get() == 0) {
notEmpty.await();
}
// 3. 如果队列不为空,则取出队头元素
x = dequeue();
// 4. 队列元素个数减一
c = count.getAndDecrement();
// 5. 如果队列不为空,则唤醒因为队列为空而等待取数据的线程
if (c > 1) {
notEmpty.signal();
}
} finally {
// 6. 释放锁
takeLock.unlock();
}
// 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
if (c == capacity) {
signalNotFull();
}
return x;
}
poll(time, unit)方法
在队列满的时候, poll(time, unit)方法会阻塞指定时间,然后返回null。
/**
* poll方法入口
*
* @param timeout 超时时间
* @param unit 时间单位
* @return 元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 1. 把超时时间转换成纳秒
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
// 2. 加可中断的锁,防止一直阻塞
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 3. 循环判断队列是否为空
while (count.get() == 0) {
if (nanos <= 0) {
// 5. 如果队列为空,且超时时间已过,则返回null
return null;
}
// 4. 阻塞到到指定时间
nanos = notEmpty.awaitNanos(nanos);
}
// 6. 如果队列不为空,则取出队头元素
x = dequeue();
// 7. 队列元素个数减一
c = count.getAndDecrement();
// 8. 如果队列不为空,则唤醒因为队列为空而等待取数据的线程
if (c > 1) {
notEmpty.signal();
}
} finally {
// 9. 释放锁
takeLock.unlock();
}
// 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
if (c == capacity) {
signalNotFull();
}
return x;
}
peek方法
如果数组为空,直接返回null。
/**
* peek方法入口
*/
public E peek() {
// 1. 如果队列为空,则返回null
if (count.get() == 0) {
return null;
}
// 2. 加锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 3. 取出队头元素
Node<E> first = head.next;
if (first == null) {
return null;
} else {
return first.item;
}
} finally {
// 4. 释放锁
takeLock.unlock();
}
}
element方法
如果队列为空,则抛出异常。
/**
* element方法入口
*/
public E element() {
// 1. 调用peek方法查询数据
E x = peek();
// 2. 如果查到数据,直接返回
if (x != null) {
return x;
} else {
// 3. 如果没找到,则抛出异常
throw new NoSuchElementException();
}
}
SynchronousQueue
SynchronousQueue
被称为同步队列
,当生产者往队列中放元素的时候,必须等待消费者把这个元素取走,否则一直阻塞。消费者取元素的时候,同理也必须等待生产者放队列中放元素。(同步生产-消费)
Java线程池中的带缓存的线程池就是基于
SynchronousQueue
实现的。
// 创建带缓存的线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 底层使用SynchronousQueue队列处理任务
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
SynchronousQueue
队列具有以下特点:
SynchronousQueue
实现了BlockingQueue
接口,提供了四组放数据和读数据的方法,来满足不同的场景。SynchronousQueue
底层有两种实现方式,分别是基于栈实现非公平策略,以及基于队列实现的公平策略。SynchronousQueue
初始化的时候,可以指定使用公平策略还是非公平策略。SynchronousQueue
不存储元素,不适合作为缓存队列使用。适用于生产者与消费者速度相匹配的场景,可减少任务执行的等待时间。
SynchronousQueue
底层是基于Transferer抽象类实现的,放数据和取数据的逻辑都耦合在transfer()方法中。而Transferer抽象类又有两个实现类,分别是基于栈结构实现和基于队列实现。
public class SynchronousQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 转接器(栈和队列的父类)
*/
abstract static class Transferer<E> {
/**
* 转移(put和take都用这一个方法)
*
* @param e 元素
* @param timed 是否超时
* @param nanos 纳秒
*/
abstract E transfer(E e, boolean timed, long nanos);
}
/**
* 栈实现类
*/
static final class TransferStack<E> extends Transferer<E> {
}
/**
* 队列实现类
*/
static final class TransferQueue<E> extends Transferer<E> {
}
}
构造方法
SynchronousQueue
常用的初始化方法有两个:
- 无参构造方法
- 指定容量大小的有参构造方法
/**
* 无参构造方法
*/
BlockingQueue<Integer> blockingQueue1 = new SynchronousQueue<>();
/**
* 有参构造方法,指定是否使用公平锁(默认使用非公平锁)
*/
BlockingQueue<Integer> blockingQueue2 = new SynchronousQueue<>(true);
对应的源码实现:
/**
* 无参构造方法
*/
public SynchronousQueue() {
this(false);
}
/**
* 有参构造方法,指定是否使用公平锁
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue
的无参构造方法默认使用的非公平策略,有参构造方法可以指定使用公平策略。
- 公平策略,基于队列实现的是公平策略,先进先出。
- 非公平策略,基于栈实现的是非公平策略,先进后出。
TransferStack
TODO
TransferQueue
TODO
offer方法
底层都是调用的transfer()方法实现。 如果没有匹配到合适的节点,offer()方法会直接返回false,表示插入失败。
/**
* offer方法入口
*
* @param e 元素
* @return 是否插入成功
*/
public boolean offer(E e) {
// 1. 判空,传参不允许为null
if (e == null) {
throw new NullPointerException();
}
// 2. 调用底层transfer方法
return transferer.transfer(e, true, 0) != null;
}
add方法
如果没有匹配到合适的节点,add()方法会抛出异常,底层基于offer()实现。
/**
* add方法入口
*
* @param e 元素
* @return 是否添加成功
*/
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}
put方法
如果没有匹配到合适的节点,put()方法会一直阻塞,直到有其他线程取走数据,才能添加成功。
/**
* put方法入口
*
* @param e 元素
*/
public void put(E e) throws InterruptedException {
// 1. 判空,传参不允许为null
if (e == null) {
throw new NullPointerException();
}
// 2. 调用底层transfer方法
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
offer(e, time, unit)方法
如果没有匹配到合适的节点, offer(e, time, unit)方法会阻塞一段时间,然后返回false。
/**
* offer方法入口
*
* @param e 元素
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否添加成功
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 1. 判空,传参不允许为null
if (e == null) {
throw new NullPointerException();
}
// 2. 调用底层transfer方法
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) {
return true;
}
if (!Thread.interrupted()) {
return false;
}
throw new InterruptedException();
}
poll方法
底层都是调用的transfer方法实现。
poll()方法在弹出元素的时候,如果没有匹配到合适的节点,直接返回null,表示弹出失败。
/**
* poll方法入口
*/
public E poll() {
// 调用底层transfer方法
return transferer.transfer(null, true, 0);
}
remove方法
如果没有匹配到合适的节点,remove()会抛出异常。
/**
* remove方法入口
*/
public E remove() {
// 1. 直接调用poll方法
E x = poll();
// 2. 如果取到数据,直接返回,否则抛出异常
if (x != null) {
return x;
} else {
throw new NoSuchElementException();
}
}
take方法
如果没有匹配到合适的节点,take()方法就一直阻塞,直到被唤醒。
/**
* take方法入口
*/
public E take() throws InterruptedException {
// 调用底层transfer方法
E e = transferer.transfer(null, false, 0);
if (e != null) {
return e;
}
Thread.interrupted();
throw new InterruptedException();
}
poll(time, unit)方法
如果没有匹配到合适的节点, poll(time, unit)方法会阻塞指定时间,然后返回null。
/**
* poll方法入口
*
* @param timeout 超时时间
* @param unit 时间单位
* @return 元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 调用底层transfer方法
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted()) {
return e;
}
throw new InterruptedException();
}
peek方法
直接返回null,SynchronousQueue
不支持这种操作。
/**
* peek方法入口
*/
public E peek() {
return null;
}
element方法
底层调用的也是peek()方法,也是不支持这种操作。
/**
* element方法入口
*/
public E element() {
// 1. 调用peek方法查询数据
E x = peek();
// 2. 如果查到数据,直接返回
if (x != null) {
return x;
} else {
// 3. 如果没找到,则抛出异常
throw new NoSuchElementException();
}
}
LinkedTransferQueue
在JDK7的时候,基于SynchronousQueue产生了一个更强大的TransferQueue,它保留了SynchronousQueue的匹配交接机制,并且与等待队列进行融合。
SynchronousQueue并没有使用锁,而是采用CAS操作保证生产者与消费者的协调,但是它没有容量,而LinkedBlockingQueue虽然是有容量且无界的,但是内部基本都是基于锁实现的,性能并不是很好,这时,我们就可以将它们各自的优点单独拿出来,揉在一起,就成了性能更高的LinkedTransferQueue
相比 SynchronousQueue
,它多了一个可以存储的队列,我们依然可以像阻塞队列那样获取队列中所有元素的值,简单来说,LinkedTransferQueue
其实就是一个多了存储队列的SynchronousQueue
。
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
// 插入时,会先检查是否有其他线程等待获取,如果是,直接进行交接,否则插入到存储队列中
// 不会像 SynchronousQueue 那样必须等一个匹配的才可以
queue.put("1");
queue.put("2");
// 直接打印所有的元素,这在SynchronousQueue下只能是空,因为单独的入队或出队操作都会被阻塞
queue.forEach(System.out::println);
}
PriorityBlockingQueue
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue;
private transient int size;
private PriorityQueue<E> q;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private transient volatile int allocationSpinLock;
}
TODO
DelayQueue
实现延时出队,也就是说当一个元素插入后,如果没有超过一定时间,那么是无法让此元素出队的。
DelayQueue
是一种本地延迟队列,比如希望我们的任务在5秒后执行,就可以使用DelayQueue
实现。常见的使用场景有:
- 订单10分钟内未支付,就取消。
- 缓存过期后,就删除。
- 消息的延迟发送等。
public class DelayQueue<E extends Delayed>
extends AbstractQueue<E>
implements BlockingQueue<E> {
/**
* 排它锁,用于保证线程安全
*/
private final transient ReentrantLock lock = new ReentrantLock();
/**
* 底层是基于PriorityQueue实现
*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* 当前线程
*/
private Thread leader = null;
/**
* 条件队列
*/
private final Condition available = lock.newCondition();
}
DelayQueue
实现了BlockingQueue
接口,是一个阻塞队列。并且DelayQueue
里面的元素需要实现Delayed
接口。使用了ReentrantLock
保证线程安全,使用了Condition
作条件队列,当队列中没有过期元素的时候,取数据的线程需要在条件队列中等待。
DelayQueue
只接受Delayed的实现类作为元素:
// 这里继承了Comparable,它支持优先级
public interface Delayed extends Comparable<Delayed> {
// 获取剩余等待时间,正数表示还需要进行等待,0或负数表示等待结束
long getDelay(TimeUnit unit);
}
示例:DelayedTask.java DelayQueueTest.java
输出结果:
java10:30:10.000 [main] INFO com.test.DelayQueueTest - 开始运行任务 10:30:11.000 [main] INFO com.test.DelayQueueTest - 任务1开始运行 10:30:13.000 [main] INFO com.test.DelayQueueTest - 任务2开始运行 10:30:15.000 [main] INFO com.test.DelayQueueTest - 任务3开始运行
/**
* 自定义延迟任务
**/
public class DelayedTask implements Delayed {
/**
* 任务到期时间
*/
private long expirationTime;
/**
* 任务
*/
private Runnable task;
public void execute() {
task.run();
}
public DelayedTask(long delay, Runnable task) {
// 到期时间 = 当前时间 + 延迟时间
this.expirationTime = System.currentTimeMillis() + delay;
this.task = task;
}
/**
* 返回延迟时间
*/
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(expirationTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 任务列表按照到期时间排序
*/
@Override
public int compareTo(@NotNull Delayed o) {
return Long.compare(this.expirationTime, ((DelayedTask) o).expirationTime);
}
}
/**
* DelayQueue测试类
**/
@Slf4j
public class DelayQueueTest {
public static void main(String[] args) throws InterruptedException {
// 初始化延迟队列
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 添加3个任务,延迟时间分别是3秒、1秒、5秒
delayQueue.add(new DelayedTask(3000, () -> log.info("任务2开始运行")));
delayQueue.add(new DelayedTask(1000, () -> log.info("任务1开始运行")));
delayQueue.add(new DelayedTask(5000, () -> log.info("任务3开始运行")));
// 运行任务
log.info("开始运行任务");
while (!delayQueue.isEmpty()) {
//阻塞获取最先到期的任务
DelayedTask task = delayQueue.take();
task.execute();
}
}
}
DelayQueue
队列具有以下特点:
DelayQueue
实现了BlockingQueue
接口,提供了四组放数据和读数据的方法,来满足不同的场景。DelayQueue
底层采用组合的方式,复用PriorityQueue
的按照延迟时间排序任务的功能,实现了延迟队列。DelayQueue
是线程安全的,内部使用ReentrantLock
加锁。
Q&A
ArrayBlockingQueue
与LinkedBlockingQueue
区别是什么?
相同点:
- 都是继承自
AbstractQueue
抽象类,并实现了BlockingQueue
接口,所以两者拥有相同的读写方法,出现的地方可以相互替换。
不同点:
- 底层结构不同,
ArrayBlockingQueue
底层基于数组实现,初始化的时候必须指定数组长度,无法扩容。LinkedBlockingQueue
底层基于链表实现,链表最大长度是Integer最大值。 - 占用内存大小不同,
ArrayBlockingQueue
一旦初始化,数组长度就确定了,不会随着元素增加而改变。LinkedBlockingQueue
会随着元素越多,链表越长,占用内存越大。 - 性能不同,
ArrayBlockingQueue
的入队和出队共用一把锁,并发较低。LinkedBlockingQueue
入队和出队使用两把独立的锁,并发情况下性能更高。 - 公平锁选项,
ArrayBlockingQueue
初始化的时候,可以指定使用公平锁或者非公平锁,公平锁模式下,可以按照线程等待的顺序来操作队列。LinkedBlockingQueue
只支持非公平锁。 - 适用场景不同,
ArrayBlockingQueue
适用于明确限制队列大小的场景,防止生产速度大于消费速度的时候,造成内存溢出、资源耗尽。LinkedBlockingQueue
适用于业务高峰期可以自动扩展消费速度的场景。
阻塞队列的特性各不相同,在使用的时候该怎么选择呢?
阻塞队列实现生产者消费者模型
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
Runnable supplier = () -> {
while (true){
try {
String name = Thread.currentThread().getName();
System.err.println(time()+"生产者 "+name+" 正在准备餐品...");
TimeUnit.SECONDS.sleep(3);
System.err.println(time()+"生产者 "+name+" 已出餐!");
queue.put(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
};
Runnable consumer = () -> {
while (true){
try {
String name = Thread.currentThread().getName();
System.out.println(time()+"消费者 "+name+" 正在等待出餐...");
queue.take();
System.out.println(time()+"消费者 "+name+" 取到了餐品。");
TimeUnit.SECONDS.sleep(4);
System.out.println(time()+"消费者 "+name+" 已经将饭菜吃完了!");
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
};
for (int i = 0; i < 2; i++) new Thread(supplier, "Supplier-"+i).start();
for (int i = 0; i < 3; i++) new Thread(consumer, "Consumer-"+i).start();
}
private static String time(){
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
return "["+format.format(new Date()) + "] ";
}
}