并发容器之BlockingQueue阻塞队列

BlockingQueue阻塞队列

BlockingQueue接口是在jdk5版本提供的,在线程池中用到了阻塞队列来实现,阻塞队列是深入学习线程池的基础,该队列通常是有限的容量,如果队列已满添加操作就会阻塞,如果队列为空,移除操作就会阻塞。

public interface BlockingQueue<Eextends Queue<E{
    // add/offer/put  插入数据,插入到队列尾部
  // add添加元素,如果队列已满,直接抛出异常IllegalArgumentException
    boolean add(E e);
  // 添加元素,如果队列已满,返回false
    boolean offer(E e);
    // 添加元素,如果队列已满,阻塞
    void put(E e) throws InterruptedException;
    // 超时
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException
;

  
    // take/poll/remove移除数据,移除队列头部
  // 移除元素并返回头元素,如果队列为空,阻塞
    take() throws InterruptedException;
  // 移除元素并返回头元素,如果队列为空,则返回null
  void put(E e) throws InterruptedException;
  // 超时
    poll(long timeout, TimeUnit unit)
        throws InterruptedException
;
  // 移除元素并返回头元素,如果队列为空,抛出NoSuchElementException异常
    boolean remove(Object o);
  
   int remainingCapacity();

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}

在使用阻塞队列时最好使用put()、take()以及可定时的offer()和poll(),而不要使用Queue接口中的方法,否则就丢失了阻塞的效果

BlockingQueue实现类

BlockingQueue接口有多个实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue等

并发容器之BlockingQueue阻塞队列
阻塞队列

ArrayBlockingQueue

ArrayBlockingQueue类底层是由数组支持的有界阻塞队列,创建时需要指定容量,实现了FIFO(先进先出)排序机制。新添加的元素都在队列的尾部,获取操作是从队列头部进行,可以设置公平策略,默认是非公平的

源码分析

内部没有实现读写分离,生产和消费不能完全并行,长度需要定义

// 存储队列元素
final Object[] items;

/** items index for next take, poll, peek or remove */
// 出队的数组下标(下一个待取出的元素索引)
int takeIndex;

/** items index for next put, offer, or add */
// 入队的数组下标(下一个待添加的元素索引)
int putIndex;

// 队列中的元素数量
int count;
// 锁
final ReentrantLock lock;

/** Condition for waiting takes */
// 出队的条件变量(消费者监视器)
private final Condition notEmpty;

/** Condition for waiting puts */
// 入队的条件变量(生产者监视器)
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  // 默认是非公平锁
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}
重要方法

offer方法

offer方法向队列尾部添加元素,如果队列未满,则添加成功,返回true;如果队列已满则丢弃当前元素,返回false。该方法是不阻塞的

public boolean offer(E e) {
  // 如果元素为null,抛出EPN,队列中不可存储null值
    checkNotNull(e);
  // 获取锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      // 队列已满,直接返回false
        if (count == items.length)
            return false;
        else {
          // 添加元素
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  items[putIndex] = x;
  // 计算下一个元素存放的下标位置
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  // 因为添加了一个元素,所以唤醒因为没有元素而被阻塞的take方法的一个线程
  notEmpty.signal();
}

put方法

put方法向队列尾部添加元素,如果队列未满,则添加成功,返回true;如果队列已满则阻塞当前线程直到队列有空闲为止。该方法是阻塞的

public void put(E e) throws InterruptedException {
  // 如果元素为null,抛出EPN,队列中不可存储null值
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
  // 获取可中断锁
    lock.lockInterruptibly();
    try {
      // 队列已满,阻塞
        while (count == items.length)
          // 需要等待notFull.notify的唤醒
            notFull.await();
      // 如果没有满,添加元素进入队列
        enqueue(e);
    } finally { // 释放锁
        lock.unlock();
    }
}
  • 所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁
  • 判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull条件变量,同时释放lock锁,等待被消费者线程唤醒
  • 如果没有满,则调用enqueue方法将元素put进阻塞队列
  • 唤醒notEmpty条件变量

poll方法

从队列头获取并移除元素,如果队列为空,则直接返回null,该方法是不阻塞的

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      // 如果队列为空,则直接返回null
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  // 会移除元素
  items[takeIndex] = null;
  // 队列头指针计算
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  // 因为移除了一个元素,所以唤醒因为队列已满而被阻塞的put方法的一个线程
  notFull.signal();
  return x;
}

