LinkedBlockingQueue是一种基于链表实现的可选边界的阻塞队列,该队列排序元素FIFO。队列的队首是在该队列上停留时间最长的元素,队列的队尾是在该队列上停留最短时间的元素。在队列尾部插入新的元素,队列检索操作在队列的头部获取元素。 在大多数并发应用程序中,基于链表实现的队列通常具有比基于数组实现的队列更高的吞吐量,但性能上未必占优势。 LinkedBlockingQueue在初始化时可以指定容量也可以不指定容量。当初始化LinkedBlockingQueue指定容量时,是有界队列;当初始化LinkedBlockingQueue未指定容量时,其内部会以Integer.MAX_VALUE值作为容量。当然,因为Integer.MAX_VALUE值非常大,近似无限大,因此LinkedBlockingQueue未指定容量时也可以近似认为是无界队列。 为防止队列的过度的扩展,建议在LinkedBlockingQueue初始化时指定容量。LinkedBlockingQueue内部的链接节点在每次入队元素时动态创建,除非这会使队列超过容量。 LinkedBlockingQueue类及其迭代器实现了Collection和Iterator接口的所有可选方法。LinkedBlockingQueue是Java Collections Framework的一个成员。  1.   LinkedBlockingQueue的声明LinkedBlockingQueue的接口和继承关系如下 public class LinkedBlockingQueue<E> extends AbstractQueue<E>        implements BlockingQueue<E>, java.io.Serializable {   …}  完整的接口继承关系如下图所示。      从上述代码可以看出,LinkedBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,又继承了java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,此处不再赘述。   2.   LinkedBlockingQueue的成员变量和构造函数  以下是LinkedBlockingQueue的构造函数和成员变量。      // 容量    private final int capacity;     // 当前元素个数    private final AtomicInteger count = new AtomicInteger();     // 链表头结点    // 不变式: head.item == null    transient Node<E> head;     // 链表尾结点    // 不变式: last.next == null    private transient Node<E> last;     // 用于锁住take、poll等操作    private final ReentrantLock takeLock = new ReentrantLock();     // 队列非空,唤醒消费者    private final Condition notEmpty = takeLock.newCondition();     // 用于锁住put、offer等操作    private final ReentrantLock putLock = new ReentrantLock();     // 队列非满,唤醒生产者private final Condition notFull = putLock.newCondition(); public LinkedBlockingQueue() {        this(Integer.MAX_VALUE);    }     public LinkedBlockingQueue(int capacity) {        if (capacity <= 0) throw new IllegalArgumentException();        this.capacity = capacity;        last = head = new Node<E>(null);    }     public LinkedBlockingQueue(Collection<? extends E> c) {        this(Integer.MAX_VALUE);        final ReentrantLock putLock = this.putLock;        putLock.lock();  // 只锁可见,不互斥        try {            int n = 0;            for (E e : c) {                if (e == null)                    throw new NullPointerException();                if (n == capacity)                    throw new IllegalStateException("Queue full");                enqueue(new Node<E>(e));                ++n;            }            count.set(n);        } finally {            putLock.unlock();        } }  从上述代码可以看出,构造函数有三种。构造函数中的参数含义如下 l  capacity用于设置队列容量。该参数是可选的,如果未设置,则取Integer.MAX_VALUE值作为容量l  c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加 类成员last和head分别指代链表的尾结点和头结点。链表中的结点用Node类型表示,代码如下:  static class Node<E> {        E item;         /**         * next有以下三种场景:         * - 真正的后继结点         * - 当前结点是头结点,则后继结点是head.next         * - 值为null,表示当前结点是尾结点,没有后继结点         */        Node<E> next;         Node(E x) { item = x; }}  访问策略是通过ReentrantLock来实现的。通过两个加锁条件notEmpty、notFull来实现并发控制。与ArrayBlockingQueue所不同的是,LinkedBlockingQueue使用了takeLock和putLock两把锁来分别锁住出队操作和入队操作。 count用于记录当前队列里面的元素个数。3.   LinkedBlockingQueue的核心方法以下对LinkedBlockingQueue常用核心方法的实现原理进行解释。  3.1.     offer(e)执行offer(e)方法后有两种结果l  队列未满时,返回 truel  队列满时,返回 false LinkedBlockingQueue的offer (e)方法源码如下: public boolean offer(E e) {        if (e == null) throw new NullPointerException();  // 判空        final AtomicInteger count = this.count;        if (count.get() == capacity)            return false;        final int c;        final Node<E> node = new Node<E>(e);        final ReentrantLock putLock = this.putLock;        putLock.lock();  // 加锁        try {            if (count.get() == capacity)                return false;            enqueue(node); // 入队            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal(); // 标识当前队列非满        } finally {            putLock.unlock(); // 解锁        }        if (c == 0)            signalNotEmpty();  // 标识当前队列已经是非空        return true;    } 从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤: l  判断待入队的元素e是否为null。为null则抛出NullPointerException异常。l  判断count是否超过了容量的限制,如果是则证明队列已经满了,直接返回false。l  为了确保并发操作的安全先做了加锁处理。l  再次判断count是否超过了容量的限制,如果是则证明队列已经满了,直接返回false;否则将元素e做入队处理,并返回true。l  解锁。l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。 enqueue(node)方法代码如下: private void enqueue(Node<E> node) {        last = last.next = node;    } enqueue(node)方法就在链表的尾部插入数据元素。 signalNotEmpty()方法代码如下: private void signalNotEmpty() {        final ReentrantLock takeLock = this.takeLock;        takeLock.lock();        try {            notEmpty.signal();        } finally {            takeLock.unlock();        }    } 思考:细心的读者可能会发现,在offer (e)方法方法中做了两次判断count是否超过了容量的限制。那么为什么要判断两次呢?3.2.     put(e)执行put(e)方法后有两种结果:•      l  队列未满时,直接插入没有返回值l  队列满时,会阻塞等待,一直等到队列未满时再插入 LinkedBlockingQueue的put(e)方法源码如下:  public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        final int c;        final Node<E> node = new Node<E>(e);        final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;        putLock.lockInterruptibly();  // 获取锁        try {            while (count.get() == capacity) {                notFull.await();  // 使线程等待            }            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();  // 标识当前队列非满        } finally {            putLock.unlock();  // 解锁        }        if (c == 0)            signalNotEmpty();  // 标识当前队列已经是非空} 从上面代码可以看出,put(e)方法的实现,分为以下几个步骤: l  先是要获取锁。l  而后判断count是否等于容量,如果是则证明队列已经满了,就等待;否则执行enqueue(e)方法做元素的入队。l  解锁。l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。3.3.     offer(e,time,unit)offer(e,time,unit)方法与offer(e)方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false。执行offer(e,time,unit)方法有两种结果:•      l  队列未满时,返回 truel  队列满时,会阻塞等待,如果在指定时间内还不能往队列中插入数据则返回 false LinkedBlockingQueue的offer(e,time,unit)方法源码如下: public boolean offer(E e, long timeout, TimeUnit unit)        throws InterruptedException {         if (e == null) throw new NullPointerException();        long nanos = unit.toNanos(timeout);        final int c;        final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;        putLock.lockInterruptibly();  // 获取锁        try {            while (count.get() == capacity) {                if (nanos <= 0L)                    return false;                nanos = notFull.awaitNanos(nanos); // 使线程等待指定的时间            }            enqueue(new Node<E>(e));            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();  // 标识当前队列非满        } finally {            putLock.unlock();  // 解锁        }        if (c == 0)            signalNotEmpty();  // 标识当前队列已经是非空        return true;} 从上面代码可以看出,offer(e,time,unit)方法的实现,分为以下几个步骤: l  先是要获取锁。l  而后判断count是否等于容量,如果是则证明队列已经满了,就等待;否则执行enqueue(e)方法做元素的入队。l  解锁。l  c是元素e入队前队列中的元素个数。如果是0,则说明之前的队列是空的,还需要执行signalNotEmpty()方法来标识当前队列已经是非空了。 3.4.     add(e)执行add(e)方法后有两种结果 l  队列未满时,返回 truel  队列满时,则抛出异常 ArrayBlockingQueue的add(e)方法源码如下:     public boolean add(E e) {        return super.add(e);    }  从上面代码可以看出,add(e)方法的实现,直接是调用了父类AbstractQueue的add(e)方法。而AbstractQueue的add(e)方法源码如下:  public boolean add(E e) {        if (offer(e))            return true;        else            throw new IllegalStateException("Queue full");}  从上面代码可以看出,add(e)方法又调用了offer(e)方法。offer(e)方法此处不再赘述。    3.5.     poll ()执行poll ()方法后有两种结果: l  队列不为空时,返回队首值并移除l  队列为空时,返回 null  LinkedBlockingQueue的poll ()方法源码如下: public E poll() {        final AtomicInteger count = this.count;        if (count.get() == 0)            return null;        final E x;        final int c;        final ReentrantLock takeLock = this.takeLock;        takeLock.lock();  // 加锁        try {            if (count.get() == 0)                return null;            x = dequeue();  // 出队            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();  // 标识当前队列非空        } finally {            takeLock.unlock();  // 解锁        }        if (c == capacity)            signalNotFull();  // 标识当前队列已经是非满        return x;    }从上面代码可以看出,执行poll()方法时,分为以下几个步骤: l  先是判断count是否等于0,如果等于0则证明队列为空,直接返回null。l  为了确保并发操作的安全先做了加锁处理。l  再次判断count是否等于0,如果等于0则证明队列为空,直接返回null;否则执行dequeue()方法做元素的出队。l  解锁。l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。 dequeue()方法源码如下:  private E dequeue() {        Node<E> h = head;        Node<E> first = h.next;        h.next = h; // 利于GC        head = first;        E x = first.item;        first.item = null;        return x;} 上面代码比较简单,就是移除链表的头结点。 3.6.     take()执行take()方法后有两种结果: l  队列不为空时,返回队首值并移除l  队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值 LinkedBlockingQueue的take ()方法源码如下: public E take() throws InterruptedException {        final E x;        final int c;        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();  // 获取锁        try {            while (count.get() == 0) {                notEmpty.await();  // 使线程等待            }            x = dequeue();  // 出队            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();  // 标识当前队列非空        } finally {            takeLock.unlock();  // 解锁        }        if (c == capacity)            signalNotFull();  // 标识当前队列已经是非满        return x;    } 从上面代码可以看出,执行take()方法时,分为以下几个步骤: l  先是要获取锁。l  而后判断count是否等于0,如果等于0则证明队列为空,会阻塞等待;否则执行dequeue()方法做元素的出队。l  解锁。l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。 dequeue()和signalNotFull ()方法此处不再赘述。 3.7.     poll(time,unit)poll(time,unit)方法与poll()方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内队列还为空,则返回null。执行poll(time,unit)方法后有两种结果: l  队列不为空时,返回队首值并移除l  队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null LinkedBlockingQueue的poll(time,unit)方法源码如下: public E poll(long timeout, TimeUnit unit) throws InterruptedException {        final E x;        final int c;        long nanos = unit.toNanos(timeout);        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();  // 获取锁        try {            while (count.get() == 0) {                if (nanos <= 0L)                    return null;                nanos = notEmpty.awaitNanos(nanos); // 使线程等待指定的时间            }            x = dequeue();  // 出队            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();  // 标识当前队列非空        } finally {            takeLock.unlock();  // 解锁        }        if (c == capacity)            signalNotFull();  // 标识当前队列已经是非满        return x;}  从上面代码可以看出,执行poll(time,unit)方法时,分为以下几个步骤: l  先是要获取锁。l  而后判断count是否等于0,如果等于0则证明队列为空,会阻塞等待;否则执行dequeue()方法做元素的出队。l  解锁。l  c是元素e入队前队列中的元素个数。如果是等于队列的容量,则说明之前的队列是满的,还需要执行signalNotFull ()方法来标识当前队列已经是非满了。 dequeue()和signalNotFull ()方法此处不再赘述。  3.8.     remove()执行remove()方法后有两种结果: l  队列不为空时,返回队首值并移除l  队列为空时,抛出异常 LinkedBlockingQueue的remove()方法其实是调用了父类AbstractQueue的remove ()方法,源码如下: public E remove() {        E x = poll();        if (x != null)            return x;        else            throw new NoSuchElementException();} 从上面代码可以看出,remove()直接调用了poll()方法。如果poll()方法返回结果为null,则抛出NoSuchElementException异常。 poll()方法此处不再赘述。 3.9.     peek()执行peek()方法后有两种结果: l  队列不为空时,返回队首值但不移除l  队列为空时,返回null  peek()方法源码如下: public E peek() {        final AtomicInteger count = this.count;        if (count.get() == 0)            return null;        final ReentrantLock takeLock = this.takeLock;        takeLock.lock();  // 加锁        try {            return (count.get() > 0) ? head.next.item : null;  // 空则返回null        } finally {            takeLock.unlock();  // 解锁        }} 从上面代码可以看出,peek()方法比较简单,直接就是获取了链表里面头结点的元素值。 3.10.            element()执行element()方法后有两种结果: l  队列不为空时,返回队首值但不移除l  队列为空时,抛出异常  element()方法其实是调用了父类AbstractQueue的element()方法,源码如下: public E element() {        E x = peek();        if (x != null)            return x;        else            throw new NoSuchElementException();} 从上面代码可以看出,执行element()方法时,先是获取peek()方法的结果,如果结果是null,则抛出NoSuchElementException异常。   4.   LinkedBlockingQueue的单元测试 LinkedBlockingQueue的单元测试如下:   package com.waylau.java.demo.datastructure; import static org.junit.jupiter.api.Assertions.assertEquals;import static org.junit.jupiter.api.Assertions.assertFalse;import static org.junit.jupiter.api.Assertions.assertNotNull;import static org.junit.jupiter.api.Assertions.assertNull;import static org.junit.jupiter.api.Assertions.assertThrows;import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.NoSuchElementException;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; /** * LinkedBlockingQueue Test * * @since 1.0.0 2020年5月24日 * @author <a href="https://waylau.com">Way Lau</a> */class LinkedBlockingQueueTests {    @Test    void testOffer() {        // 初始化队列        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列未满时,返回 true        boolean resultNotFull = queue.offer("Java");        assertTrue(resultNotFull);         // 测试队列满则,返回 false        queue.offer("C");        queue.offer("Python");        boolean resultFull = queue.offer("C++");        assertFalse(resultFull);    }     @Test    void testPut() throws InterruptedException {        // 初始化队列        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列未满时,直接插入没有返回值;        queue.put("Java");         // 测试队列满则, 会阻塞等待,一直等到队列未满时再插入。        queue.put("C");        queue.put("Python");        queue.put("C++");  // 阻塞等待    }     @Test    void testOfferTime() throws InterruptedException {        // 初始化队列        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列未满时,返回 true        boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS);        assertTrue(resultNotFull);         // 测试队列满则,返回 false        queue.offer("C");        queue.offer("Python");        boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 等5秒        assertFalse(resultFull);    }     @Test    void testAdd() {        // 初始化队列        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列未满时,返回 true        boolean resultNotFull = queue.add("Java");        assertTrue(resultNotFull);         // 测试队列满则抛出异常        queue.add("C");        queue.add("Python");         Throwable excpetion = assertThrows(IllegalStateException.class, () -> {            queue.add("C++");// 抛异常        });         assertEquals("Queue full", excpetion.getMessage());    }     @Test    void testPoll() throws InterruptedException {        // 初始化队列        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列为空时,返回 null        String resultEmpty = queue.poll();        assertNull(resultEmpty);         // 测试队列不为空时,返回队首值并移除        queue.put("Java");        queue.put("C");        queue.put("Python");        String resultNotEmpty = queue.poll();        assertEquals("Java", resultNotEmpty);    }     @Test    void testTake() throws InterruptedException {        // 初始化队列        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列不为空时,返回队首值并移除        queue.put("Java");        queue.put("C");        queue.put("Python");        String resultNotEmpty = queue.take();        assertEquals("Java", resultNotEmpty);         // 测试队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值        queue.clear();        String resultEmpty = queue.take(); // 阻塞等待        assertNotNull(resultEmpty);    }     @Test    void testPollTime() throws InterruptedException {        // 初始化队列        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列不为空时,返回队首值并移除        queue.put("Java");        queue.put("C");        queue.put("Python");        String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS);        assertEquals("Java", resultNotEmpty);         // 测试队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null        queue.clear();        String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒        assertNull(resultEmpty);    }     @Test    void testRemove() throws InterruptedException {        // 初始化队列        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列为空时,抛出异常        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {            queue.remove();// 抛异常        });         assertEquals(null, excpetion.getMessage());         // 测试队列不为空时,返回队首值并移除        queue.put("Java");        queue.put("C");        queue.put("Python");        String resultNotEmpty = queue.remove();        assertEquals("Java", resultNotEmpty);} @Test    void testPeek() throws InterruptedException {        // 初始化队列        Queue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列不为空时,返回队首值并但不移除        queue.add("Java");        queue.add("C");        queue.add("Python");        String resultNotEmpty = queue.peek();        assertEquals("Java", resultNotEmpty);        resultNotEmpty = queue.peek();        assertEquals("Java", resultNotEmpty);        resultNotEmpty = queue.peek();        assertEquals("Java", resultNotEmpty);         // 测试队列为空时,返回null        queue.clear();        String resultEmpty = queue.peek();        assertNull(resultEmpty);    }     @Test    void testElement() throws InterruptedException {        // 初始化队列        Queue<String> queue = new LinkedBlockingQueue<String>(3);         // 测试队列不为空时,返回队首值并但不移除        queue.add("Java");        queue.add("C");        queue.add("Python");        String resultNotEmpty = queue.element();        assertEquals("Java", resultNotEmpty);        resultNotEmpty = queue.element();        assertEquals("Java", resultNotEmpty);        resultNotEmpty = queue.element();        assertEquals("Java", resultNotEmpty);         // 测试队列为空时,抛出异常        queue.clear();        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {            queue.element();// 抛异常        });         assertEquals(null, excpetion.getMessage());    }}  5.   LinkedBlockingQueue的应用案例以下是一个生产者-消费者的示例。该示例模拟了1个生产者,2个消费者。当队列满时,则会阻塞生产者生产;当队列空时,则会阻塞消费者消费。 package com.waylau.java.demo.datastructure; import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.BlockingQueue; /** * LinkedBlockingQueue Demo * * @since 1.0.0 2020年5月23日 * @author <a href="https://waylau.com">Way Lau</a> */public class LinkedBlockingQueueDemo {    public static void main(String[] args) {        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);         // 1个生产者        Producer p = new Producer(queue);         // 2个消费者        Consumer c1 = new Consumer("c1", queue);        Consumer c2 = new Consumer("c2", queue);         // 启动线程        new Thread(p).start();        new Thread(c1).start();        new Thread(c2).start();    } } class Producer implements Runnable {    private final BlockingQueue<String> queue;     Producer(BlockingQueue<String> queue) {        this.queue = queue;    }     public void run() {        try {            while (true) {                // 模拟耗时操作                Thread.sleep(1000L);                 queue.put(produce());            }        } catch (InterruptedException ex) {            ex.printStackTrace();        }    }     String produce() {        String apple = "apple: " + System.currentTimeMillis();        System.out.println("produce " + apple);        return apple;    }} class Consumer implements Runnable {    private final BlockingQueue<String> queue;     private final String name;     Consumer(String name, BlockingQueue<String> queue) {        this.queue = queue;        this.name = name;    }     public void run() {        try {            while (true) {                // 模拟耗时操作                Thread.sleep(2000L);                 consume(queue.take());            }        } catch (InterruptedException ex) {            ex.printStackTrace();        }    }     void consume(Object x) {        System.out.println(this.name + " consume " + x);    }}  运行上述程序,输出内容如下: produce apple: 1590308520134c1 consume apple: 1590308520134produce apple: 1590308521135c2 consume apple: 1590308521135produce apple: 1590308522142c1 consume apple: 1590308522142produce apple: 1590308523147c2 consume apple: 1590308523147produce apple: 1590308524156c1 consume apple: 1590308524156produce apple: 1590308525157c2 consume apple: 1590308525157produce apple: 1590308526157c1 consume apple: 1590308526157produce apple: 1590308527157c2 consume apple: 1590308527157 6.   参考引用本系列归档至《Java数据结构及算法实战》:https://github.com/waylau/java-data-structures-and-algorithms-in-action《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版):https://item.jd.com/13014179.html

作者:|老卫|,原文链接: http://www.imooc.com/article/325253

文章推荐

平衡二叉树的实现

动态规划解0-1背包问题

Python数据分析--Numpy常用函数介绍(3)

有趣的BUG之Stack Overflow

elasticsearch-spark的用法

Arch Linux配置gnome桌面

vue 代码调试神器

python多线程同步售票系统解决思路

软件架构-分布式集中配置中心Spring Cloud Config详解

JavaScript 运行堆栈与闭包

TiDB Cloud GA,助力全球企业在云上构建新一代云原生应用

【组件封装】改造 Element-UI 多选框组件 (el-check