`

集合框架 Map篇(4)----ConcurrentHashMap

阅读更多
Map
------


1.HashMap
2.LinkedHashMap
3.IdentityHashMap
4.WeakHashMap
5.TreeMap
6.EnumMap
7.ConcurrentHashMap
8.ConcurrentSkipListMap
--------------------------------------

------------------ConcurrentHashMap-----------------

ConcurrentHashMap是一个线程安全的哈希表。ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。

ConcurrentHashMap存储结构

ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment(分段)的结构,ConcurrentHashMap内部有一个Segment的数组实现的哈希表。每个Segment又包含一个Entry数组实现的hash表(这个hash 表类似于HashMap中的实现方式),结构图如下:
      
                                                                

ConcurrentHashMap存储结构图

   从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部。
  因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(当且仅当这些写操作都非常平均地分布在所有的Segment上时),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。

Segment 的具体实现
// Segment是ReentrantLock的子类实现,这样方便锁的管理
static final class Segment<K,V> extends ReentrantLockimplements Serializable {
    //该片段(Segment)中元素的个数,注意volatile修饰符(保证了count的修改可见性)
    transientvolatileint count;
    //对Segment中的table修改的次数
    transient int modCount; 
    //临界值,当Segment中的元素个数大于该值是,就对Segment的table进行扩容
    transient int threshold;   
    transient volatile HashEntry<K,V>[] table;// Segment中的hash表结构
    final float loadFactor;//负载因子=元素个数和表长的比值
    get(){…}
    Put(){…}
    …
} 


HashEntry的具体实现
/**
*与HashMap中的Entry实现还是有较大区别的:
*1.不是Map.Entry的子类实现,没有重写equals/hashCode方法;
*2.value 是volatile类型的(保证了修改的可见性)
*3.next是final类型的,意味着next引用一经赋值是不可变的
*/
   static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;

        HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }

 @SuppressWarnings("unchecked")
 static final <K,V> HashEntry<K,V>[] newArray(int i) {
     return new HashEntry[i];
 }
}

ConcurrentHashMap初始化
/**
* ConcurrentHashMap初始化需要3个参数:
*1. initialCapacity:表示 初始化容量;
*2. loadFactor:表示 负载因子
*3. concurrencyLevel表示 片段(Segment)的数量
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        //找到大于concurrencyLevel(分段个数)的最小的2的指数:
        //例如concurrencyLevel=12,ssize=16
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;//把总的容量,平均分成ssize段
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)  //得到每段的2的指数的容量(也就是Segment的容量)
            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)//初始化segments数组
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }
Segment的数量以及Segment的容量都是2的指数,这样的好处就是方便通过移位的方式计算hash,提高了hash效率。

ConcurrentHashMap的get操作
/**
*分两步:1.得到segment 段 的index
*        2.得到所在段的 entry的index
* 所用的hash值相同,只不过通过取不同的位来确定段index和Entry的index
*/
public V get(Object key) {
     int hash = hash(key.hashCode());
     return segmentFor(hash).get(key, hash);
}

/*
*得到段的Index
*假设segment的数量为2^p,则:
*segmentShift = 32 - p; segmentMask = 2^p - 1;
*通过无符号右移hash值得到其高p位&0111111…1(其中1的个数为p-1个),
*得到[0~2^p-1]直接的index作为segment数组的下标
*/
final Segment<K,V> segmentFor(int hash){
     return segments[(hash >>> segmentShift) & segmentMask];
}
确定段后,再调用该段的get方法以获得entry的index:
V get(Object key, int hash) {
            if (count != 0) { // read-volatile
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v;
                            //只有在value为空的情况(其他线程正执行put操作),
                            //才进入加锁获取
                        return readValueUnderLock(e);                 
                     }
                    e = e.next;
                }
            }
            return null;
 }
得到key位置的链表的头位置index方法如下:
HashEntry<K,V> getFirst(int hash) {
       HashEntry<K,V>[] tab = table;
       return tab[hash & (tab.length - 1)];
}
ConcurrentHashMap的put操作
//同样是先找到segment,然后调用segment的put方法
public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
}

//segment的put方法,整个方法在加锁的情况下进行
//(该锁紧针对所在的segment分段,而不是整个Map)
V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry<K,V>[] tab = table;
                //通过key的hash定位到table中的index处得到链结构的头结点
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                //遍历链表寻找key对应的Entry
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;
               
                V oldValue;
                if (e != null) {//如果有key对应的Entry,覆盖原来的值
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {//没有对应的entry,新建entry置于table的index处作链头
                  oldValue = null;
                  ++modCount;
                  tab[index] = new HashEntry<K,V>(key, hash, first, value);
                  count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
 }
对Segment的put操作是加锁完成的,如果Segment中元素的数量超过了阈值(由构造函数中的loadFactor算出)需要进行对Segment扩容,并且要进行rehash.


ConcurrentHashMap的remove操作

remove操作的前面一部分和前面的get和put操作一样,都是定位Segment的过程,然后再调用Segment的remove方法:
//remove的操作也是在加锁的情况下进行
V remove(Object key, int hash, Object value) {
            lock();
            try {
                int c = count - 1;
                //通过key的hash定位到table中的index处得到链结构的头结点
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                //遍历链表寻找key对应的Entry
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) {/
                      oldValue = v;
                      ++modCount;
                      
                      HashEntry<K,V> newFirst = e.next;
                      for (HashEntry<K,V> p = first; p != e; p = p.next)
                        newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value);

                      tab[index] = newFirst;
                      count = c; // write-volatile
                    }
                }
                return oldValue;
            } finally {
                unlock();
            }
}
执行删除操作也是在加锁的情况下:e后边的next关系不变;/重头结点开始遍历e前面的元素,最终把e前面的结点next关系//翻转,并把翻转后的第一个结点(也就是翻转前,e的“前驱”)作为了新链的头,保存到table的index处。下图以删除E3为例进行了演示:



ConcurrentHashMap的获取size()方法
上面的get/put/remove操作都是在单个Segment中进行的,但是ConcurrentHashMap有一些操作是在多个Segment中进行,比如size操作,ConcurrentHashMap的size操作也采用了一种比较巧的方式,来尽量避免对所有的Segment都加锁。
//size()方法的实现
public int size() {
        final Segment<K,V>[] segments = this.segments;
        long sum = 0;
        long check = 0;
        int[] mc = new int[segments.length];
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
            check = 0;//第二遍 遍历segments得到的总元素个数
            sum = 0; //第一遍 遍历segments得到的总元素个数
            int mcsum = 0;//改变的次数
            for (int i = 0; i < segments.length; ++i) {
                sum += segments[i].count;
                mcsum += mc[i] = segments[i].modCount;
            }
            if (mcsum != 0) {//有改变的情况下才如下操作,否则直接下一步
                for (int i = 0; i < segments.length; ++i) {
                    check += segments[i].count;
                    //如果任一segment的modCoun有变化,
                    //结束操作,进入下一遍历比较
                    if (mc[i] != segments[i].modCount) {
                        check = -1; // force retry
                        break;
                    }
                }
            }
            if (check == sum)
                break;
        }
        if (check != sum) { // 依赖锁住所有的segment,然后一一计算相加
            sum = 0;
            for (int i = 0; i < segments.length; ++i)//所有的segment加锁
                segments[i].lock();
            for (int i = 0; i < segments.length; ++i)
                sum += segments[i].count;
            for (int i = 0; i < segments.length; ++i) //所有的segment解锁
                segments[i].unlock();
        }
        if (sum > Integer.MAX_VALUE)
            return Integer.MAX_VALUE;
        else
            return (int)sum;
    }
Size()操作就是遍历了两次Segment,每次记录Segment的count值,然后将两次的count进行比较,如果相同,则表示期间没有发生过修改操作,就将原先遍历的结果返回,如果不相同,则把这个过程再重复做一次(重复检查的次数是有Map的常量static final RETRIES_BEFORE_LOCK=2决定的),如果再不相同,则就需要将所有的Segment都锁住,然后一个一个遍历了。



  • 大小: 60.8 KB
  • 大小: 40 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics