Java并发编程之 ConcurrentHashMap

x33g5p2x  于2021-10-06 转载在 Java  
字(7.2k)|赞(0)|评价(0)|浏览(123)

JDK1.7

首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。

Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。

一个ConcurrentHashMap里包含一个Segment数组Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment

可以说,ConcurrentHashMap是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表(Segment)。

优点:多个线程去访问不同的Segment,没有冲突,可以提高效率
缺点:Segment数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒加载

初始化

  • initialCapacity:初始容量,即HashEntry的数量,默认为16
  • loadFactor:负载因子,默认为0.75
  • concurrencyLevel:并发级别,这个值用来确定Segment的个数。Segment的个数是大于等于concurrencyLevel的第一个2 n 2^n2n的数。比如,如果concurrencyLevel为12,13,14,15,16这些数,则Segment的数目为16(2的4次方)。默认值为16。理想情况下ConcurrentHashMap的真正的并发访问量能够达到concurrencyLevel,因为有concurrencyLevel个Segment,假如有concurrencyLevel个线程需要访问Map,并且需要访问的数据都恰好分别落在不同的Segment中,则这些线程能够无竞争地自由访问(因为他们不需要竞争同一把锁),达到同时访问的效果。这也是为什么这个参数起名为“并发级别”的原因。

如果initialCapacity=64,concurrencyLevel=16,那么每个Segment下面的HashEntry容量就是 64/16 = 4。
默认每个Segment下面的HashEntry容量就是 16/16 = 1。

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // 验证参数的合法性
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // concurrencyLevel也就是Segment的个数不能超过规定的最大Segment的个数,默认值为static final int MAX_SEGMENTS = 1 << 16;,如果超过这个值,设置为这个值
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    int sshift = 0;
    int ssize = 1;
    // 使用循环找到大于等于concurrencyLevel的第一个2的n次方的数ssize,这个数就是Segment数组的大小
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 计算每个Segment平均应该放置多少个元素。比如初始容量为15,Segment个数为4,则每个Segment平均需要放置4个元素
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); 
    this.segments = ss;
}

get()

get()方法没有加锁,而是使用UNSAFE.getObjectVolatile()来保证可见性

  1. 获得输入key的hash值
  2. 通过hash值,定位到对应的Segment对象
  3. 再次通过hash值,定位到Segment对应的HashEntry对象
  4. 在该HashEntry中进行查找(与HashMap的查找方式类似)
public V get(Object key) {
    Segment<K,V> s; 
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // u为Segment对象的下标
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // s即为Segment对象
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // e即为定位到的HashEntry
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

put()

  1. 获取输入的key的hash值
  2. 通过hash值定位到对应的Segment对象
  3. 获取可重入锁,定位到Segment当中HashEntry的具体位置(进入Segment的get()方法)
  4. 插入(头插)或覆盖HashEntry对象(与JDK1.7的HashMap的添加方式类似)
  5. 释放锁
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 获取输入key的hash值
    int hash = hash(key);
    // 计算出Segment的下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 定位到对应的Segment对象,判断是否为null,如果是则创建Segment
    if ((s = (Segment<K,V>)UNSAFE.getObject         
         (segments, (j << SSHIFT) + SBASE)) == null)
        // CAS确保segment被出啊昂见且不被重复创建 
        s = ensureSegment(j);
    // 进入Segment的put()流程
    return s.put(key, hash, value, false);
}

JDK1.8

ConcurrentHashMap在1.8中的实现,相比于1.7的版本基本上全部都变掉了。首先,取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构。而对于锁的粒度,调整为对每个数组元素加锁(链表头/树根 Node)。然后是定位节点的hash算法被简化了,这样带来的弊端是hash冲突会加剧。因此在链表节点数量大于8时,会将链表转化为红黑树进行存储。这样一来,查询的时间复杂度就会由原先的O(n)变为O(logN)。下面是其基本结构:

重要属性和内部类

// 默认为0
// 初始化时为-1
// 扩容时为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为下一次扩容的阈值大小
private transient volatile int sizeCtl;

// 整个ConcurrentHashMap就是一个Node类型的数组table
static class Node<K,V> implements Map.Entry<K,V> {}
// 懒加载,负责存储Node结点
transient volatile Node<K,V>[] table;

// 扩容时的新数组
private transient volatile Node<K,V>[] nextTable;

初始化

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 初始容量小于并发度时会将初始容量改为并发度
    if (initialCapacity < concurrencyLevel)  
        initialCapacity = concurrencyLevel;  
    // 懒加载,在构造器中仅仅计算了数组的容量大小,只有在后面第一次使用时(put())才会真正创建
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    // tableSizeFor()保证数组容量为2^n
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

get()

get()全程不需要加锁是因为Node对象的value是用volatile修饰的(volatile V val

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // spread()确保返回结果是正整数
    int h = spread(key.hashCode());
    // 定位链表头所在的数组下标
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果头结点就是要查找的key,直接返回value
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 正在扩容或链表树化,去新链表或红黑树中去找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 顺着链表正常找下去
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

put()

  1. 通过hash直接定位到桶,拿到首结点后进行判断
  2. 如果数组为空则先CAS初始化数组(懒加载)
  3. 首结点为空则CAS进行插入
  4. 首结点的hash为-1则说明正在扩容,则帮忙扩容
  5. 否则对首结点加锁后再往链表(或树)后面添加
public V put(K key, V value) {
    return putVal(key, value, false);
}

// onlyIfAbsent:是否选择用新值覆盖掉旧值
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        // f是链表的头结点
        // fh是链表头结点的hash
        // i是链表在table数组的下标
        Node<K,V> f; int n, i, fh;
        // 此时table数组还没有被初始化(懒加载)
        if (tab == null || (n = tab.length) == 0)
            // CAS初始化
            tab = initTable();
        // 首结点为null
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // CAS进行添加
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                  
        }
        // 帮忙扩容
        // static final int MOVED = -1;
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 对首结点加锁后再往链表(或树)后面添加
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

区别

JDK1.8的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,JDK1.8的ConcurrentHashMap只是增加了同步的操作来控制并发。

JDK1.7:ReentrantLock + Segment + HashEntry
JDK1.8:synchronized + CAS + HashEntry + Node

  1. JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首结点)
  2. JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,但是由于粒度的降低,实现的复杂度也增加了
  3. JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
  4. JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock:因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
  5. JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
  6. 在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据

相关文章

最新文章

更多