Skip to content

并发编程-阻塞队列

BlockingQueue

阻塞队列本身也是队列,但是它是适用于多线程环境下的,基于ReentrantLock实现的。

接口定义如下:

java
public interface BlockingQueue<E> extends Queue<E> {
    
   	boolean add(E e);

    // 入队,如果队列已满,返回false否则返回true(非阻塞)
    boolean offer(E e);

    // 入队,如果队列已满,阻塞线程直到能入队为止
    void put(E e) throws InterruptedException;

    // 入队,如果队列已满,阻塞线程直到能入队或超时、中断为止,入队成功返回true否则false
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // 出队,如果队列为空,阻塞线程直到能出队为止
    E take() throws InterruptedException;

    // 出队,如果队列为空,阻塞线程直到能出队超时、中断为止,出队成功正常返回,否则返回null
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    // 返回此队列理想情况下(在没有内存或资源限制的情况下)可以不阻塞地入队的数量,如果没有限制,则返回 Integer.MAX_VALUE
    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

  	// 一次性从BlockingQueue中获取所有可用的数据对象(还可以指定获取数据的个数)
    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}
操作抛出异常返回特定值阻塞阻塞一段时间
放数据add()offer()put()offer(e, time, unit)
取数据(同时删除数据)remove()poll()take()poll(time, unit)
取数据(不删除)element()peek()不支持不支持

BlockingQueue有5个常见的实现类(应用于不同的场景)。

  • ArrayBlockingQueue:基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
  • LinkedBlockingQueue:无界带缓冲阻塞队列(没有容量限制,也可以限制容量,也会阻塞,链表实现)。
  • SynchronousQueue:无缓冲阻塞队列(相当于没有容量的ArrayBlockingQueue,因此只有阻塞的情况)。
  • PriorityBlockingQueue - 是一个支持优先级的阻塞队列,元素的获取顺序按优先级决定;是无界队列。
  • DelayQueue - 基于PriorityQueue实现的,是无界队列;它能够实现延迟获取元素,同样支持优先级。

ArrayBlockingQueue

ArrayBlockingQueue队列具有以下特点:

  1. ArrayBlockingQueue实现了BlockingQueue接口,提供了四组放数据和读数据的方法,来满足不同的场景。
  2. ArrayBlockingQueue底层基于数组实现,采用循环数组,提升了数组的空间利用率。
  3. ArrayBlockingQueue初始化的时候,必须指定队列长度,是有界的阻塞队列,所以要预估好队列长度,保证生产者和消费者速率相匹配。
  4. ArrayBlockingQueue的方法是线程安全的,使用ReentrantLock在操作前后加锁释放锁来线程安全。
java
public class ArrayBlockingQueue<E>
        extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 用来存放数据的数组
     */
    final Object[] items;

    /**
     * 下次取数据的数组下标位置
     */
    int takeIndex;

    /**
     * 下次放数据的数组下标位置
     */
    int putIndex;

    /**
     * 元素个数
     */
    int count;

    /**
     * 独占锁,用来保证存取数据安全
     */
    final ReentrantLock lock;

    /**
     * 取数据的条件
     */
    private final Condition notEmpty;

    /**
     * 放数据的条件
     */
    private final Condition notFull;

}

ArrayBlockingQueue底层是基于数组实现的,使用对象数组items存储元素。

为了实现队列特性(一端插入,另一端删除),定义了两个指针,takeIndex表示下次取数据的位置,putIndex表示下次放数据的位置。

另外ArrayBlockingQueue还使用ReentrantLock保证线程安全,并且定义了两个条件 notEmpty notFull,当条件满足的时候才允许放数据或者取数据。

构造方法

ArrayBlockingQueue常用的初始化方法有两个:

  1. 指定容量大小
  2. 指定容量大小和是否是公平锁
java
/**
 * 指定容量大小的构造方法
 */
BlockingQueue<Integer> blockingDeque1 = new ArrayBlockingQueue<>(1);

/**
 * 指定容量大小、公平锁的构造方法
 */
BlockingQueue<Integer> blockingDeque1 = new ArrayBlockingQueue<>(1, true);

源码实现:

java
/**
 * 指定容量大小的构造方法(默认是非公平锁)
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}


/**
 * 指定容量大小、公平锁的构造方法
 *
 * @param capacity 数组容量
 * @param fair     是否是公平锁
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0) {
        throw new IllegalArgumentException();
    }
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

放数据的方法有四个:

操作抛出异常返回特定值阻塞阻塞一段时间
放数据add()offer()put()offer(e, time, unit)

put方法

put()方法在数组满的时候,会一直阻塞,直到有其他线程取走数据,空出位置,才能添加成功。

java
/**
 * put方法入口
 *
 * @param e 元素
 */
public void put(E e) throws InterruptedException {
    // 1. 判空,传参不允许为null
    checkNotNull(e);
    // 2. 加可中断的锁,防止一直阻塞
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 3. 如果队列已满,就一直阻塞,直到被唤醒
        while (count == items.length) {
            notFull.await();
        }
        // 4. 如果队列未满,直接入队
        enqueue(e);
    } finally {
        // 5. 释放锁
        lock.unlock();
    }
}

add方法

java
/**
 * add方法入口
 *
 * @param e 元素
 * @return 是否添加成功
 */
public boolean add(E e) {
    if (offer(e)) {
        return true;
    } else {
        throw new IllegalStateException("Queue full");
    }
}

add()方法在数组满的时候,会抛出异常,底层基于offer()实现。

offer方法

java
/**
 * offer方法入口
 *
 * @param e 元素
 * @return 是否插入成功
 */
public boolean offer(E e) {
    // 1. 判空,传参不允许为null
    checkNotNull(e);
    // 2. 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 3. 判断数组是否已满,如果满了就直接返回false结束
        if (count == items.length) {
            return false;
        } else {
            // 4. 否则就插入
            enqueue(e);
            return true;
        }
    } finally {
        // 5. 释放锁
        lock.unlock();
    }
}

/**
 * 入队
 *
 * @param x 元素
 */
private void enqueue(E x) {
    // 1. 获取数组
    final Object[] items = this.items;
    // 2. 直接放入数组
    items[putIndex] = x;
    // 3. 移动putIndex位置,如果到达数组的末尾就从头开始
    if (++putIndex == items.length) {
        putIndex = 0;
    }
    // 4. 计数
    count++;
    // 5. 唤醒因为队列为空,等待取数据的线程
    notEmpty.signal();
}

offer()在数组满的时候,会返回false,表示添加失败。 为了循环利用数组,添加元素的时候如果已经到了队尾,就从队头重新开始,相当于一个循环队列。

offer(e, time, unit)方法

在数组满的时候, offer(e, time, unit)方法会阻塞一段时间。

java
/**
 * offer方法入口
 *
 * @param e       元素
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 是否添加成功
 */
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    // 1. 判空,传参不允许为null
    checkNotNull(e);
    // 2. 把超时时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    // 3. 加可中断的锁,防止一直阻塞
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 4. 循环判断队列是否已满
        while (count == items.length) {
            if (nanos <= 0) {
                // 6. 如果队列已满,且超时时间已过,则返回false
                return false;
            }
            // 5. 如果队列已满,则等待指定时间
            nanos = notFull.awaitNanos(nanos);
        }
        // 7. 如果队列未满,则入队
        enqueue(e);
        return true;
    } finally {
        // 8. 释放锁
        lock.unlock();
    }
}

取数据(取出数据并删除)的方法有四个:

操作抛出异常返回特定值阻塞阻塞一段时间
取数据(同时删除数据)remove()poll()take()poll(time, unit)

poll方法

poll()方法在弹出元素的时候如果数组为空则返回null,表示弹出失败(取数据跟放数据一样都是循环遍历数组)。

java
/**
 * poll方法入口
 */
public E poll() {
    // 1. 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 2. 如果数组为空,则返回null,否则返回队列头部元素
        return (count == 0) ? null : dequeue();
    } finally {
        // 3. 释放锁
        lock.unlock();
    }
}

/**
 * 出列
 */
private E dequeue() {
    // 1. 取出队列头部元素
    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    // 2. 取出元素后,把该位置置空
    items[takeIndex] = null;
    // 3. 移动takeIndex位置,如果到达数组的末尾就从头开始
    if (++takeIndex == items.length) {
        takeIndex = 0;
    }
    // 4. 元素个数减一
    count--;
    if (itrs != null) {
        itrs.elementDequeued();
    }
    // 5. 唤醒因为队列已满,等待放数据的线程
    notFull.signal();
    return x;
}

remove方法

如果数组为空,remove()会抛出异常。

java
/**
 * remove方法入口
 */
public E remove() {
    // 1. 直接调用poll方法
    E x = poll();
    // 2. 如果取到数据,直接返回,否则抛出异常
    if (x != null) {
        return x;
    } else {
        throw new NoSuchElementException();
    }
}

take方法

如果数组为空,take()方法就一直阻塞,直到被唤醒。

java
/**
 * take方法入口
 */
public E take() throws InterruptedException {
    // 1. 加可中断的锁,防止一直阻塞
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 2. 如果数组为空,就一直阻塞,直到被唤醒
        while (count == 0) {
            notEmpty.await();
        }
        // 3. 如果数组不为空,就从数组中取数据
        return dequeue();
    } finally {
        // 4. 释放锁
        lock.unlock();
    }
}

poll(time, unit)方法

在数组满的时候, poll(time, unit)方法会阻塞一段时间。

java
/**
 * poll方法入口
 *
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 元素
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 1. 把超时时间转换成纳秒
    long nanos = unit.toNanos(timeout);
    // 2. 加可中断的锁,防止一直阻塞
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 3. 如果数组为空,就开始阻塞
        while (count == 0) {
            if (nanos <= 0) {
                // 5. 如果数组为空,且超时时间已过,则返回null
                return null;
            }
            // 4. 阻塞到到指定时间
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 6. 如果数组不为空,则出列
        return dequeue();
    } finally {
        // 7. 释放锁
        lock.unlock();
    }
}

取数据的方法

操作抛出异常返回特定值阻塞阻塞一段时间
取数据(不删除)element()peek()不支持不支持

peek方法

java
/**
 * peek方法入口
 */
public E peek() {
    // 1. 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 2. 返回数组头部元素,如果数组为空,则返回null
        return itemAt(takeIndex);
    } finally {
        // 3. 释放锁
        lock.unlock();
    }
}

/**
 * 返回当前位置元素
 */
final E itemAt(int i) {
    return (E) items[i];
}

element方法

java
/**
 * element方法入口
 */
public E element() {
    // 1. 调用peek方法查询数据
    E x = peek();
    // 2. 如果查到数据,直接返回
    if (x != null) {
        return x;
    } else {
        // 3. 如果没找到,则抛出异常
        throw new NoSuchElementException();
    }
}

LinkedBlockingQueue

LinkedBlockingQueue队列具有以下特点:

  1. LinkedBlockingQueue实现了BlockingQueue接口,提供了四组放数据和读数据的方法,来满足不同的场景。
  2. LinkedBlockingQueue底层基于链表实现,支持从头部弹出数据,从尾部添加数据。
  3. LinkedBlockingQueue初始化的时候,如果不指定队列长度,默认长度是Integer最大值,有内存溢出风险,建议初始化的时候指定队列长度。
  4. LinkedBlockingQueue的方法是线程安全的,分别使用了读写两把锁,比ArrayBlockingQueue性能更好。

Java线程池中的固定大小线程池就是基于LinkedBlockingQueue实现的。

java
// 创建固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);

// 底层使用LinkedBlockingQueue队列存储任务
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
java
public class LinkedBlockingQueue<E>
        extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 容量大小
     */
    private final int capacity;

    /**
     * 元素个数
     */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 头节点
     */
    transient Node<E> head;

    /**
     * 尾节点
     */
    private transient Node<E> last;

    /**
     * 取数据的锁
     */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
     * 取数据的条件(队列非空)
     */
    private final Condition notEmpty = takeLock.newCondition();

    /**
     * 放数据的锁
     */
    private final ReentrantLock putLock = new ReentrantLock();

    /**
     * 放数据的条件(队列非满)
     */
    private final Condition notFull = putLock.newCondition();

    /**
     * 链表节点类
     */
    static class Node<E> {
        
        /**
         * 节点元素
         */
        E item;
        
        /**
         * 后继节点
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }
}

image-20240622202717389

LinkedBlockingQueue底层是基于链表实现的,定义了头节点head和尾节点last,由链表节点类Node可以看出是个单链表。

ArrayBlockingQueue中只使用了一把锁,入队出队操作共用这把锁。而LinkedBlockingQueue则使用了两把锁,分别是出队锁takeLock和入队锁putLock,为什么要这么设计呢?

LinkedBlockingQueue把两把锁分开,性能更好,为什么ArrayBlockingQueue不这样设计呢?

原因是ArrayBlockingQueue是基于数组实现的,所有数据都存储在同一个数组对象里面,对同一个对象没办法使用两把锁,会有数据可见性的问题。而LinkedBlockingQueue底层是基于链表实现的,从头节点删除,尾节点插入,头尾节点分别是两个对象,可以分别使用两把锁,提升操作性能。

构造方法

LinkedBlockingQueue常用的初始化方法有两个:

  1. 无参构造方法
  2. 指定容量大小的有参构造方法
java
/**
 * 无参构造方法
 */
BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingQueue<>();

/**
 * 指定容量大小的构造方法
 */
BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingQueue<>(10);

源码实现:

java
/**
 * 无参构造方法
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

/**
 * 指定容量大小的构造方法
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) {
        throw new IllegalArgumentException();
    }
    // 设置容量大小,初始化头尾结点
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

LinkedBlockingQueue的无参构造方法使用的链表容量是Integer的最大值,存储大量数据的时候,会有内存溢出的风险,建议使用有参构造方法,指定容量大小。

有参构造方法还会初始化头尾节点,节点值为null。

LinkedBlockingQueue初始化的时候,不支持指定是否使用公平锁,只能使用非公平锁,而ArrayBlockingQueue是支持指定的。

操作抛出异常返回特定值一直阻塞阻塞指定时间
放数据add()offer()put()offer(e, time, unit)
取数据(同时删除数据)remove()poll()take()poll(time, unit)
取数据(不删除)element()peek()不支持不支持

这四组方法的区别是:

  1. 当队列满的时候,再次添加数据,add()会抛出异常,offer()会返回false,put()会一直阻塞,offer(e, time, unit)会阻塞指定时间,然后返回false。
  2. 当队列为空的时候,再次取数据,remove()会抛出异常,poll()会返回null,take()会一直阻塞,poll(time, unit)会阻塞指定时间,然后返回null。

offer方法

在链表尾部插入。 offer()方法在队列满的时候,会直接返回false,表示插入失败。

追加元素到链表末尾,如果是第一次添加元素,就唤醒因为队列为空而等待取数据的线程。

java
/**
 * offer方法入口
 *
 * @param e 元素
 * @return 是否插入成功
 */
public boolean offer(E e) {
    // 1. 判空,传参不允许为null
    if (e == null) {
        throw new NullPointerException();
    }
    // 2. 如果队列已满,则直接返回false,表示插入失败
    final AtomicInteger count = this.count;
    if (count.get() == capacity) {
        return false;
    }
    int c = -1;
    Node<E> node = new Node<E>(e);
    // 3. 获取put锁,并加锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 4. 加锁后,再次判断队列是否已满,如果未满,则入队
        if (count.get() < capacity) {
            enqueue(node);
            // 5. 队列个数加一
            c = count.getAndIncrement();
            // 6. 如果队列未满,则唤醒因为队列已满而等待放数据的线程(用来补偿,不加也行)
            if (c + 1 < capacity) {
                notFull.signal();
            }
        }
    } finally {
        // 7. 释放锁
        putLock.unlock();
    }
    // 8. c等于0,表示插入前,队列为空,是第一次插入,需要唤醒因为队列为空而等待取数据的线程
    if (c == 0) {
        signalNotEmpty();
    }
    // 9. 返回是否插入成功
    return c >= 0;
}

/**
 * 入队
 *
 * @param node 节点
 */
private void enqueue(LinkedBlockingQueue.Node<E> node) {
    // 直接追加到链表末尾
    last = last.next = node;
}

/**
 * 唤醒因为队列为空而等待取数据的线程
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

add方法

add()方法在数组满的时候,会抛出异常,底层基于offer()实现。

java
/**
 * add方法入口
 *
 * @param e 元素
 * @return 是否添加成功
 */
public boolean add(E e) {
    if (offer(e)) {
        return true;
    } else {
        throw new IllegalStateException("Queue full");
    }
}

put方法

put()方法在数组满的时候,会一直阻塞,直到有其他线程取走数据,空出位置,才能添加成功。

java
/**
 * put方法入口
 *
 * @param e 元素
 */
public void put(E e) throws InterruptedException {
    // 1. 判空,传参不允许为null
    if (e == null) {
        throw new NullPointerException();
    }
    int c = -1;
    Node<E> node = new Node<E>(e);
    // 2. 加可中断的锁,防止一直阻塞
    final ReentrantLock putLock = this.putLock;
    putLock.lockInterruptibly();
    final AtomicInteger count = this.count;
    try {
        // 3. 如果队列已满,就一直阻塞,直到被唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
        // 4. 如果队列未满,则直接入队
        enqueue(node);
        c = count.getAndIncrement();
        // 5. 如果队列未满,则唤醒因为队列已满而等待放数据的线程(用来补偿,不加也行)
        if (c + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        // 6. 释放锁
        putLock.unlock();
    }
    // 7. c等于0,表示插入前,队列为空,是第一次插入,需要唤醒因为队列为空而等待取数据的线程
    if (c == 0) {
        signalNotEmpty();
    }
}

offer(e, time, unit)方法

在数组满的时候, offer(e, time, unit)方法会阻塞一段时间。

java
/**
 * offer方法入口
 *
 * @param e       元素
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 是否添加成功
 */
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    // 1. 判空,传参不允许为null
    if (e == null) {
        throw new NullPointerException();
    }
    // 2. 把超时时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final AtomicInteger count = this.count;
    // 2. 加可中断的锁,防止一直阻塞
    final ReentrantLock putLock = this.putLock;
    putLock.lockInterruptibly();
    try {
        // 4. 循环判断队列是否已满
        while (count.get() == capacity) {
            if (nanos <= 0) {
                // 6. 如果队列已满,且超时时间已过,则返回false
                return false;
            }
            // 5. 如果队列已满,则等待指定时间
            nanos = notFull.awaitNanos(nanos);
        }
        // 7. 如果队列未满,则入队
        enqueue(new Node<E>(e));
        // 8. 如果队列未满,则唤醒因为队列已满而等待放数据的线程(用来补偿,不加也行)
        c = count.getAndIncrement();
        if (c + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        // 9. 释放锁
        putLock.unlock();
    }
    // 10. c等于0,表示插入前,队列为空,是第一次插入,需要唤醒因为队列为空而等待取数据的线程
    if (c == 0) {
        signalNotEmpty();
    }
    return true;
}

poll方法

poll()方法取数据都是从链表头部弹出元素。 poll()方法在弹出元素的时候,如果队列为空,直接返回null,表示弹出失败。

java
/**
 * poll方法入口
 */
public E poll() {
    // 如果队列为空,则返回null
    final AtomicInteger count = this.count;
    if (count.get() == 0) {
        return null;
    }
    E x = null;
    int c = -1;
    // 2. 加锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 3. 如果队列不为空,则取出队头元素
        if (count.get() > 0) {
            x = dequeue();
            // 4. 元素个数减一
            c = count.getAndDecrement();
            // 5. 如果队列不为空,则唤醒因为队列为空而等待取数据的线程
            if (c > 1) {
                notEmpty.signal();
            }
        }
    } finally {
        // 6. 释放锁
        takeLock.unlock();
    }
    // 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
    if (c == capacity) {
        signalNotFull();
    }
    return x;
}

