`

集合框架 Queue篇(7)---LinkedBlockingDeque

 
阅读更多
Queue

------------
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)

4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue,           (基于数组的并发阻塞队列)
6.LinkedBlockingQueue,        (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的双端阻塞队列,队首和队尾均可插入、移除元素)
8.PriorityBlockingQueue,        (带优先级的无界阻塞队列)
9.SynchronousQueue                       (并发同步阻塞队列)
-----------------------------------------------------
LinkedBlockingDeque
是基于链表的双端阻塞队列,容量可选:构造时可以指定容量上限,不指定时默认为 Integer.MAX_VALUE(近似无界)

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>,  java.io.Serializable {

    /**
     * Doubly-linked list node: one element plus links to the neighbouring nodes.
     */
    static final class Node<E> {
        /** Element stored in this node. */
        E item;
        /** Link to the preceding node; null for the head node. */
        Node<E> prev;
        /** Link to the following node; null for the tail node. */
        Node<E> next;

        /**
         * Creates a node with the given element and neighbour links.
         *
         * @param x the element to store
         * @param p the predecessor link
         * @param n the successor link
         */
        Node(E x, Node<E> p, Node<E> n) {
            this.item = x;
            this.prev = p;
            this.next = n;
        }
    }

    /** Head node of the list; null when the deque holds no elements. */
    private transient Node<E> first;
    /** Tail node of the list; null when the deque holds no elements. */
    private transient Node<E> last;
    /** Number of elements currently linked into the deque. */
    private transient int count;
    /** Maximum number of elements the deque may hold; fixed at construction. */
    private final int capacity;
    /** Lock held by the public operations while they read or modify the list. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition awaited by the take/poll operations; signalled after an element is linked. */
    private final Condition notEmpty = lock.newCondition();
    /** Condition awaited by the put/offer operations; signalled after an element is unlinked. */
    private final Condition notFull = lock.newCondition();

    /**
     * Creates a deque with a capacity of {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    /**
     * Creates a deque with the given fixed capacity.
     *
     * @param capacity the maximum number of elements the deque may hold
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public LinkedBlockingDeque(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
    }
    /**
     * Creates a deque with a capacity of {@link Integer#MAX_VALUE}, initially
     * containing the elements of the given collection, appended at the tail in
     * the order returned by the collection's iterator.
     *
     * <p>The elements are linked directly through the private
     * {@code linkLast} helper rather than through the public, overridable
     * {@code add} method, so that no subclass override is invoked while this
     * object is still being constructed. The lock is acquired once for the
     * whole copy instead of once per element.
     *
     * @param c the collection whose elements are initially placed in the deque
     * @throws NullPointerException if {@code c} or any of its elements is null
     * @throws IllegalStateException if the collection holds more elements than
     *         the deque's capacity
     */
    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        // Never contended during construction, but keeps the invariant that
        // linkLast is only ever called with the lock held.
        lock.lock();
        try {
            for (E e : c) {
                if (e == null) {
                    throw new NullPointerException();
                }
                if (!linkLast(e)) {
                    throw new IllegalStateException("Deque full");
                }
            }
        } finally {
            lock.unlock();
        }
    }


    /**
     * Links {@code e} in as the new head node unless the deque is already at
     * capacity, and signals {@code notEmpty} after a successful insertion.
     * Does not acquire the lock itself; the callers in this class invoke it
     * while holding {@code lock}.
     *
     * @param e the element to insert
     * @return {@code true} if the element was linked, {@code false} if the
     *         deque was full
     */
    private boolean linkFirst(E e) {
        if (count < capacity) {
            ++count;
            Node<E> oldHead = first;
            Node<E> fresh = new Node<E>(e, null, oldHead);
            first = fresh;
            if (last != null) {
                oldHead.prev = fresh;
            } else {
                // The deque was empty, so the new node is also the tail.
                last = fresh;
            }
            notEmpty.signal();
            return true;
        }
        return false;
    }

    /**
     * Links {@code e} in as the new tail node unless the deque is already at
     * capacity, and signals {@code notEmpty} after a successful insertion.
     * Does not acquire the lock itself; the callers in this class invoke it
     * while holding {@code lock}.
     *
     * @param e the element to insert
     * @return {@code true} if the element was linked, {@code false} if the
     *         deque was full
     */
    private boolean linkLast(E e) {
        if (count < capacity) {
            ++count;
            Node<E> oldTail = last;
            Node<E> fresh = new Node<E>(e, oldTail, null);
            last = fresh;
            if (first != null) {
                oldTail.next = fresh;
            } else {
                // The deque was empty, so the new node is also the head.
                first = fresh;
            }
            notEmpty.signal();
            return true;
        }
        return false;
    }

    /**
     * Unlinks the head node and returns its element, signalling
     * {@code notFull} after the removal. Does not acquire the lock itself.
     *
     * @return the element of the removed head node, or {@code null} if the
     *         deque is empty
     */
    private E unlinkFirst() {
        Node<E> head = first;
        if (head == null) {
            return null;
        }
        Node<E> successor = head.next;
        first = successor;
        if (successor != null) {
            successor.prev = null;
        } else {
            // The removed node was the only one, so the deque is now empty.
            last = null;
        }
        --count;
        notFull.signal();
        return head.item;
    }

    /**
     * Unlinks the tail node and returns its element, signalling
     * {@code notFull} after the removal. Does not acquire the lock itself.
     *
     * @return the element of the removed tail node, or {@code null} if the
     *         deque is empty
     */
    private E unlinkLast() {
        Node<E> tail = last;
        if (tail == null) {
            return null;
        }
        Node<E> predecessor = tail.prev;
        last = predecessor;
        if (predecessor != null) {
            predecessor.next = null;
        } else {
            // The removed node was the only one, so the deque is now empty.
            first = null;
        }
        --count;
        notFull.signal();
        return tail.item;
    }

    /**
     * Unlinks node {@code x} from the list, repairing the neighbour links and
     * the {@code first}/{@code last} references as needed, then decrements the
     * count and wakes all threads waiting on {@code notFull}. Does not acquire
     * the lock itself.
     *
     * @param x the node to remove; must currently be linked into this list
     */
    private void unlink(Node<E> x) {
        Node<E> before = x.prev;
        Node<E> after = x.next;
        if (before != null && after != null) {
            // Interior node: splice the neighbours together.
            before.next = after;
            after.prev = before;
        } else if (before != null) {
            // x is the tail.
            before.next = null;
            last = before;
        } else if (after != null) {
            // x is the head and has a successor.
            after.prev = null;
            first = after;
        } else {
            // x is the only node.
            first = last = null;
        }
        --count;
        notFull.signalAll();
    }

    //--------------------------------- BlockingDeque 双端阻塞队列方法实现
    /**
     * Inserts {@code e} at the front of the deque via {@code offerFirst}.
     *
     * @param e the element to insert
     * @throws IllegalStateException if {@code offerFirst} reports the deque is full
     */
    public void addFirst(E e) {
        boolean inserted = offerFirst(e);
        if (!inserted) {
            throw new IllegalStateException("Deque full");
        }
    }
    /**
     * Inserts {@code e} at the end of the deque via {@code offerLast}.
     *
     * @param e the element to insert
     * @throws IllegalStateException if {@code offerLast} reports the deque is full
     */
    public void addLast(E e) {
        boolean inserted = offerLast(e);
        if (!inserted) {
            throw new IllegalStateException("Deque full");
        }
    }
    /**
     * Inserts {@code e} at the front of the deque without waiting.
     *
     * @param e the element to insert
     * @return {@code true} if the element was inserted, {@code false} if the
     *         deque was full
     * @throws NullPointerException if {@code e} is null
     */
    public boolean offerFirst(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return linkFirst(e);
        } finally {
            guard.unlock();
        }
    }
    /**
     * Inserts {@code e} at the end of the deque without waiting.
     *
     * @param e the element to insert
     * @return {@code true} if the element was inserted, {@code false} if the
     *         deque was full
     * @throws NullPointerException if {@code e} is null
     */
    public boolean offerLast(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return linkLast(e);
        } finally {
            guard.unlock();
        }
    }
    /**
     * Inserts {@code e} at the front of the deque, awaiting {@code notFull}
     * for as long as {@code linkFirst} reports that the deque is full.
     *
     * @param e the element to insert
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if thrown by {@code notFull.await()}
     */
    public void putFirst(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            for (;;) {
                if (linkFirst(e)) {
                    return;
                }
                notFull.await();
            }
        } finally {
            lock.unlock();
        }
    }
    /**
     * Inserts {@code e} at the end of the deque, awaiting {@code notFull}
     * for as long as {@code linkLast} reports that the deque is full.
     *
     * @param e the element to insert
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if thrown by {@code notFull.await()}
     */
    public void putLast(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            for (;;) {
                if (linkLast(e)) {
                    return;
                }
                notFull.await();
            }
        } finally {
            lock.unlock();
        }
    }
    /**
     * Inserts {@code e} at the front of the deque, waiting on {@code notFull}
     * for up to the given time while the deque is full.
     *
     * @param e the element to insert
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return {@code true} if the element was inserted, {@code false} if the
     *         remaining wait time ran out first
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if thrown while acquiring the lock or waiting
     */
    public boolean offerFirst(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long remaining = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (!linkFirst(e)) {
                if (remaining <= 0) {
                    return false;
                }
                // awaitNanos returns the time still left to wait.
                remaining = notFull.awaitNanos(remaining);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Inserts {@code e} at the end of the deque, waiting on {@code notFull}
     * for up to the given time while the deque is full.
     *
     * @param e the element to insert
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return {@code true} if the element was inserted, {@code false} if the
     *         remaining wait time ran out first
     * @throws NullPointerException if {@code e} is null
     * @throws InterruptedException if thrown while acquiring the lock or waiting
     */
    public boolean offerLast(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long remaining = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (!linkLast(e)) {
                if (remaining <= 0) {
                    return false;
                }
                // awaitNanos returns the time still left to wait.
                remaining = notFull.awaitNanos(remaining);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Removes and returns the first element, obtained via {@code pollFirst}.
     *
     * @return the removed first element
     * @throws NoSuchElementException if {@code pollFirst} returns null
     */
    public E removeFirst() {
        E head = pollFirst();
        if (head != null) {
            return head;
        }
        throw new NoSuchElementException();
    }
    /**
     * Removes and returns the last element, obtained via {@code pollLast}.
     *
     * @return the removed last element
     * @throws NoSuchElementException if {@code pollLast} returns null
     */
    public E removeLast() {
        E tail = pollLast();
        if (tail != null) {
            return tail;
        }
        throw new NoSuchElementException();
    }
    /**
     * Removes and returns the first element without waiting.
     *
     * @return the removed first element, or {@code null} if the deque is empty
     */
    public E pollFirst() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return unlinkFirst();
        } finally {
            guard.unlock();
        }
    }
    /**
     * Removes and returns the last element without waiting.
     *
     * @return the removed last element, or {@code null} if the deque is empty
     */
    public E pollLast() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return unlinkLast();
        } finally {
            guard.unlock();
        }
    }
    /**
     * Removes and returns the first element, awaiting {@code notEmpty} for as
     * long as the deque is empty.
     *
     * @return the removed first element
     * @throws InterruptedException if thrown by {@code notEmpty.await()}
     */
    public E takeFirst() throws InterruptedException {
        lock.lock();
        try {
            E head = unlinkFirst();
            while (head == null) {
                notEmpty.await();
                head = unlinkFirst();
            }
            return head;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Removes and returns the last element, awaiting {@code notEmpty} for as
     * long as the deque is empty.
     *
     * @return the removed last element
     * @throws InterruptedException if thrown by {@code notEmpty.await()}
     */
    public E takeLast() throws InterruptedException {
        lock.lock();
        try {
            E tail = unlinkLast();
            while (tail == null) {
                notEmpty.await();
                tail = unlinkLast();
            }
            return tail;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Removes and returns the first element, waiting on {@code notEmpty} for
     * up to the given time while the deque is empty.
     *
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return the removed first element, or {@code null} if the remaining wait
     *         time ran out first
     * @throws InterruptedException if thrown while acquiring the lock or waiting
     */
    public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
        long remaining = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            E head;
            while ((head = unlinkFirst()) == null) {
                if (remaining <= 0) {
                    return null;
                }
                // awaitNanos returns the time still left to wait.
                remaining = notEmpty.awaitNanos(remaining);
            }
            return head;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Removes and returns the last element, waiting on {@code notEmpty} for
     * up to the given time while the deque is empty.
     *
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return the removed last element, or {@code null} if the remaining wait
     *         time ran out first
     * @throws InterruptedException if thrown while acquiring the lock or waiting
     */
    public E pollLast(long timeout, TimeUnit unit)
        throws InterruptedException {
        long remaining = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            E tail;
            while ((tail = unlinkLast()) == null) {
                if (remaining <= 0) {
                    return null;
                }
                // awaitNanos returns the time still left to wait.
                remaining = notEmpty.awaitNanos(remaining);
            }
            return tail;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Returns the first element without removing it, obtained via {@code peekFirst}.
     *
     * @return the first element
     * @throws NoSuchElementException if {@code peekFirst} returns null
     */
    public E getFirst() {
        E head = peekFirst();
        if (head != null) {
            return head;
        }
        throw new NoSuchElementException();
    }
    /**
     * Returns the last element without removing it, obtained via {@code peekLast}.
     *
     * @return the last element
     * @throws NoSuchElementException if {@code peekLast} returns null
     */
    public E getLast() {
        E tail = peekLast();
        if (tail != null) {
            return tail;
        }
        throw new NoSuchElementException();
    }
    /**
     * Returns the first element without removing it.
     *
     * @return the element of the head node, or {@code null} if there is no head node
     */
    public E peekFirst() {
        lock.lock();
        try {
            Node<E> head = first;
            if (head == null) {
                return null;
            }
            return head.item;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Returns the last element without removing it.
     *
     * @return the element of the tail node, or {@code null} if there is no tail node
     */
    public E peekLast() {
        lock.lock();
        try {
            Node<E> tail = last;
            if (tail == null) {
                return null;
            }
            return tail.item;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Removes the first node, scanning from head to tail, whose element
     * satisfies {@code o.equals(item)}.
     *
     * @param o the object to look for; a null argument never matches
     * @return {@code true} if a matching node was unlinked, {@code false} otherwise
     */
    public boolean removeFirstOccurrence(Object o) {
        if (o == null) {
            return false;
        }
        lock.lock();
        try {
            Node<E> cursor = first;
            while (cursor != null) {
                if (o.equals(cursor.item)) {
                    unlink(cursor);
                    return true;
                }
                cursor = cursor.next;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the first node, scanning from tail to head, whose element
     * satisfies {@code o.equals(item)}.
     *
     * @param o the object to look for; a null argument never matches
     * @return {@code true} if a matching node was unlinked, {@code false} otherwise
     */
    public boolean removeLastOccurrence(Object o) {
        if (o == null) {
            return false;
        }
        lock.lock();
        try {
            Node<E> cursor = last;
            while (cursor != null) {
                if (o.equals(cursor.item)) {
                    unlink(cursor);
                    return true;
                }
                cursor = cursor.prev;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    //---------------------------------- BlockingQueue阻塞队列 方法实现
    /**
     * Appends {@code e} at the end of the deque by delegating to {@code addLast}.
     *
     * @param e the element to insert
     * @return {@code true} whenever {@code addLast} completes without throwing
     */
    public boolean add(E e) {
        this.addLast(e);
        return true;
    }
    /**
     * Offers {@code e} at the end of the deque by delegating to {@code offerLast}.
     *
     * @param e the element to insert
     * @return the result of {@code offerLast(e)}
     */
    public boolean offer(E e) {
        final boolean accepted = offerLast(e);
        return accepted;
    }
    /**
     * Inserts {@code e} at the end of the deque by delegating to {@code putLast}.
     *
     * @param e the element to insert
     * @throws InterruptedException if thrown by {@code putLast}
     */
    public void put(E e) throws InterruptedException {
        this.putLast(e);
    }
    /**
     * Offers {@code e} at the end of the deque with a time limit by delegating
     * to the timed {@code offerLast}.
     *
     * @param e the element to insert
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return the result of {@code offerLast(e, timeout, unit)}
     * @throws InterruptedException if thrown by {@code offerLast}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        final boolean accepted = offerLast(e, timeout, unit);
        return accepted;
    }
    /**
     * Removes and returns the first element by delegating to {@code removeFirst}.
     *
     * @return the result of {@code removeFirst()}
     */
    public E remove() {
        final E head = removeFirst();
        return head;
    }
    /**
     * Removes and returns the first element by delegating to {@code pollFirst}.
     *
     * @return the result of {@code pollFirst()}
     */
    public E poll() {
        final E head = pollFirst();
        return head;
    }
    /**
     * Removes and returns the first element by delegating to {@code takeFirst}.
     *
     * @return the result of {@code takeFirst()}
     * @throws InterruptedException if thrown by {@code takeFirst}
     */
    public E take() throws InterruptedException {
        final E head = takeFirst();
        return head;
    }
    /**
     * Removes and returns the first element with a time limit by delegating to
     * the timed {@code pollFirst}.
     *
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return the result of {@code pollFirst(timeout, unit)}
     * @throws InterruptedException if thrown by {@code pollFirst}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        final E head = pollFirst(timeout, unit);
        return head;
    }
    /**
     * Returns the first element without removing it by delegating to {@code getFirst}.
     *
     * @return the result of {@code getFirst()}
     */
    public E element() {
        final E head = getFirst();
        return head;
    }
    /**
     * Returns the first element without removing it by delegating to {@code peekFirst}.
     *
     * @return the result of {@code peekFirst()}
     */
    public E peek() {
        final E head = peekFirst();
        return head;
    }

    //------------------------------------------- Stack 方法实现
    /**
     * Pushes {@code e} onto the front of the deque by delegating to {@code addFirst}.
     *
     * @param e the element to push
     */
    public void push(E e) {
        this.addFirst(e);
    }
    /**
     * Pops the element at the front of the deque by delegating to {@code removeFirst}.
     *
     * @return the result of {@code removeFirst()}
     */
    public E pop() {
        final E head = removeFirst();
        return head;
    }

    //------------------------------------------- Collection 方法实现
    /**
     * Removes one occurrence of {@code o} by delegating to
     * {@code removeFirstOccurrence}.
     *
     * @param o the object to remove
     * @return the result of {@code removeFirstOccurrence(o)}
     */
    public boolean remove(Object o) {
        final boolean removed = removeFirstOccurrence(o);
        return removed;
    }
    /**
     * Returns the current element count, read while holding the lock.
     *
     * @return the number of elements in the deque
     */
    public int size() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return count;
        } finally {
            guard.unlock();
        }
    }
    /**
     * Reports whether some element satisfies {@code o.equals(item)}, scanning
     * from head to tail while holding the lock.
     *
     * @param o the object to look for; a null argument never matches
     * @return {@code true} if a matching element was found, {@code false} otherwise
     */
    public boolean contains(Object o) {
        if (o == null) {
            return false;
        }
        lock.lock();
        try {
            Node<E> cursor = first;
            while (cursor != null) {
                if (o.equals(cursor.item)) {
                    return true;
                }
                cursor = cursor.next;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    /**
     * Unlinks the given node if it is still reachable from the head. Nodes are
     * compared by identity, not by their elements.
     *
     * @param e the node to remove
     * @return {@code true} if the node was found and unlinked, {@code false} otherwise
     */
    boolean removeNode(Node<E> e) {
        lock.lock();
        try {
            Node<E> cursor = first;
            while (cursor != null) {
                if (cursor == e) {
                    unlink(cursor);
                    return true;
                }
                cursor = cursor.next;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
  ……
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics