`

集合框架 Queue篇(3)---ConcurrentLinkedQueue

 
阅读更多
Queue

------------
1.ArrayDeque,
2.PriorityQueue,
3.ConcurrentLinkedQueue,

4.DelayQueue,
5.ArrayBlockingQueue,
6.LinkedBlockingQueue,
7.LinkedBlockingDeque
8.PriorityBlockingQueue,
9.SynchronousQueue
------------

---------------------------------------

ConcurrentLinkedQueue(基于链表的无界线程安全队列)

提供了高效的、可伸缩的、线程安全的非阻塞FIFO队列。


先来了解一下AtomicReferenceFieldUpdater的API:

public abstract class AtomicReferenceFieldUpdater<T,V> extends Object
基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。该类用于原子数据结构,该结构中同一节点的几个引用字段都独立受原子更新控制。例如,树节点可能声明为

class Node {
   private volatile Node left, right;

   private static final AtomicReferenceFieldUpdater leftUpdater =
     AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "left");
   private static AtomicReferenceFieldUpdater rightUpdater =
     AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "right");

   Node getLeft() { return left;  }
   boolean compareAndSetLeft(Node expect, Node update) {
     return leftUpdater.compareAndSet(this, expect, update);
   }
   // ... and so on
 }

其中的compareAndSet调用Sun的UnSafe的compareAndSwapInt方法,此方法是native方法,compareAndSwapInt是基于CPU的CAS原语来实现的。

CAS(Compare -And -Swap)简单来说就是由CPU比较内存位置上的值是否与当前值expect相同,如果是则将其设置为update,如果不是则返回false。
基于CAS的操作可认为是无阻塞的,并且由于CAS操作时CPU原语,因此其性能会好于之前同步锁的方式。

ConcurrentLinkedQueue并未使用原子化的引用,而是使用普通的volatile引用来代表每一个节点,并通过基于反射的AtomicReferenceFieldUpdater来进行更新。原子化的域更新器类,代表着已存在的volatile域基于反射的“视图”,使得CAS能够用于已有的volatile域。更新器类没有构造函数,为了创建,需要调用newUpdater的工厂方法,声明类和域的名称。域更新器并不依赖特定的实例;它可以用于更新目标类任何实例的目标域。更新器类提供的原子保护比普通的原子类差一些,因为不能保证底层的域不被直接修改----compareAndSet和算术方法只在其他线程使用原子化的域更新器方法时保证其原子性。
在ConcurrentLinkedQueue中,更新Node的next域是通过使用nextUpdater的compareAndSet方法实现的。这个有点迂回的方案是因性能原因使用的。对于频繁分配的、生命周期短暂的对象,比如队列的链接节点,减少每个Node的AtomicReference创建,对于减小插入操作的开销是非常有效的。

----
ABA问题是因为算法中误用比较交换引起的反常现象,节点被循环使用(主要存在于不能被GC的环境中)。在某些算法中把V的值由A转换为B,再转换为A任然记为一次改变,这时需要重新进行算法中的某些步骤。
简单的解决方案:更新一对值,包括引用和版本号,而不是仅更新该值的引用。即使值由A改为B,又改回A,版本号也是不同的。

------------------------------

/**
*ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。
*队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。
*当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。 
*
*此实现采用了基于CAS的原子操作,性能高于synchronized方法。
*/
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {
  
    /*
    *采用基于AtomicReferenceFieldUpdater的Node结构,内部实现采用了基于CAS的原子操作
    */
    private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        private static final AtomicReferenceFieldUpdater<Node, Node>  nextUpdater =
               AtomicReferenceFieldUpdater.newUpdater (Node.class, Node.class, "next");
        private static final AtomicReferenceFieldUpdater<Node, Object> itemUpdater =
               AtomicReferenceFieldUpdater.newUpdater (Node.class, Object.class, "item");

        Node(E x) { item = x; }

        Node(E x, Node<E> n) { item = x; next = n; }

        E getItem() {
            return item;
        }

        boolean casItem(E cmp, E val) {
            return itemUpdater.compareAndSet(this, cmp, val);
        }

        void setItem(E val) {
            itemUpdater.set(this, val);
        }

        Node<E> getNext() {
            return next;
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return nextUpdater.compareAndSet(this, cmp, val);
        }

        void setNext(Node<E> val) {
            nextUpdater.set(this, val);
        }

    }

    //定义tailUpdater、headUpdater 分别实现对头尾的cas操作
    private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> tailUpdater =
            AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueue.class, Node.class, "tail");
    private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> headUpdater =
            AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueue.class,  Node.class, "head");

    private boolean casTail(Node<E> cmp, Node<E> val) {//对尾部cas操作
        return tailUpdater.compareAndSet(this, cmp, val);
    }

    private boolean casHead(Node<E> cmp, Node<E> val) {//对头部cas操作
        return headUpdater.compareAndSet(this, cmp, val);
    }

    /**
     * 头结点(虚节点,第一个节点其实是head.next())
     */
    private transient volatile Node<E> head = new Node<E>(null, null);

    /** 尾节点,队列的最后一个元素 **/
    private transient volatile Node<E> tail = head;

    /**
     * 空构造
     */
    public ConcurrentLinkedQueue() {}

    /**
     * 通过集合构建队列
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    // Have to override just to update the javadoc
    /**
     * 插入元素(在队尾)
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * 插入元素(在队尾)
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e, null);//新节点n
        for (;;) {//放到无限循环中,直到找到正确的位置并插入
            Node<E> t = tail;
            Node<E> s = t.getNext();
            if (t == tail) {//再次验证
                if (s == null) {
                    if (t.casNext(s, n)) {//如果t的next为s,则把n设为t的next
                        casTail(t, n);//t为尾的情况下,把n设为尾节点
                        return true;
                    }
                } else {//如果s不为空,判断t是否为尾,是的情况下把s设为尾节点
                    casTail(t, s);
                }
            }
        }
    }

    public E poll() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {//如果头尾是同一节点
                    if (first == null)//空队的情况
                        return null;
                    else
                        casTail(t, first);//用cas把h的next节点设置为尾节点
                } else if (casHead(h, first)) {//用cas把h的next节点first设置为新的头节点
                    E item = first.getItem();
                    if (item != null) {
                        first.setItem(null);//头结点是一个虚节点
                        return item;
                    }
                    // else skip over deleted item, continue loop,
                }
            }
        }
    }

    public E peek() { // 和poll操作类似,只是不删除元素,只是返回头节点的next的item
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);
                } else {
                    E item = first.getItem();
                    if (item != null)
                        return item;
                    else // remove deleted node and continue
                        casHead(h, first);
                }
            }
        }
    }

    /**
     * 和poll/peek类似,不过返回的是头节点的next节点Node。(在下面的方法中有调用,如size()等)
     */
    Node<E> first() {
        for (;;) {
            Node<E> h = head;
            Node<E> t = tail;
            Node<E> first = h.getNext();
            if (h == head) {
                if (h == t) {
                    if (first == null)
                        return null;
                    else
                        casTail(t, first);
                } else {
                    if (first.getItem() != null)
                        return first;
                    else // remove deleted node and continue
                        casHead(h, first);
                }
            }
        }
    }


    public boolean isEmpty() {
        return first() == null;
    }

    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            if (p.getItem() != null) {
                // Collections.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
            }
        }
        return count;
    }

    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item))
                return true;
        }
        return false;
    }

    public boolean remove(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null))
                return true;
        }
        return false;
    }

    public Object[] toArray() {
        // Use ArrayList to deal with resizing.
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = p.getNext()) {
            E item = p.getItem();
            if (item != null)
                al.add(item);
        }
        return al.toArray();
    }

    public <T> T[] toArray(T[] a) {
        // try to use sent-in array
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = p.getNext()) {
            E item = p.getItem();
            if (item != null)
                a[k++] = (T)item;
        }
        if (p == null) {
            if (k < a.length)
                a[k] = null;
            return a;
        }

        // If won't fit, use ArrayList version
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> q = first(); q != null; q = q.getNext()) {
            E item = q.getItem();
            if (item != null)
                al.add(item);
        }
        return (T[])al.toArray(a);
    }

  ……

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics