ArrayBlockQueue浅析

前阵子, 分析了Java Collection框架(framework)为什么叫框架,而不是叫Java Collection Lib.结尾时候, 提到了Java1.5以后框架里才添加的新接口Queue.

添加Queue接口到集合框架里的原因. 提供一个无须参数即可获取元素的能力.

J K L,公众号:K字的研究为什么Java集合框架(Java Collection Framework) 叫框架?

今天我们聊聊Queue里比较重要的一个ArrayBlockingQueue.

ArrayBlockingQueue

Java里的命名还是比较有规律的, 一般都是:实现方式+接口.看这个名字就能猜出来, 这是一个用CircularArray(循环数组)实现的,有阻塞特性的BockingQueue.

循环数组

内部使用的, 是一个固定尺寸循环数组.也就是一个数组, 加上2个维护当前,在什么位置的index.如果尾已经要越界了, 就折返到开头来继续.

//实现上其实就是取余   
if (++i >= modulus) i = 0;

对应到ArrayBlockingQueue里, 这个实现是这样的:

  1. Object[] items  数组
  2. int takeIndex 开头的index, 因为队列是从队头取的
  3. int putIndex   结尾的index, 因为是从队尾添加的.
ArrayBlockQueue浅析

随着一边添加一边删除的过程, 可能结构会变成这样.数组数据分散开,不连续. 这点会影响后面很多内容的写法.

ArrayBlockQueue浅析

BlockingQueue的功能

Queue的能力, 是添加了:

  • 一套不会异常的offer,pool,peek方法
  • 一套会抛异常的add,remove,element方法

Queue的基础上,BlockingQueue添加另外两套内容:

  • offer&poll的超时版
  • put&take
ArrayBlockQueue浅析

看起来很复杂, 不过说白了, 就是上图这张表.  行为x表现的一个小矩阵.记得:

  1. 三种行为: 添加,删除,查看
  2. 4种特性: 异常, 特殊值,阻塞,超时

这就行了, 剩下都是组合出来的.

ArrayBlockingQueue 4种特性的实现方式

添加为例, 来看下这4种特性都是怎么实现的.

不成功抛Exception

会抛异常这组其实是最简单的, 我们可以直接调用offer接口,然后根据返回值, 判断要不要手动抛异常就行

(也可以反过来实现, 写一个会抛异常的add,然后用add去实现offer.估计会被骂死).

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
}

不成功返回特殊值

这个就是惯常用的,成功true,不成功false,或者找不到返回-1之类的. 用特殊值表示执行结果.代码倒是很简单,  用到了一个ReentrantLock锁.剩下就是满了就失败,不满拉倒.

public boolean offer(E e) {
  //require 系列, 契约式编程....failfast
        Objects.requireNonNull(e);
  // 锁🔐属性转本地变量
        final ReentrantLock lock = this.lock;
  // 加锁, 成功就成功, 不成功则组撒下去
        lock.lock();
  
        try {
          //检查满了没, 满了就失败
            if (count == items.length)
                return false;
            else {
           //入队,成功
                enqueue(e);
                return true;
            }
        } finally {
          //扫尾开锁
            lock.unlock();
        }
    }

有趣的地方在 final ReentrantLock lock = this.lock;

这个声明临时变量的玩法很有趣, 有why技术曾经讲解过,可以去看看.

这代码是 Doug Lea 写的,小 Lea 子这人吧,经常搞一些出其不意的代码和优化。他也因为这些“莫名其妙”的代码闻名,习惯就好了。
歪歪,公众号:why技术从源码里的一个注释,我追溯到了12年前,有点意思。

enqueue暂不着急看, 咱先广度优先(bfs).

带超时的版本

前面的offer,会在满的时候会直接失败. 这个带超时的版本, 满的话会尝试阻塞一定时间,超时了再失败.

想要实现超时, 那肯定还是要借助ReentrantLocknewCondition来实现.一个ArrayBlockingQueue里除了有一个lock,还有2个Condition:

  1. notFull     不满  塞东西时候, 要不满才行
  2. notEmpty  不空  取东西, 要不空才行
    public boolean offer(E e, long timeout, TimeUnit unit)        throws InterruptedException {
        //跟前面的 lock.lock()比多了一条, 响应`interrupt`.
        lock.lockInterruptibly();    
        try {
          // 满则用条件等待.
            while (count == items.length) {
                if (nanos <= 0L)//超时就返回
                    return false;
                //静待不满
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

纯阻塞版本

满的时候就阻塞, 不带什么超时功能了.

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }

小结

这块的代码还没到正地方, 实现难度也不高, 非常适合拿来学习ReentrantLockCondition.await的使用.

后面的删除代码, remove,take,poll, 也是类似, 我们就不提了.只是把Condition由等待notFull,改成了等待notEmpty.

查看部分的函数, 因为没有阻塞功能, 只有2个elementpeek, 很简单,也不提了.

核心函数 入队和出队(enqueue & dequeue)

按说, 前面的几个函数都是在搞各种特性, 核心功能,应该是很复杂吧?

还真不是.

入队

private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length) putIndex = 0;//折返
    count++;
    notEmpty.signal();
}

出队

private E dequeue() {
    final Object[] items = this.items;
    E e = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length) takeIndex = 0; //折返
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return e;
}

小结

这里就是个常规的循环数组操作,外加往外放两个信号(signal).

  • 入队时候, 更新putIndex, 放出个不空信号.
  • 出队时候, 更新takeIndex, 放出个不满信号.

两边代码完美对称,  几乎没有冗余代码.因为锁和各自的独立逻辑都在外围已经做掉了,维持了核心代码的整洁.

其他方法

ArrayBlockingQueue里的方法有好多个,除了刚刚提到3x4-2=10个方法.剩下还有几个有趣的方法. 一个一个提一下.

toString

所有Object都是有toString的.不过这种带阻塞,带锁的数据结构, 有点特殊. 引入如果内容如果会变, 这个toString就不太准.要完美处理, 只能加锁才行.

Java8 version

public String toString() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k = count;
        if (k == 0)
            return "[]";

        final Object[] items = this.items;
        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (int i = takeIndex; ; ) {
            Object e = items[i];
            sb.append(e == this ? "(this Collection)" : e);
            if (--k == 0)
                return sb.append(']').toString();
            sb.append(',').append(' ');
            if (++i == items.length)
                i = 0;
        }
    } finally {
        lock.unlock();
    }
}

这个是Java8的版本, 加锁->转string->放锁,到了java17里再看,有一点小改动.流程被改为了加锁->toArray->放锁->转string

锁的范围被缩小了, 据说可以增加性能.后面都是以Java17代码为准了,8里面的代码这段真的不如17的.

不过真正有意思的点在这行e == this ? "(this Collection)" : e.  这个是用来对付这种神奇代码的.

ArrayBlockingQueue<ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10);
queue.add(queue);

如果有人手贱把ArrayBlockingQueue加到自己里面的话, 不会被递归拖死…Java 里几乎所有的集合类都有这个代码.

toArray() & toArray(T[] a)

这两个代码都是toArray, 核心逻辑都是加锁->转数组->放锁. 不过两者略有差异.

无参数的普通toArray

这个就是把所有元素, 复制到一个新的Array里.

因为循环数组里面, 数据可能是散开不连续的.所以有可能需要复制两次, 在代码上表现出来是这样的.

final int end = takeIndex + count;
final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
//如果`end != putIndex`,说明`putIndex`已经跑到`takeIndex`前面去了. 这里的复制就要进行两次.
if (end != putIndex)
    System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
  • System.arraycopy,Arrays.copyOfRange是这里用的复制工具.
  • Arrays.copyOfRange比较有趣, 他其实是System.arraycopy的一个封装, 允许end比数组的长度长.

这里是17的写法, 8的写法是用了3次System.arraycopy.

toArray(T[] a)版本

这个实现就不解释了, 他的功能可以说下,比较复杂,或者说所有的Collection好像都是符合这个规律的.

  1. a.length, 比Queue内元素少, 会返回一个新数组.
  2. a.length >=Queue内元素数量, 会复制到参数内, 余下部分第一个值设为null

示例代码如下:

ArrayBlockingQueue queue = new ArrayBlockingQueue<Integer>(10,true, Arrays.asList(1234));

Integer[] arr = new Integer[10];
Arrays.fill(arr,-1);

queue.toArray(arr);
System.out.println(Arrays.deepToString(arr));

这个代码的输出结果是[1, 2, 3, 4, null, -1, -1, -1, -1, -1]. 结果有点诡异, 记得住就用这个方法, 记不住就坚持传new Integer[0]这样的东西好了.

removeAll & retainAll & removeIf

这三个姐妹花, 在分析ArrayList时候, 曾经介绍过. 这本质上是一个bulkRemove方法, 只是过滤条件不太一样. removeAll & retainAll的条件刚好相反.

锁🔐什么的就不放了, 里面这个删除,非常有意思.

for (int i = takeIndex, end = putIndex,
         to = (i < end) ? end : items.length;
     ; i = 0, to = end) {
    for (; i < to; i++)
        if (filter.test(itemAt(items, i)))
            return bulkRemoveModified(filter, i);
    if (to == end) break;
}

能看出来这段代码的复杂度吗?  两层for循环,是不是

不考虑最内层bulkRemoveModified的话, 外面的代码其实只跑了queue的元素数量那么多次.

ArrayBlockQueue浅析
ArrayBlockQueue浅析

对着这两张张图看,比较简单:

  • 如果是图一的情况,to = (i < end) ? end : items.length这里,to==end.   这时候内层循环跑完, 最后一句break掉了
  • 如果是图二的情况,内层循环第一次跑清理掉后半段.最后end==items.length, 外层循环会第二次进入. 这时候i=0,to=end, 会开始清理前半段…