/**
 * 取出队头元素
 */
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h;
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

/**
 * 唤醒因为队列已满而等待放数据的线程
 */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

remove方法

java
/**
 * remove方法入口
 */
public E remove() {
    // 1. 直接调用poll方法
    E x = poll();
    // 2. 如果取到数据,直接返回,否则抛出异常
    if (x != null) {
        return x;
    } else {
        throw new NoSuchElementException();
    }
}

take方法

如果队列为空,take()方法就一直阻塞,直到被唤醒。

java
/**
 * take方法入口
 */
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    // 1. 加可中断的锁,防止一直阻塞
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 2. 如果队列为空,就一直阻塞,直到被唤醒
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 3. 如果队列不为空,则取出队头元素
        x = dequeue();
        // 4. 队列元素个数减一
        c = count.getAndDecrement();
        // 5. 如果队列不为空,则唤醒因为队列为空而等待取数据的线程
        if (c > 1) {
            notEmpty.signal();
        }
    } finally {
        // 6. 释放锁
        takeLock.unlock();
    }
    // 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
    if (c == capacity) {
        signalNotFull();
    }
    return x;
}

poll(time, unit)方法

在队列满的时候, poll(time, unit)方法会阻塞指定时间,然后返回null。

java
/**
 * poll方法入口
 *
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 元素
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    // 1. 把超时时间转换成纳秒
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    // 2. 加可中断的锁,防止一直阻塞
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 3. 循环判断队列是否为空
        while (count.get() == 0) {
            if (nanos <= 0) {
                // 5. 如果队列为空,且超时时间已过,则返回null
                return null;
            }
            // 4. 阻塞到到指定时间
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 6. 如果队列不为空,则取出队头元素
        x = dequeue();
        // 7. 队列元素个数减一
        c = count.getAndDecrement();
        // 8. 如果队列不为空,则唤醒因为队列为空而等待取数据的线程
        if (c > 1) {
            notEmpty.signal();
        }
    } finally {
        // 9. 释放锁
        takeLock.unlock();
    }
    // 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
    if (c == capacity) {
        signalNotFull();
    }
    return x;
}

peek方法

如果数组为空,直接返回null。

java
/**
 * peek方法入口
 */
public E peek() {
    // 1. 如果队列为空,则返回null
    if (count.get() == 0) {
        return null;
    }
    // 2. 加锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 3. 取出队头元素
        Node<E> first = head.next;
        if (first == null) {
            return null;
        } else {
            return first.item;
        }
    } finally {
        // 4. 释放锁
        takeLock.unlock();
    }
}

element方法

如果队列为空,则抛出异常。

java
/**
 * element方法入口
 */
public E element() {
    // 1. 调用peek方法查询数据
    E x = peek();
    // 2. 如果查到数据,直接返回
    if (x != null) {
        return x;
    } else {
        // 3. 如果没找到,则抛出异常
        throw new NoSuchElementException();
    }
}

SynchronousQueue

SynchronousQueue被称为同步队列,当生产者往队列中放元素的时候,必须等待消费者把这个元素取走,否则一直阻塞。消费者取元素的时候,同理也必须等待生产者放队列中放元素。(同步生产-消费)

Java线程池中的带缓存的线程池就是基于SynchronousQueue实现的。

java
// 创建带缓存的线程池
ExecutorService executorService = Executors.newCachedThreadPool();

// 底层使用SynchronousQueue队列处理任务
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

SynchronousQueue队列具有以下特点:

  1. SynchronousQueue实现了BlockingQueue接口,提供了四组放数据和读数据的方法,来满足不同的场景。
  2. SynchronousQueue底层有两种实现方式,分别是基于栈实现非公平策略,以及基于队列实现的公平策略。
  3. SynchronousQueue初始化的时候,可以指定使用公平策略还是非公平策略。
  4. SynchronousQueue不存储元素,不适合作为缓存队列使用。适用于生产者与消费者速度相匹配的场景,可减少任务执行的等待时间。

SynchronousQueue底层是基于Transferer抽象类实现的,放数据和取数据的逻辑都耦合在transfer()方法中。而Transferer抽象类又有两个实现类,分别是基于栈结构实现和基于队列实现。

java
public class SynchronousQueue<E>
        extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 转接器(栈和队列的父类)
     */
    abstract static class Transferer<E> {
        
        /**
         * 转移(put和take都用这一个方法)
         *
         * @param e     元素
         * @param timed 是否超时
         * @param nanos 纳秒
         */
        abstract E transfer(E e, boolean timed, long nanos);
        
    }

    /**
     * 栈实现类
     */
    static final class TransferStack<E> extends Transferer<E> {
    }

    /**
     * 队列实现类
     */
    static final class TransferQueue<E> extends Transferer<E> {
    }

}

构造方法

SynchronousQueue常用的初始化方法有两个:

  1. 无参构造方法
  2. 指定容量大小的有参构造方法
java
/**
 * 无参构造方法
 */
BlockingQueue<Integer> blockingQueue1 = new SynchronousQueue<>();

/**
 * 有参构造方法,指定是否使用公平锁(默认使用非公平锁)
 */
BlockingQueue<Integer> blockingQueue2 = new SynchronousQueue<>(true);

对应的源码实现:

java
/**
 * 无参构造方法
 */
public SynchronousQueue() {
    this(false);
}

/**
 * 有参构造方法,指定是否使用公平锁
 */
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

SynchronousQueue的无参构造方法默认使用的非公平策略,有参构造方法可以指定使用公平策略。

  1. 公平策略,基于队列实现的是公平策略,先进先出。
  2. 非公平策略,基于栈实现的是非公平策略,先进后出。

TransferStack

TODO

TransferQueue

TODO

offer方法

底层都是调用的transfer()方法实现。 如果没有匹配到合适的节点,offer()方法会直接返回false,表示插入失败。

java
/**
 * offer方法入口
 *
 * @param e 元素
 * @return 是否插入成功
 */
public boolean offer(E e) {
    // 1. 判空,传参不允许为null
    if (e == null) {
        throw new NullPointerException();
    }
    // 2. 调用底层transfer方法
    return transferer.transfer(e, true, 0) != null;
}

add方法

如果没有匹配到合适的节点,add()方法会抛出异常,底层基于offer()实现。

java
/**
 * add方法入口
 *
 * @param e 元素
 * @return 是否添加成功
 */
public boolean add(E e) {
    if (offer(e)) {
        return true;
    } else {
        throw new IllegalStateException("Queue full");
    }
}

put方法

如果没有匹配到合适的节点,put()方法会一直阻塞,直到有其他线程取走数据,才能添加成功。

java
/**
 * put方法入口
 *
 * @param e 元素
 */
public void put(E e) throws InterruptedException {
    // 1. 判空,传参不允许为null
    if (e == null) {
        throw new NullPointerException();
    }
    // 2. 调用底层transfer方法
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

offer(e, time, unit)方法

如果没有匹配到合适的节点, offer(e, time, unit)方法会阻塞一段时间,然后返回false。

java
/**
 * offer方法入口
 *
 * @param e       元素
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 是否添加成功
 */
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    // 1. 判空,传参不允许为null
    if (e == null) {
        throw new NullPointerException();
    }
    // 2. 调用底层transfer方法
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) {
        return true;
    }
    if (!Thread.interrupted()) {
        return false;
    }
    throw new InterruptedException();
}

poll方法

底层都是调用的transfer方法实现。

poll()方法在弹出元素的时候,如果没有匹配到合适的节点,直接返回null,表示弹出失败。

java
/**
 * poll方法入口
 */
public E poll() {
    // 调用底层transfer方法
    return transferer.transfer(null, true, 0);
}

remove方法

如果没有匹配到合适的节点,remove()会抛出异常。

java
/**
 * remove方法入口
 */