take方法

从队列头获取并移除元素,如果队列为空,则阻塞当前线程直到不为空返回元素,该方法是阻塞的

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  // 获取可中断的锁
    lock.lockInterruptibly();
    try {
      // 队列为空,阻塞
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
  • 所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁
  • 判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并notEmpty.await条件变量,同时释放locak锁,等待被生产者线程唤醒
  • 如果没有空,则调用dequeue方法
  • 唤醒notFull.notify条件变量

peek方法

与poll方法类似,从队列头获取元素(不会移除元素),如果队列为空,则直接返回null,该方法是不阻塞的

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}
使用案例

logback异步打印日志使用的ArrayBlockQueue

LinkedBlockingQueue

LinkedBlockingQueue类是底层为单向链表的有界阻塞队列,默认容量为Integer.MAX_VALUE,使用先进先出FIFO,线程池中newFixedThreadPool线程池就是使用了该队列

源码分析

可以很好的处理并发数据,其内部实现采用分离锁(读写分离两个锁),可以实现生产和消费操作的完全并行运行。

// 头节点
transient Node<E> head;
//尾结点
private transient Node<E> last;
// 记录队列的个数
private final AtomicInteger count = new AtomicInteger();
// take、poll方法时获取该锁,控制元素出队,同时只有一个线程可以从队列头部获取元素,其他线程阻塞
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
// 条件变量,存放出栈被阻塞的线程
// 当队列为空时,执行出队操作的线程会被放入这个条件队列中阻塞
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
// put、offer方法时获取该锁,控制元素入队,同时只有一个线程可以在队尾添加元素,其他线程阻塞
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
// 条件变量,存放入栈被阻塞的线程
// 当队列满时,执行入队操作的线程会被放入这个条件队列中阻塞
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0throw new IllegalArgumentException();
  this.capacity = capacity;
  last = head = new Node<E>(null);
}
重要方法

offer方法

向尾部插入元素,成功插入返回true,队列已满则丢弃当前元素,返回false,该方法是不阻塞的

public boolean offer(E e) {
  // 如果元素为null,则抛出NPE,队列中不可存储null值
    if (e == nullthrow new NullPointerException();
  // 获取当前队列元素数量
    final AtomicInteger count = this.count;
  // 如果已满,则直接返回false
    if (count.get() == capacity)
        return false;
  
    int c = -1;
  // 构造新节点
    Node<E> node = new Node<E>(e);
  // 获取putLock独占锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
      // 队列不满,则进队
        if (count.get() < capacity) {
          // 进队
            enqueue(node);
          // 队列元素数量+1
            c = count.getAndIncrement();
            if (c + 1 < capacity) // 新元素入队后,队列依然没满,则唤醒notFull条件队列中的线程,通知可以进行入队操作了
                notFull.signal();
        }
    } finally {
      // 释放锁
        putLock.unlock();
    }
    if (c == 0// 说明该元素入队之前,队列为空,此时加入了一个元素,应该通知notEmpty条件队列中的线程,可以进行出队操作了
        signalNotEmpty();
    return c >= 0;
}

put方法

put方法与offer方法类似,只是如果队列已满,则阻塞当前线程,知道队列有空闲才会插入成功返回,该方法是阻塞的

public void put(E e) throws InterruptedException {
  // 如果元素为null,则抛出NPE,队列中不可存储null值
    if (e == nullthrow new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
  // 构建新节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
  // 获取当前队列元素数量
    final AtomicInteger count = this.count;
  // 获取可中断的putLock锁
    putLock.lockInterruptibly();
    try {
        // 如果队列已满,则进行等待,使用while循环防止被虚假唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
       // 入队
        enqueue(node);
      // 队列中元素数量+1
        c = count.getAndIncrement();
        if (c + 1 < capacity)// 新元素入队后,队列依然没满,则唤醒notFull条件队列中的线程,通知可以进行入队操作了
            notFull.signal();
    } finally { // 释放锁
        putLock.unlock();
    }
    if (c == 0)// 说明该元素入队之前,队列为空,此时加入了一个元素,应该通知notEmpty条件队列中的线程,可以进行出队操作了
        signalNotEmpty();
}

poll方法

从队列头部获取元素,并移除该元素,如果队列为空,则直接返回null,该方法是不阻塞的

public E poll() {
  // 获取队列元素数量
    final AtomicInteger count = this.count;
  // 队列为空,则直接返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
  // 获取takeLock锁
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
      // 队列不为空
        if (count.get() > 0) {
          // 出队,该方法会移除所取出的元素
            x = dequeue();
          // 队列元素数量-1
            c = count.getAndDecrement();
            if (c > 1// c>1,表示该元素出队之后,至少还有一个元素,可以通知notEmpty条件队列中的线程进行唤醒,来执行出队操作
                notEmpty.signal();
        }
    } finally { // 释放锁
        takeLock.unlock();
    }
    if (c == capacity) // 在进行出队操作前队列中元素是满的,此时可能NotFull是被阻塞的,所以通知NotFull条件队列中的线程进行唤醒,来执行入队操作
        signalNotFull();
    return x;
}

// Removes a node from head of queue.
private E dequeue() {
  // assert takeLock.isHeldByCurrentThread();
  // assert head.item == null;
  Node<E> h = head;
  Node<E> first = h.next;
  h.next = h; // help GC
  head = first;
  E x = first.item;
  first.item = null;
  return x;
}

peek方法

从队列头部获取元素,但是不会移除该元素,如果队列为空,则直接返回null,该方法是不阻塞的

public E peek() {
  // 队列为空,直接返回null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
  // 获取takeLock锁
    takeLock.lock();
    try {
      // 获取头部元素
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

take方法

与poll方法类似,从队列头部获取元素,并移除该元素,但是如果队列为空,则会阻塞当前线程直到队列不为空然后返回元素,该方法是阻塞的

public E take() throws InterruptedException {
    E x;
    int c = -1;
  // 队列元素数量
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
  // 获取可中断的takeLock锁
    takeLock.lockInterruptibly();
    try {
      // 队列为空,则进行阻塞,使用while循环防止虚假唤醒
        while (count.get() == 0) {
            notEmpty.await();
        }
      // 出队并移除元素
        x = dequeue();
      // 队列元素数量-1
        c = count.getAndDecrement();
        if (c > 1)// c>1,表示该元素出队之后,至少还有一个元素,可以通知notEmpty条件队列中的线程进行唤醒,来执行出队操作
            notEmpty.signal();
    } finally { // 释放锁
        takeLock.unlock();
    }
    if (c == capacity)// 在进行出队操作前队列中元素是满的,此时可能NotFull是被阻塞的,所以通知NotFull条件队列中的线程进行唤醒,来执行入队操作
        signalNotFull();
    return x;
}

PriorityBlockingQueue

PriorityBlockingQueue类是一个无界阻塞队列,与LinkedBlockingQueue类似,只是排序是基于优先级的阻塞队列,可以决定元素的优先顺序(使用自然排序或者比较器来进行排序,传入的对象必须实现Comparable接口),会自动进行扩容,内部控制线程同步的锁是公平锁,存储使用的是平衡二叉堆实现的

源码分析
// 存放队列元素
private transient Object[] queue;

// 队列元素数量
private transient int size;


// 比较器,使用该比较器来比较元素大小进行排序,如果为null,则使用元素的自然排序
private transient Comparator<? super E> comparator;

// 控制只能有一个线程进行入队、出队操作
private final ReentrantLock lock;

// 条件变量用来实现take方法阻塞
private final Condition notEmpty;

// 自旋锁,使用cas操作来保证只有一个线程可以扩容队列,状态为0表示当前没有进行扩容,1表示当前正在扩容
private transient volatile int allocationSpinLock;

// 默认队列大小为11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大队列大小
 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
     * A plain PriorityQueue used only for serialization,
     * to maintain compatibility with previous versions
     * of this class. Non-null only during serialization/deserialization.
     */

private PriorityQueue<E> q;
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator)
 
{
  if (initialCapacity < 1)
    throw new IllegalArgumentException();
  this.lock = new ReentrantLock();
  this.notEmpty = lock.newCondition();
  this.comparator = comparator;
  this.queue = new Object[initialCapacity];
}
重要方法

offer方法

offer方法在队列中添加添加元素,由于该队列是无界的,所以不会阻塞

public boolean offer(E e) {
   // 如果元素为null,则抛出NPE,队列中不可存储null值
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
  // 获取锁
    lock.lock();
    int n, cap;
    Object[] array;
  // 当前元素个数>=队列容量,进行扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null// 比较器为null,进行元素的自然排序
            siftUpComparable(n, e, array);
        else // 有自定义的比较器,使用自定义比较器进行排序
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal(); // 唤醒由于队列没有元素导致的take操作阻塞的一个线程
    } finally {
        lock.unlock();
    }
    return true;
}

// 扩容操作
private void tryGrow(Object[] array, int oldCap) {
  // 先释放掉主锁,由于扩容比较费时,释放锁可以让其他线程可以做其他的操作
  lock.unlock(); // must release and then re-acquire main lock
  Object[] newArray = null;
  // 进行cas操作,成功则进行扩容,保证只有一个线程可以进行扩容操作
  if (allocationSpinLock == 0 &&
      UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                               01)) {
    try {
      // oldCap小于64,则执行oldCap + 2,否则乘以2
      int newCap = oldCap + ((oldCap < 64) ?
                             (oldCap + 2) : // grow faster if small
                             (oldCap >> 1));
      // 最大容量不能超过MAX_ARRAY_SIZE
      if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
        int minCap = oldCap + 1;
        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
          throw new OutOfMemoryError();
        newCap = MAX_ARRAY_SIZE;
      }
      if (newCap > oldCap && queue == array)
        newArray = new Object[newCap];
    } finally {
      allocationSpinLock = 0;
    }
  }
  // 在某线程进行扩容时,可能会有其他线程也来进行扩容,由于CAS操作失败,会执行该代码,让出cpu
  if (newArray == null// back off if another thread is allocating
    Thread.yield();
  lock.lock();
  // 扩容成功
  if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
  }
}

