`

集合框架 Queue篇(8)---PriorityBlockingQueue、SynchronousQueue

 
阅读更多
Queue

------------
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)

4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue,           (基于数组的并发阻塞队列)
6.LinkedBlockingQueue,        (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue,        (带优先级的无界阻塞队列)
9.SynchronousQueue                       (并发同步阻塞队列)
-----------------------------------------------------
PriorityBlockingQueue
一个无界优先级阻塞队列,使用与类 PriorityQueue 相同的顺序规则,内部有一个PriorityQueue对象用来实现操作。

public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    private final PriorityQueue<E> q; //内部的PriorityQueue对象用来实现操作
    private final ReentrantLock lock = new ReentrantLock(true);//true表示按照FIFO顺序
    private final Condition notEmpty = lock.newCondition();//notEmpty条件,由于是无界的,就不会越界,不会用到notFull条件

    /**
     * 默认构造,创建一个默认构造的PriorityQueue 
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * 初始化容量的构造,创建对应容量的PriorityQueue 
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * 指定容量和比较器的构造,同样是创建对应的PriorityQueue 
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     *指定初始化集合的构造
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    /**
     * 将指定元素插入此优先级队列,调用offer
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * 在加锁的情况下,通过PriorityQueue插入 
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(e);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 调用offer,插入
     */
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     * 无界,插入不需等待
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }
   
    //获取并移除此队列的头,如果此队列为空,则返回 null。
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }
    //获取并移除此队列的头部,在元素变得【可用】之前一直等待(如果有必要)。
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }  
    //获取并移除此队列的头部,在【指定的等待时间】前等待可用的元素(如果有必要)。
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }
    //获取但不移除此队列的头;如果此队列为空,则返回 null。
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    ……

    /**
     * 在加锁的情况下,调用PriorityQueue 实现remove
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * 在加锁的情况下,调用PriorityQueue 实现contains
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

……

}
-------------------------------------------
/**
* SynchronousQueue
* 没有内部容量的阻塞队列,因此每一个put()都必须等待一个take(),反之亦然。
* 它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、
* 事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。 
*/
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    
    //TransferStack 、TransferQueue 两个类是该抽象类的实现。
    //该队列同传入参数fair,如果为true表示队列可保证线程以 FIFO 的顺序进行访问(此刻是用TransferQueue 实现操作),
    //否则按自然顺序(此刻用TransferStack实现操作)。
    //Transferer类通过transfer中参数Object e为空与否,来实现对队列的元素的生产(non-null时)和消费(null时)
    static abstract class Transferer {
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

    //time常量
    static final int NCPUS = Runtime.getRuntime().availableProcessors();
    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
    static final int maxUntimedSpins = maxTimedSpins * 16;
    static final long spinForTimeoutThreshold = 1000L;
    /** Dual stack */
    static final class TransferStack extends Transferer { ……  }
    /** Dual Queue */
    static final class TransferQueue extends Transferer { ……  }

    private transient volatile Transferer transferer;
    public SynchronousQueue() {
        this(false);
    }
    public SynchronousQueue(boolean fair) {
        transferer = (fair)? new TransferQueue() : new TransferStack();
    }
    //将指定元素添加到此队列
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
     Thread.interrupted();
            throw new InterruptedException();
 }
    }
   //将指定元素插入到此队列,如有必要则等待指定的时间
   public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    //将指定元素插入到此队列
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }
    //获取并移除此队列的头
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E)e;
 Thread.interrupted();
        throw new InterruptedException();
    }
    //获取并移除此队列的头,如有必要则等待指定的时间
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E)e;
        throw new InterruptedException();
    }

    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }

    public boolean isEmpty() {
        return true;
    }

    public int size() {
        return 0;
    }

    public int remainingCapacity() {
        return 0;
    }

    public void clear() {
    }

    public boolean contains(Object o) {
        return false;
    }

    public boolean remove(Object o) {
        return false;
    }

    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    public boolean removeAll(Collection<?> c) {
        return false;
    }

    public boolean retainAll(Collection<?> c) {
        return false;
    }

    public E peek() {
        return null;
    }
  ……   
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics