
3.5 BlockingQueue
在多线程环境中,经常会用到“生产者-消费者”模式,负责生产的线程要把数据交给负责消费的线程,那么,自然需要一个数据共享容器,由生产者存入,消费者取出。这个容器就像是一个仓库,生产出来的货物堆积在里面,需要消费的时候再搬运出来,这个时候,就需要队列(Queue)来实现该仓库,一般而言,该队列有两种存取方式:
先进先出(FIFO,First In First Out):先插入的元素先取出,也就是按顺序排队;
后进先出(LIFO,Last In First Out):后插入的元素先取出,这是个栈结构(Stack),强调的是优先处理最新的物件。
设想这样一个问题,如果生产的线程太积极,消费线程来不及处理,仓库满了,又或者消费线程太迅速,生产线程产能跟不上消费,那么要如何处理?
这就是生产者-消费者模型(Producer-Consumer)所解决的问题了。这个模型又称为有界缓存模型,它主要包括了三个基本部分:
1)产品仓库:用于存放产品;
2)生产者:负责生产产品,并把生产出来的产品存入仓库;
3)消费者:消费仓库里的产品。
这个模型的特性在于:仓库里没有产品的时候,消费者没法继续消费产品,只能等待新的产品产生;当仓库装满之后,生产者没有办法存放产品,只能等待消费者消耗掉产品之后,才能继续存放。
该特性应用在多线程环境中,可以表达为:生产者线程在仓库装满之后会被阻塞,消费者线程则是在仓库清空后阻塞。
在Java Concurrent包发布之前,该模型需要程序员自己维护阻塞队列,但自己实现的队列往往会在性能和安全性上有所缺陷,Java Concurrent包提供了BlockingQueue接口及其实现类来实现生产者-消费者模型。
java.util.concurrent.BlockingQueue是一个阻塞队列接口。当BlockingQueue操作无法立即响应时,有四种处理方式:
1)抛出异常;
2)返回特定的值,根据操作不同,可能是null或者false中的一个;
3)无限期的阻塞当前线程,直到操作可以成功为止;
4)根据阻塞超时设置来进行阻塞;
BlockingQueue的核心方法和未响应处理方式的对应形式见表3-2。
表3-2 BlockingQueue的核心方法

BlockingQueue有很多实现类,图3-16给出了部分常用的实现类。
3.5.1 ArrayBlockingQueue
ArrayBlockingQueue是基于数组实现的有界BlockingQueue,该队列满足先入先出(FIFO)的特性。它是一个典型的“有界缓存”,由一个固定大小的数组保存元素,一旦创建好以后,容量就不能改变了。
当队列满时,存数据的操作会被阻塞;队列空时,取数据的操作会被阻塞。
除了数组以外,它还维护了两个int变量,分别对应队头和队尾的下标,队头存放的是入队最早的元素,而队尾则是入队最晚的元素。

图3-16 Queue类图
下面给出一个使用ArrayBlockingQueue实现的生产者消费者模型的简单示例代码。


程序运行结果为:

这段代码创建了一个生产者和两个消费者,生产者每隔1s中就会生产一件商品并放入到队列中,如果队列满了,那么生产者会一直等待,直到有消费者消费了商品后生产者才能把商品放入到队列中。而消费者则每隔5s消费一件。
3.5.2 LinkedBlockingQueue
链表阻塞队列,从命名可以看出它是基于链表实现的。同样这也是个先入先出队列(FIFO),队头是队列里入队时间最长的元素,队尾则是入队时间最短的。理论上它的吞吐量要超出数组阻塞队列ArrayBlockingQueue。LinkedBlockQueue可以指定容量限制,在没有指定的情况下,默认为Integer.MAX_VALUE。
与ArrayBlockingQueue相比,LinkedBlockingQueue的重入锁被分成了两份,分别对应存值和取值。这种实现方法被称为双锁队列算法,这样做的好处在于,读写操作的lock操作是由两个锁来控制的,互不干涉,因此可以同时进行读操作和写操作,这也是LinkedBlockingQueue吞吐量超过ArrayBlockingQueue的主要原因。但是,使用两个锁要比一个锁复杂很多,需要考虑各种死锁的状况。
LinkedBlockQueue的使用方式与ArrayBlockingQueue是相同的,示例代码如下:

3.5.3 PriorityBlockingQueue
优先级阻塞队列PriorityBlockQueue不是FIFO(先入先出)队列,它要求使用者提供一个Comparetor比较器,或者队列内部元素实现Comparable接口,队头元素会是整个队列里的最小元素。
PriorityBlockQueue是用数组实现的最小堆结构,利用的原理是:在数组实现的完全二叉树中,根结点的下标为子结点下标除以2。
PriorityBlockQueue是不定长的,会随着数据的增长逐步扩容,其最大容量为Integer.MAX_VALUE-8。如果容量超出这个值,那么会产生OutOfMemoryError。
下面给出一个使用PriorityBlockQueue实现的“生产者-消费者”模型的代码,生产者会把生产的产品放入队列中,消费者会根据商品的优先级进行消费。