// 二叉堆操作
// k传入的是队列的元素数量,也就是所要添加的元素可能会放入的数组下标,x是所要添加的元素,array是当前的队列元素
private static <T> void siftUpComparable(int k, T x, Object[] array) {
  Comparable<? super T> key = (Comparable<? super T>) x;
  // 第一个元素添加进来时不需要进入while循环,直接添加元素即可,因为只有一个元素,不需要排序
  while (k > 0) {
    // 因为是树形,找到该元素的父节点
    int parent = (k - 1) >>> 1;
    Object e = array[parent];
    // 比较该节点和父节点
    if (key.compareTo((T) e) >= 0)
      break;
    // 与父节点交换位置
    array[k] = e;
    k = parent;
  }
  array[k] = key;
}

put方法

与offer方法相同

public void put(E e) {
    offer(e); // never need to block
}

poll方法

poll方法获取第一个元素,如果队列为空,则返回null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
  int n = size - 1;
  // 队列为空,则直接返回null
  if (n < 0)
    return null;
  else {
    Object[] array = queue;
    // 获取队头元素
    E result = (E) array[0];
    // 获取队尾元素,并赋值为null
    E x = (E) array[n];
    array[n] = null;
    Comparator<? super E> cmp = comparator;
    // 根节点没有了,需要重新进行二叉堆的构建
    if (cmp == null)
      siftDownComparable(0, x, array, n);
    else
      siftDownUsingComparator(0, x, array, n, cmp);
    size = n;
    return result;
  }
}

// 由于根节点被移除了,所以需要重新构建二叉堆
// k为0,x为之前的队尾元素,array为队列,n为队尾数组下标
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n)
 
{
  if (n > 0) {
    Comparable<? super T> key = (Comparable<? super T>)x;
    int half = n >>> 1;           // loop while a non-leaf
    while (k < half) {
      int child = (k << 1) + 1// assume left child is least
      Object c = array[child];
      int right = child + 1;
      if (right < n &&
          ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
        c = array[child = right];
      if (key.compareTo((T) c) <= 0)
        break;
      array[k] = c;
      k = child;
    }
    array[k] = key;
  }
}

take方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  // 获取可中断的锁
    lock.lockInterruptibly();
    E result;
    try {
      // 如果队列为空,则阻塞
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

DelayQueue

DelayQueue类是一种延迟队列,带有延迟时间,只有当延迟时间到了,才能从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,该队列也是一个没有大小限制的队列,可以用做对缓存超时的数据移除、任务超时处理和空闲连接关闭等。

源码分析
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader变量的使用基于Leader-Follower模式的变体,用于减少不必要的线程等待
// 当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos等待delay时间,但是其他线程则会调用available.await()进行无限等待
private Thread leader = null;
// 条件变量
private final Condition available = lock.newCondition();
public DelayQueue() {}
重要方法

offer方法

插入元素到队列

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      // 入队操作,如果为null会抛出NPE
        q.offer(e);
        if (q.peek() == e) { // 使用的是PriorityQueue,优先级队列,获取的是最先要过期的,所以当前元素时第一个元素,之前的队列没有元素
            leader = null;
          // 入队成功,通知被阻塞的线程进行唤醒
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

take方法

获取并移除队列里面延迟时间过期的元素,如果没有过期元素则等待,会阻塞

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
          // 获取但不移除元素 1
            E first = q.peek();
            if (first == null// 队列中没有元素
              // 会进行阻塞,等待唤醒
                available.await();
            else {
              // 获取该元素的过期时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0// 已经过期,取出
                    return q.poll();
                first = null// 走到这里说明没有过期 don't retain ref while waiting
                if (leader != null// leader不为null,说明有其他线程在进行take操作,进行阻塞等待
                    available.await();
                else { // 当前没有其他线程在进行take操作,选取当前线程作为leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread; 
                    try {
                        available.awaitNanos(delay); // 等待该元素过期,然后重新竞争锁
                    } finally {
                      // 重置leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null// 当前线程已经移除了过期元素,并且队列中还有元素,唤醒被阻塞的线程
            available.signal();
        lock.unlock();
    }
}

poll方法

获取并移除队列中的过期元素,没有则返回null,不会阻塞

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
      // 队列为空,或者不为空但是没有过期,直接返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

SynchronousQueue

SynchronousQueue类是一个没有缓冲的队列,不会在队列中维护任何的存储空间,没有存储能力,生产者生产的数据直接会被消费者获取并消费,只会在没有可消费的数据时,阻塞数据的消费者,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间,newCachedThreadPool线程池使用了该队列

TransferQueue

TransferQueue类主要新增了tryTransfer方法和transfer方法,实现类有LinkedTransferQueue

public interface TransferQueue<Eextends BlockingQueue<E{
   
    boolean tryTransfer(E e);
  // 生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费
    void transfer(E e) throws InterruptedException;

    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException
;

    boolean hasWaitingConsumer();

   
    int getWaitingConsumerCount();
}

阻塞方法与非阻塞方法

非阻塞方法

  • add  将元素插入到队列尾部,插入成功返回true,插入失败抛出异常
  • remove 移除队首元素,移除成功返回true,移除失败,抛出异常
  • offer(E e) 将元素插入到队尾,插入成功,返回true,插入失败返回false
  • poll 移除并获取队首元素,成功返回队首元素,否则返回null
  • peek  获取队首元素,成功返回队首元素,否则返回null

阻塞方法

  • put  将元素插入到队尾,如果队列已满,则等待
  • take 从队首取元素,如果队列为空,则等待
  • offer(E e, long timeout, TimeUnit unit) 向队尾存入元素,如果队列已满,则等待一定的时间,如果时间已到,还是没有插入成功,则返回false,否则返回true
  • E poll(long timeout, TimeUnit unit) 从队首取元素,如果队列为空,则等待一段时间,当时间已到,如果没有取到,,则返回null,否则返回取得的元素

https://zhhll.icu/2022/多线程/并发容器/4.BlockingQueue阻塞队列/


原文始发于微信公众号(bug生产基地):并发容器之BlockingQueue阻塞队列

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/256221.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!