- 浏览: 144473 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
EclipseEye:
fair_jm 写道不错 蛮详细的 谢谢分享
SWT/JFace专题 --- SWT中Display和多线程 -
fair_jm:
不错 蛮详细的 谢谢分享
SWT/JFace专题 --- SWT中Display和多线程
Queue
------------
1.ArrayDeque,
2.PriorityQueue,
3.ConcurrentLinkedQueue,
4.DelayQueue,
5.ArrayBlockingQueue,
6.LinkedBlockingQueue,
7.LinkedBlockingDeque
8.PriorityBlockingQueue,
9.SynchronousQueue
------------
---------------------------------------
ConcurrentLinkedQueue(基于链表的无界线程安全队列)
提供了高效的、可伸缩的、线程安全的非阻塞FIFO队列。
先来了解一下AtomicReferenceFieldUpdater的API:
public abstract class AtomicReferenceFieldUpdater<T,V> extends Object
基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。该类用于原子数据结构,该结构中同一节点的几个引用字段都独立受原子更新控制。例如,树节点可能声明为
其中的compareAndSet调用Sun的UnSafe的compareAndSwapInt方法,此方法是native方法,compareAndSwapInt是基于CPU的CAS原语来实现的。
CAS(Compare -And -Swap)简单来说就是由CPU比较内存位置上的值是否与当前值expect相同,如果是则将其设置为update,如果不是则返回false。
基于CAS的操作可认为是无阻塞的,并且由于CAS操作时CPU原语,因此其性能会好于之前同步锁的方式。
ConcurrentLinkedQueue并未使用原子化的引用,而是使用普通的volatile引用来代表每一个节点,并通过基于反射的AtomicReferenceFieldUpdater来进行更新。原子化的域更新器类,代表着已存在的volatile域基于反射的“视图”,使得CAS能够用于已有的volatile域。更新器类没有构造函数,为了创建,需要调用newUpdater的工厂方法,声明类和域的名称。域更新器并不依赖特定的实例;它可以用于更新目标类任何实例的目标域。更新器类提供的原子保护比普通的原子类差一些,因为不能保证底层的域不被直接修改----compareAndSet和算术方法只在其他线程使用原子化的域更新器方法时保证其原子性。
在ConcurrentLinkedQueue中,更新Node的next域是通过使用nextUpdater的compareAndSet方法实现的。这个有点迂回的方案是因性能原因使用的。对于频繁分配的、生命周期短暂的对象,比如队列的链接节点,减少每个Node的AtomicReference创建,对于减小插入操作的开销是非常有效的。
----
ABA问题是因为算法中误用比较交换引起的反常现象,节点被循环使用(主要存在于不能被GC的环境中)。在某些算法中把V的值由A转换为B,再转换为A任然记为一次改变,这时需要重新进行算法中的某些步骤。
简单的解决方案:更新一对值,包括引用和版本号,而不是仅更新该值的引用。即使值由A改为B,又改回A,版本号也是不同的。
------------------------------
------------
1.ArrayDeque,
2.PriorityQueue,
3.ConcurrentLinkedQueue,
4.DelayQueue,
5.ArrayBlockingQueue,
6.LinkedBlockingQueue,
7.LinkedBlockingDeque
8.PriorityBlockingQueue,
9.SynchronousQueue
------------
---------------------------------------
ConcurrentLinkedQueue(基于链表的无界线程安全队列)
提供了高效的、可伸缩的、线程安全的非阻塞FIFO队列。
先来了解一下AtomicReferenceFieldUpdater的API:
public abstract class AtomicReferenceFieldUpdater<T,V> extends Object
基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。该类用于原子数据结构,该结构中同一节点的几个引用字段都独立受原子更新控制。例如,树节点可能声明为
class Node { private volatile Node left, right; private static final AtomicReferenceFieldUpdater leftUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "left"); private static AtomicReferenceFieldUpdater rightUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "right"); Node getLeft() { return left; } boolean compareAndSetLeft(Node expect, Node update) { return leftUpdater.compareAndSet(this, expect, update); } // ... and so on }
其中的compareAndSet调用Sun的UnSafe的compareAndSwapInt方法,此方法是native方法,compareAndSwapInt是基于CPU的CAS原语来实现的。
CAS(Compare -And -Swap)简单来说就是由CPU比较内存位置上的值是否与当前值expect相同,如果是则将其设置为update,如果不是则返回false。
基于CAS的操作可认为是无阻塞的,并且由于CAS操作时CPU原语,因此其性能会好于之前同步锁的方式。
ConcurrentLinkedQueue并未使用原子化的引用,而是使用普通的volatile引用来代表每一个节点,并通过基于反射的AtomicReferenceFieldUpdater来进行更新。原子化的域更新器类,代表着已存在的volatile域基于反射的“视图”,使得CAS能够用于已有的volatile域。更新器类没有构造函数,为了创建,需要调用newUpdater的工厂方法,声明类和域的名称。域更新器并不依赖特定的实例;它可以用于更新目标类任何实例的目标域。更新器类提供的原子保护比普通的原子类差一些,因为不能保证底层的域不被直接修改----compareAndSet和算术方法只在其他线程使用原子化的域更新器方法时保证其原子性。
在ConcurrentLinkedQueue中,更新Node的next域是通过使用nextUpdater的compareAndSet方法实现的。这个有点迂回的方案是因性能原因使用的。对于频繁分配的、生命周期短暂的对象,比如队列的链接节点,减少每个Node的AtomicReference创建,对于减小插入操作的开销是非常有效的。
----
ABA问题是因为算法中误用比较交换引起的反常现象,节点被循环使用(主要存在于不能被GC的环境中)。在某些算法中把V的值由A转换为B,再转换为A任然记为一次改变,这时需要重新进行算法中的某些步骤。
简单的解决方案:更新一对值,包括引用和版本号,而不是仅更新该值的引用。即使值由A改为B,又改回A,版本号也是不同的。
------------------------------
/** *ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。 *队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。 *当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。 * *此实现采用了基于CAS的原子操作,性能高于synchronized方法。 */ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { /* *采用基于AtomicReferenceFieldUpdater的Node结构,内部实现采用了基于CAS的原子操作 */ private static class Node<E> { private volatile E item; private volatile Node<E> next; private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater = AtomicReferenceFieldUpdater.newUpdater (Node.class, Node.class, "next"); private static final AtomicReferenceFieldUpdater<Node, Object> itemUpdater = AtomicReferenceFieldUpdater.newUpdater (Node.class, Object.class, "item"); Node(E x) { item = x; } Node(E x, Node<E> n) { item = x; next = n; } E getItem() { return item; } boolean casItem(E cmp, E val) { return itemUpdater.compareAndSet(this, cmp, val); } void setItem(E val) { itemUpdater.set(this, val); } Node<E> getNext() { return next; } boolean casNext(Node<E> cmp, Node<E> val) { return nextUpdater.compareAndSet(this, cmp, val); } void setNext(Node<E> val) { nextUpdater.set(this, val); } } //定义tailUpdater、headUpdater 分别实现对头尾的cas操作 private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> tailUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueue.class, Node.class, "tail"); private static final AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> headUpdater = AtomicReferenceFieldUpdater.newUpdater(ConcurrentLinkedQueue.class, Node.class, "head"); private boolean casTail(Node<E> cmp, Node<E> val) {//对尾部cas操作 return tailUpdater.compareAndSet(this, cmp, val); } private boolean casHead(Node<E> cmp, Node<E> val) {//对头部cas操作 return headUpdater.compareAndSet(this, cmp, val); } /** * 头结点(虚节点,第一个节点其实是head.next()) */ private transient volatile Node<E> head = new Node<E>(null, null); /** 尾节点,队列的最后一个元素 **/ private transient volatile Node<E> tail = head; /** * 空构造 */ public ConcurrentLinkedQueue() {} /** * 通过集合构建队列 */ public ConcurrentLinkedQueue(Collection<? extends E> c) { for (Iterator<? extends E> it = c.iterator(); it.hasNext();) add(it.next()); } // Have to override just to update the javadoc /** * 插入元素(在队尾) */ public boolean add(E e) { return offer(e); } /** * 插入元素(在队尾) */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e, null);//新节点n for (;;) {//放到无限循环中,直到找到正确的位置并插入 Node<E> t = tail; Node<E> s = t.getNext(); if (t == tail) {//再次验证 if (s == null) { if (t.casNext(s, n)) {//如果t的next为s,则把n设为t的next casTail(t, n);//t为尾的情况下,把n设为尾节点 return true; } } else {//如果s不为空,判断t是否为尾,是的情况下把s设为尾节点 casTail(t, s); } } } } public E poll() { for (;;) { Node<E> h = head; Node<E> t = tail; Node<E> first = h.getNext(); if (h == head) { if (h == t) {//如果头尾是同一节点 if (first == null)//空队的情况 return null; else casTail(t, first);//用cas把h的next节点设置为尾节点 } else if (casHead(h, first)) {//用cas把h的next节点first设置为新的头节点 E item = first.getItem(); if (item != null) { first.setItem(null);//头结点是一个虚节点 return item; } // else skip over deleted item, continue loop, } } } } public E peek() { // 和poll操作类似,只是不删除元素,只是返回头节点的next的item for (;;) { Node<E> h = head; Node<E> t = tail; Node<E> first = h.getNext(); if (h == head) { if (h == t) { if (first == null) return null; else casTail(t, first); } else { E item = first.getItem(); if (item != null) return item; else // remove deleted node and continue casHead(h, first); } } } } /** * 和poll/peek类似,不过返回的是头节点的next节点Node。(在下面的方法中有调用,如size()等) */ Node<E> first() { for (;;) { Node<E> h = head; Node<E> t = tail; Node<E> first = h.getNext(); if (h == head) { if (h == t) { if (first == null) return null; else casTail(t, first); } else { if (first.getItem() != null) return first; else // remove deleted node and continue casHead(h, first); } } } } public boolean isEmpty() { return first() == null; } public int size() { int count = 0; for (Node<E> p = first(); p != null; p = p.getNext()) { if (p.getItem() != null) { // Collections.size() spec says to max out if (++count == Integer.MAX_VALUE) break; } } return count; } public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = p.getNext()) { E item = p.getItem(); if (item != null && o.equals(item)) return true; } return false; } public boolean remove(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = p.getNext()) { E item = p.getItem(); if (item != null && o.equals(item) && p.casItem(item, null)) return true; } return false; } public Object[] toArray() { // Use ArrayList to deal with resizing. ArrayList<E> al = new ArrayList<E>(); for (Node<E> p = first(); p != null; p = p.getNext()) { E item = p.getItem(); if (item != null) al.add(item); } return al.toArray(); } public <T> T[] toArray(T[] a) { // try to use sent-in array int k = 0; Node<E> p; for (p = first(); p != null && k < a.length; p = p.getNext()) { E item = p.getItem(); if (item != null) a[k++] = (T)item; } if (p == null) { if (k < a.length) a[k] = null; return a; } // If won't fit, use ArrayList version ArrayList<E> al = new ArrayList<E>(); for (Node<E> q = first(); q != null; q = q.getNext()) { E item = q.getItem(); if (item != null) al.add(item); } return (T[])al.toArray(a); } …… }
发表评论
-
Nio Socket
2013-05-16 05:53 0asfda -
结合jdk源码解读,Error Exception
2013-05-10 04:00 0/* * @(#)Error.java 1.17 05/11 ... -
从不同的角度,重新审视class和interface
2013-05-07 03:40 0java开发中,对应class和interface的基本区别都 ... -
java.lang.Object
2013-05-07 03:35 0/* * @(#)Object.java 1.73 06/0 ... -
反射机制+类加载机制
2013-02-18 01:30 0反射机制+类加载机制 -
并发专题----使用开源软件Amino构建并发应用程序/多线程运行时分析工具MTRAT
2013-02-14 00:50 1342使用开源软件构建并发 ... -
并发专题 ---- 线程安全
2013-02-14 00:50 718线程安全 ================== ... -
并发专题 --- 锁
2013-02-14 00:50 778相比于synchronized,ReentrantLock 提 ... -
并发专题 ----(JMM)java内存模型
2013-02-14 00:50 507Java 内存模型 ------------ ... -
并发专题 ---java.util.concurrent 包
2013-02-13 02:26 1797java.util.concurrent 包 原 ... -
集合框架 Queue篇(8)---PriorityBlockingQueue、SynchronousQueue
2013-02-07 12:40 1271Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(7)---LinkedBlockingDeque
2013-02-07 12:40 829Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(6)---LinkedBlockingQueue
2013-02-07 12:39 801Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(5)---ArrayBlockingQueue
2013-02-06 10:39 674Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(4)---阻塞队列和生产者-消费者模式、DelayQueue
2013-02-06 10:39 965Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(2)---PriorityQueue
2013-02-06 10:38 800Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(1)---ArrayDeque
2013-02-06 10:38 898Queue ------------ 1.ArrayDeq ... -
集合框架 Set篇---HashSet、LinkedHashSet、TreeSet、CopyOnWriteArraySet、ConcurrentSkipList
2013-02-05 08:43 1447Set --------- 1.HashSet 2.Link ... -
集合框架 List篇(2)---CopyOnWriteArrayList
2013-02-05 08:43 818List ----------- 1.ArrayList(jd ... -
集合框架 List篇(1)---ArrayList、LinkedList
2013-02-05 08:42 886List ----------- 1.ArrayList(jd ...
相关推荐
python库。 资源全名:persist_queue-0.4.2-py2.py3-none-any.whl
Laravel开发-laravel-queue-aws-batch 用于AWS批处理的Laravel队列,允许用户将作业提交到批处理队列进行处理。
离线安装包,亲测可用
Queue-Queue-Queue
官方离线安装包,亲测可用。使用rpm -ivh [rpm完整包名] 进行安装
离线安装包,亲测可用
python库。 资源全名:django_simple_queue-0.1.5-py3-none-any.whl
资源来自pypi官网。 资源全名:redis-queue-tool-4.1.5.tar.gz
前端开源库-promise-queue-plusPromise Queue Plus,基于Promise的队列。支持超时、重试等。
python库。 资源全名:dask_remote_jobqueue-0.4.2-py3-none-any.whl
资源来自pypi官网。 资源全名:django_simple_queue-0.1.6-py3-none-any.whl
资源分类:Python库 所属语言:Python 资源全名:dask_remote_jobqueue-0.2.3-py3-none-any.whl 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
Implementation-of-the-Queue-Interface-master.rar
Laravel开发-laravel-queue-azure-restarter .zip
资源分类:Python库 所属语言:Python 资源全名:redis_queue_tool-4.0.7-py3-none-any.whl 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059