是的,这代码两次加一起,只跑了次的.

forEach & contains &circularClear&remove

前面的双层循环,其实是对循环数组做循环的非常精妙的方法,整体代码,有多处用了这个手法.

//forEach
for (int i = takeIndex, end = putIndex,
         to = (i < end) ? end : items.length;
     ; i = 0, to = end) {
    for (; i < to; i++)
        action.accept(itemAt(items, i));
    if (to == end) break;
}
//contains
for (int i = takeIndex, end = putIndex,
         to = (i < end) ? end : items.length;
     ; i = 0, to = end) {
    for (; i < to; i++)
        if (o.equals(items[i]))
            return true;
    if (to == end) break;
}
//circularClear
for (int to = (i < end) ? end : items.length;
     ; i = 0, to = end) {
    for (; i < to; i++) items[i] = null;
    if (to == end) break;
}
//remove
for (int i = takeIndex, end = putIndex,
         to = (i < end) ? end : items.length;
     ; i = 0, to = end) {
    for (; i < to; i++)
        if (o.equals(items[i])) {
            removeAt(i);
            return true;
        }
    if (to == end) break;
}

removeAt 删除某个指定位置的元素

毕竟,Queue实现了Collection接口, remove(Object o)的方法是不能少的.虽然大多数时候, 都是从头take元素, 删除某一个位置也是可以的. 这里用到了removeAt方法.

这个分两种情况, 如果刚好, 要删除的的removeIndex,就是takeIndex, 那就是简单置空,移动下takeIndex就行了. 对应着方法的前半段.

// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
    itrs.elementDequeued()

后半段, 是处理从半截拦腰的地方删一个元素的.很明显, 这里要删除元素, 然后把他后面的元素前移一下.

for (int i = removeIndex, putIndex = this.putIndex;;) {
    int pred = i;
    if (++i == items.length) i = 0; //折返到数组开头
    if (i == putIndex) {
        items[pred] = null;
        this.putIndex = pred;
        break;
    }
    //前移后面的元素.. 这里的i自增过了,所以是后面的元素.
    items[pred] = items[i];
}

clear

这个方法的主体, 会用到circularClear, 还有一件额外的事,是如果有被阻塞在等待不满条件的,会被唤醒.

for (; k > 0 && lock.hasWaiters(notFull); k--)
    notFull.signal();

drainTo

这个方法是把Queue里的元素, 全都倒到另一个Collection里去.

try {
    while (i < n) {
        @SuppressWarnings("unchecked")
        E e = (E) items[take];
        c.add(e);
        items[take] = null;
        if (++take == items.length) take = 0;
        i++;
    }
    return n;
} finally {
    // Restore invariants even if c.add() threw
    if (i > 0) {
        count -= i;
        takeIndex = take;
        if (itrs != null) {
            if (count == 0)
                itrs.queueIsEmpty();
            else if (i > take)
                itrs.takeIndexWrapped();
        }
        for (; i > 0 && lock.hasWaiters(notFull); i--)
            notFull.signal();
    }
}

这个方法看着贼长,也贼丑.

  • 前半部分的try里,展示的是另一种对循环数组进行循环的方法.
  • 后半部分是复位takeIndex 并发不满信号. 取走i个,就发i次, count也减少i个.

其实这两块能写在一起的, 每次循环都发信号,处理counttakeIndex.不过这样有点慢, 批量化,到最后一起搞了是为了性能. 这个tryfinally不加catch的写法不错.

遗留问题

今天就先看到这, 剩下的迭代器部分和bulkRemoveModified我觉得再水一篇都水不完,那玩意有点扎嘴,将来再说吧.

  • 如果只是想复习下ReentrantLockCondition的用法, 10个基础方法的写法应该就够用了.
  • 想看如何操作循环数组, 重点是看明白那几个奇怪的双层循环.

对了,还有一个有趣的东西,作者在里面留了个调试信息.

invariantsSatisfied   循环不变量

在计算机科学中,循环不变式,是一组在循环体内、每次迭代均保持为真的性质(表达式),通常被用来证明程式或伪码的正确性
wiki

很少有哪个类会在里面带上循环不变量的, 但是这里带了. 可见这玩意多么难写正确,不知道到底改起来有多崩溃.

capacity > 0
       && items.getClass() == Object[].class
       && (takeIndex | putIndex | count) >= 0
       && takeIndex <  capacity
       && putIndex  <  capacity
       && count     <= capacity
       && (putIndex - takeIndex - count) % capacity == 0
       && (count == 0 || items[takeIndex] != null)
       && (count == capacity || items[putIndex] == null)
       && (count == 0 || items[dec(putIndex, capacity)] != null)


ArrayBlockQueue浅析



5.1了,你们放假了吗?

原文始发于微信公众号(K字的研究):ArrayBlockQueue浅析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/24768.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!