public E remove() {
    // 1. 直接调用poll方法
    E x = poll();
    // 2. 如果取到数据,直接返回,否则抛出异常
    if (x != null) {
        return x;
    } else {
        throw new NoSuchElementException();
    }
}

take方法

如果没有匹配到合适的节点,take()方法就一直阻塞,直到被唤醒。

java
/**
 * take方法入口
 */
public E take() throws InterruptedException {
    // 调用底层transfer方法
    E e = transferer.transfer(null, false, 0);
    if (e != null) {
        return e;
    }
    Thread.interrupted();
    throw new InterruptedException();
}

poll(time, unit)方法

如果没有匹配到合适的节点, poll(time, unit)方法会阻塞指定时间,然后返回null。

java
/**
 * poll方法入口
 *
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 元素
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 调用底层transfer方法
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted()) {
        return e;
    }
    throw new InterruptedException();
}

peek方法

直接返回null,SynchronousQueue不支持这种操作。

java
/**
 * peek方法入口
 */
public E peek() {
    return null;
}

element方法

底层调用的也是peek()方法,也是不支持这种操作。

java
/**
 * element方法入口
 */
public E element() {
    // 1. 调用peek方法查询数据
    E x = peek();
    // 2. 如果查到数据,直接返回
    if (x != null) {
        return x;
    } else {
        // 3. 如果没找到,则抛出异常
        throw new NoSuchElementException();
    }
}

LinkedTransferQueue

在JDK7的时候,基于SynchronousQueue产生了一个更强大的TransferQueue,它保留了SynchronousQueue的匹配交接机制,并且与等待队列进行融合。

SynchronousQueue并没有使用锁,而是采用CAS操作保证生产者与消费者的协调,但是它没有容量,而LinkedBlockingQueue虽然是有容量且无界的,但是内部基本都是基于锁实现的,性能并不是很好,这时,我们就可以将它们各自的优点单独拿出来,揉在一起,就成了性能更高的LinkedTransferQueue

相比 SynchronousQueue ,它多了一个可以存储的队列,我们依然可以像阻塞队列那样获取队列中所有元素的值,简单来说,LinkedTransferQueue其实就是一个多了存储队列的SynchronousQueue

java
public static void main(String[] args) throws InterruptedException {
    
    LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
    // 插入时,会先检查是否有其他线程等待获取,如果是,直接进行交接,否则插入到存储队列中
    // 不会像 SynchronousQueue 那样必须等一个匹配的才可以
    queue.put("1");  
    queue.put("2"); 
    // 直接打印所有的元素,这在SynchronousQueue下只能是空,因为单独的入队或出队操作都会被阻塞
    queue.forEach(System.out::println);   
}

PriorityBlockingQueue

java
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    
    private transient Object[] queue;
    
    private transient int size;
    
    private PriorityQueue<E> q;
    
    private transient Comparator<? super E> comparator;

    private final ReentrantLock lock = new ReentrantLock();

    private final Condition notEmpty = lock.newCondition();

    private transient volatile int allocationSpinLock;
}

TODO

DelayQueue

实现延时出队,也就是说当一个元素插入后,如果没有超过一定时间,那么是无法让此元素出队的。

DelayQueue是一种本地延迟队列,比如希望我们的任务在5秒后执行,就可以使用DelayQueue实现。

常见的使用场景有:

  • 订单10分钟内未支付,就取消。
  • 缓存过期后,就删除。
  • 消息的延迟发送等。
java
public class DelayQueue<E extends Delayed>
        extends AbstractQueue<E>
        implements BlockingQueue<E> {

    /**
     * 排它锁,用于保证线程安全
     */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * 底层是基于PriorityQueue实现
     */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * 当前线程
     */
    private Thread leader = null;

    /**
     * 条件队列
     */
    private final Condition available = lock.newCondition();

}

DelayQueue实现了BlockingQueue接口,是一个阻塞队列。并且DelayQueue里面的元素需要实现Delayed接口。使用了ReentrantLock保证线程安全,使用了Condition作条件队列,当队列中没有过期元素的时候,取数据的线程需要在条件队列中等待。

DelayQueue只接受Delayed的实现类作为元素:

java
// 这里继承了Comparable,它支持优先级
public interface Delayed extends Comparable<Delayed> {  

    // 获取剩余等待时间,正数表示还需要进行等待,0或负数表示等待结束
    long getDelay(TimeUnit unit);
}

示例:DelayedTask.java DelayQueueTest.java

输出结果:

java
10:30:10.000 [main] INFO com.test.DelayQueueTest - 开始运行任务
10:30:11.000 [main] INFO com.test.DelayQueueTest - 任务1开始运行
10:30:13.000 [main] INFO com.test.DelayQueueTest - 任务2开始运行
10:30:15.000 [main] INFO com.test.DelayQueueTest - 任务3开始运行
java
/**
 * 自定义延迟任务
 **/
public class DelayedTask implements Delayed {

    /**
     * 任务到期时间
     */
    private long expirationTime;
    
    /**
     * 任务
     */
    private Runnable task;

    public void execute() {
        task.run();
    }

    public DelayedTask(long delay, Runnable task) {
        // 到期时间 = 当前时间 + 延迟时间
        this.expirationTime = System.currentTimeMillis() + delay;
        this.task = task;
    }

    /**
     * 返回延迟时间
     */
    @Override
    public long getDelay(@NotNull TimeUnit unit) {
        return unit.convert(expirationTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 任务列表按照到期时间排序
     */
    @Override
    public int compareTo(@NotNull Delayed o) {
        return Long.compare(this.expirationTime, ((DelayedTask) o).expirationTime);
    }
}
java
/**
 * DelayQueue测试类
 **/
@Slf4j
public class DelayQueueTest {

    public static void main(String[] args) throws InterruptedException {
        // 初始化延迟队列
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();

        // 添加3个任务,延迟时间分别是3秒、1秒、5秒
        delayQueue.add(new DelayedTask(3000, () -> log.info("任务2开始运行")));
        delayQueue.add(new DelayedTask(1000, () -> log.info("任务1开始运行")));
        delayQueue.add(new DelayedTask(5000, () -> log.info("任务3开始运行")));

        // 运行任务
        log.info("开始运行任务");
        while (!delayQueue.isEmpty()) {
            //阻塞获取最先到期的任务
            DelayedTask task = delayQueue.take();
            task.execute();
        }
    }
}

DelayQueue队列具有以下特点:

  1. DelayQueue实现了BlockingQueue接口,提供了四组放数据和读数据的方法,来满足不同的场景。
  2. DelayQueue底层采用组合的方式,复用PriorityQueue的按照延迟时间排序任务的功能,实现了延迟队列。
  3. DelayQueue是线程安全的,内部使用ReentrantLock加锁。

Q&A

ArrayBlockingQueueLinkedBlockingQueue区别是什么?

相同点:

  1. 都是继承自AbstractQueue抽象类,并实现了BlockingQueue接口,所以两者拥有相同的读写方法,出现的地方可以相互替换。

不同点:

  1. 底层结构不同,ArrayBlockingQueue底层基于数组实现,初始化的时候必须指定数组长度,无法扩容。LinkedBlockingQueue底层基于链表实现,链表最大长度是Integer最大值。
  2. 占用内存大小不同,ArrayBlockingQueue一旦初始化,数组长度就确定了,不会随着元素增加而改变。LinkedBlockingQueue会随着元素越多,链表越长,占用内存越大。
  3. 性能不同,ArrayBlockingQueue的入队和出队共用一把锁,并发较低。LinkedBlockingQueue入队和出队使用两把独立的锁,并发情况下性能更高。
  4. 公平锁选项,ArrayBlockingQueue初始化的时候,可以指定使用公平锁或者非公平锁,公平锁模式下,可以按照线程等待的顺序来操作队列。LinkedBlockingQueue只支持非公平锁。
  5. 适用场景不同,ArrayBlockingQueue适用于明确限制队列大小的场景,防止生产速度大于消费速度的时候,造成内存溢出、资源耗尽。LinkedBlockingQueue适用于业务高峰期可以自动扩展消费速度的场景。

阻塞队列的特性各不相同,在使用的时候该怎么选择呢?

image-20240622210909411

阻塞队列实现生产者消费者模型

java
public class Main {
    public static void main(String[] args) throws InterruptedException {
        
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
        Runnable supplier = () -> {
            while (true){
                try {
                    String name = Thread.currentThread().getName();
                    System.err.println(time()+"生产者 "+name+" 正在准备餐品...");
                    TimeUnit.SECONDS.sleep(3);
                    System.err.println(time()+"生产者 "+name+" 已出餐!");
                    queue.put(new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        };
        Runnable consumer = () -> {
            while (true){
                try {
                    String name = Thread.currentThread().getName();
                    System.out.println(time()+"消费者 "+name+" 正在等待出餐...");
                    queue.take();
                    System.out.println(time()+"消费者 "+name+" 取到了餐品。");
                    TimeUnit.SECONDS.sleep(4);
                    System.out.println(time()+"消费者 "+name+" 已经将饭菜吃完了!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        };
        for (int i = 0; i < 2; i++) new Thread(supplier, "Supplier-"+i).start();
        for (int i = 0; i < 3; i++) new Thread(consumer, "Consumer-"+i).start();
    }

    private static String time(){
        SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
        return "["+format.format(new Date()) + "] ";
    }
}