这个示例使用了一个生产者和消费者,也可以根据需求修改为多个生产者和消费者。
运行结果为:


3.5.4 ConcurrentLinkedQueue
ConcurrentLinkedQueue是一种非阻塞的线程安全队列,与阻塞队列LinkedBlockingQueue相对应。在之前的章节里有过介绍,LinkedBlockingQueue使用两个ReentrantLock分别控制入队和出队以达到线程安全。
ConcurrentLinkedQueue同样也是使用链表实现的FIFO队列,但不同的是,它没有使用任何锁的机制,而是用CAS来实现的线程安全。下面以offer方法为例来介绍ConcurrentLinkedQueue是如何使用CAS实现的。
它是个单向链表,每个结点有一个当前结点的元素和下一个结点的指针,结点的定义如下

它采用先进先出的规则对结点进行排序,当添加一个元素的时候,它会添加到队列的尾部(tail),当获取一个元素时,它会返回队列头部(head)的元素。tail结点和head结点方便快速定位最后一个和第一个元素。
下面给出一个Node类中实现了CAS的方法;

offer方法的实现如下:


ConncurrentLinkedQueue的同步非阻塞算法使用循环+CAS来实现,这一类的源码阅读不能按照线性代码执行的思维去考虑,而是应该用类似于状态机的思路去理解。
只有把握以下原则,才能理解这种类型代码的编程思路:
1)在确认达到执行目的前,循环不会终止;
2)非线程安全的全局变量要用局部变量引用以保证初始状态;
3)由于全局变量可能被其他线程修改,在使用对应局部变量时,要验证是否合法;
4)最终赋值要用CAS方法以保证原子性,避免线程发生不期望的修改。
理解了上面的思路后,来具体分析offer方法的循环体的实现原理。
变量含义:
1)p结点的期望值为最后一个结点;
2)newNode是新结点,期望添加到p结点之后;
3)q结点为p结点的后继结点,可能为null,也可能因为多线程修改而不为空(指向新的结点);
4)t结点为代码执行开始时的tail结点(成员变量),也可能因为多线程修改了tail结点,从而和tail结点不一致;
执行目的:
1)newNode作为新结点,需要插入到最后一个结点的next位置,如果它成为最后一个结点,那么把它设置为尾结点;
2)需要注意的是,多线程环境下,在多个插入同时进行时,不保证结点顺序与执行顺序的一致性,当然,这不影响执行成功。
状态解析:
1)该插入算法,是以p结点的状态判断为核心的;
2)当p结点的下一个结点为null时,说明没有后继结点,此时执行p.casNext(null,newNode),如果失败,那么说明其他线程在之前的瞬间修改了p.next,此时就需要从头开始再找一次尾结点;如果成功,则执行目的达到,循环体可以结束了;
3)当p结点和q结点相等,这时链表发生了闭合(off),这是一个特殊情况,产生的原因有多种,但本质上是因为保证效率导致的意外情况,tail作为尾结点的引用可以在O(1)的时间复杂度内可以找到。但是,tail是可变的,所以其next可能指向它自身(比如重新设置casTail代码可能还没执行)。所以,如果t不是tail,那么使用tail重新计算,如果依然是tail,那么需要重置p为head,从头开始遍历链表,虽然复杂度为O(n),但是能保证以正确的方式找到队尾;
4)如果以上情况都不满足,那么判断p是否还是队尾,如果不是则设置为队尾,否则p重新指向p.next,这里可能会产生疑惑,队尾tail结点的next不应该是null吗?
其实tail只是一个优化算法,不代表真正的队尾,它有三种状态:
1)初始化时,它是head;
2)奇数次插入时,它是队尾;
3)偶数次插入时,它是队尾的前一个结点;
由此可知,p==q一定发生在q!=null的时候。
4)这里需要特别注意下面代码:

在q==null的时候,说明p应当为最后一个结点,如果p !=t,那么说明tail并不是尾结点,而是尾结点的前驱结点,此时需要重新设置tail为newNode,之后,tail会指向真正的尾结点。正是这句代码导致了奇数次插入时tail是队尾,偶数次是队尾的前一个结点。
3.5.5 DelayQueue
DelayQueue是一种延迟队列,它所管理的对象必须实现java.util.concurrent.Delayed接口,该接口提供了一个getDelay方法,用于获取剩余的延迟时间,同时该接口继承自Comparable,其compareTo的实现体一般用于比较延迟时间的大小。
DelayQueue是阻塞的优先级队列。其线程安全由重入锁ReentrantLock实现,而优先级特性则完全由内部组合的PriorityQueue来提供。
PriorityQueue内部使用“最小堆”实现的。下面给出一个DelayQueue的使用示例代码:



运行结果为:
