源码分析HashMap及ConcurrentHashMap

崩天的勾玉
崩天的勾玉
崩天的勾玉
65
文章
4
评论
2021年5月6日17:03:32
评论
202 34968字

概述

util包下的Map接口定义了4个实现类,分别是HashMap、Hashtable、LinkedHashMap和TreeMap,类继承关系如下图所示:

(1) HashMap:它根据键的hashCode值存储数据,大多数情况下可以直接定位到它的值,因而具有很快的访问速度,但遍历顺序却是不确定的。 HashMap最多只允许一条记录的键为null,允许多条记录的值为null。HashMap非线程安全,即任一时刻可以有多个线程同时写HashMap,可能会导致数据的不一致。如果需要满足线程安全,可以用 Collections的synchronizedMap方法使HashMap具有线程安全的能力,或者使用ConcurrentHashMap。

(2) Hashtable:Hashtable是遗留类,很多映射的常用功能与HashMap类似,不同的是它承自Dictionary类,并且是线程安全的,任一时间只有一个线程能写Hashtable,并发性不如ConcurrentHashMap,因为ConcurrentHashMap引入了分段锁。Hashtable不建议在新代码中使用,不需要线程安全的场合可以用HashMap替换,需要线程安全的场合可以用ConcurrentHashMap替换。初始容量11,负载因子0.75。Hashtable 扩容规则为当前容量翻倍 + 1。

(3) LinkedHashMap:LinkedHashMap是HashMap的一个子类,保存了记录的插入顺序,在用Iterator遍历LinkedHashMap时,先得到的记录肯定是先插入的,也可以在构造时带参数,按照访问次序排序。

(4) TreeMap:TreeMap实现SortedMap接口,能够把它保存的记录根据键排序,默认是按键值的升序排序,也可以指定排序的比较器,当用Iterator遍历TreeMap时,得到的记录是排过序的。如果使用排序的映射,建议使用TreeMap。在使用TreeMap时,key必须实现Comparable接口或者在构造TreeMap传入自定义的Comparator,否则会在运行时抛出java.lang.ClassCastException类型的异常

HashMap

哈希表相当于将数组的下标替换成key,数组的值为value,从而获得了类似数组的高速查找效率O(1)(因为内存被设计成访问任一位置时间都是一样的),增加和删除也是,无需遍历整个数组,一次哈希计算一步到胃。

因为存在哈希冲突的现象(不同元素通过哈希函数计算出相同的存储地址),如果要插入的位置已经有元素了,那么就与这个元素结合形成链表向下延伸。在jdk1.7中为头插法,即新的元素将旧元素替换掉,旧元素被挤到链表的下一节。在jdk1.8中改成尾插法。

有哪些解决哈希冲突的方法

1、开放定址法:继续寻找下一块未被占用的存储地址,只要空位比较多总能找到;耗空间

2、再哈希:提供多个哈希函数,这个不行就换一个。耗时

3、链地址法:将哈希地址相同的元素放在同一个链表中。该方法确保一定能到地址,但遍历链表损耗性能。适合哈希冲突多的情况。hashmap使用该方法。

4、公共溢出区:把所有哈希冲突的记录放溢出区里,先对基本表的响应位置查,不相等就去溢出区遍历。适合哈希冲突小的情况

hashmap的底层结构是什么?

在1.7hashmap的数据结构为数组+链表,到了1.8改成了数组+链表+红黑树。

数组里的元素叫node节点(1.7叫entry),每个node包含hash(用来定位数组索引位置)、key、value、next(链表的下一个node)。Node是HashMap的一个内部类,实现了Map.Entry接口,本质是就是一个映射(键值对)。图中的每个黑色圆点就是一个Node对象。

如果哈希桶数组(node[] table)很大,即使较差的Hash算法也会比较分散,如果哈希桶数组数组很小,即使好的Hash算法也会出现较多碰撞,所以就需要在空间成本和时间成本之间权衡,其实就是在根据实际情况确定哈希桶数组的大小,并在此基础上设计好的hash算法减少Hash碰撞。那么通过什么方式来控制map使得Hash碰撞的概率又小,哈希桶数组(Node[] table)占用空间又少呢?答案就是好的Hash算法和扩容机制

从HashMap的默认构造函数源码可知,构造函数就是对下面几个字段进行初始化,源码如下:

     int threshold;             // 所能容纳的key-value对极限 
     final float loadFactor;    // 负载因子
     int modCount;  
     int size;                  //HashMap中实际存在的键值对数量

首先,Node[] table的初始化长度length(默认值是16),Load factor为负载因子(默认值是0.75),threshold是HashMap所能容纳的最大数据量的Node(键值对)个数。threshold = length * Load factor。也就是说,在数组定义好长度之后,负载因子越大,所能容纳的键值对个数越多。

结合负载因子的定义公式可知,threshold就是在此Load factor和length(数组长度)对应下允许的最大元素数目,超过这个数目就重新resize(扩容),扩容后的HashMap容量是之前容量的两倍。默认的负载因子0.75是对空间和时间效率的一个平衡选择,建议大家不要修改,除非在时间和空间比较特殊的情况下,如果内存空间很多而又对时间效率要求很高,可以降低负载因子Load factor的值;相反,如果内存空间紧张而对时间效率要求不高,可以增加负载因子loadFactor的值,这个值可以大于1。

size这个字段其实很好理解,就是HashMap中实际存在的键值对数量。注意和table的长度length、容纳最大键值对数量threshold的区别。而modCount字段主要用来记录HashMap内部结构发生变化的次数,主要用于迭代的快速失败。强调一点,内部结构发生变化指的是结构发生变化,例如put新键值对,但是某个key对应的value值被覆盖不属于结构变化。

为什么要引入红黑树?

这里存在一个问题,即使负载因子和Hash算法设计的再合理,也免不了会出现拉链过长的情况,一旦出现拉链过长,则会严重影响HashMap的性能。于是,在JDK1.8版本中,对数据结构做了进一步的优化,引入了红黑树。而当链表长度太长(默认超过8)时,链表就转换为红黑树,利用红黑树快速增删改查的特点提高HashMap的性能,其中会用到红黑树的插入、删除、查找等算法。

hashmap是如何根据key计算出存储位置的?

前面说过HashMap的数据结构是数组和链表的结合,所以我们当然希望这个HashMap里面的元素位置尽量分布均匀些,尽量使得每个位置上的元素数量只有一个,那么当我们用hash算法求得这个位置的时候,马上就可以知道对应位置的元素就是我们要的,不用遍历链表,大大优化了查询的效率。HashMap定位数组索引位置,直接决定了hash方法的离散性能。先看看源码的实现(方法一+方法二):

方法一:
static final int hash(Object key) {   //jdk1.8 & jdk1.7
     int h;
     // h = key.hashCode() 为第一步 取hashCode值
     // h ^ (h >>> 16)  为第二步 高位参与运算
     return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
方法二:
static int indexFor(int h, int length) {  //jdk1.7的源码,jdk1.8没有这个方法,但是实现原理一样的
     return h & (length-1);  //第三步 取模运算
}

这里的Hash算法本质上就是三步:取key的hashCode值、高位运算、取模运算

对于任意给定的对象,只要它的hashCode()返回值相同,那么程序调用方法一所计算得到的Hash码值总是相同的。我们首先想到的就是把hash值对数组长度取模运算,这样一来,元素的分布相对来说是比较均匀的。但是,模运算的消耗还是比较大的,在HashMap中是这样做的:调用方法二来计算该对象应该保存在table数组的哪个索引处。

这个方法非常巧妙,它通过h & (table.length -1)来得到该对象的保存位,而HashMap底层数组的长度总是2的n次方,这是HashMap在速度上的优化。当length总是2的n次方时,h& (length-1)运算等价于对length取模,也就是h%length,但是&比%具有更高的效率。

在JDK1.8的实现中,优化了高位运算的算法,通过hashCode()的高16位异或低16位实现的:(h = k.hashCode()) ^ (h >>> 16),主要是从速度、功效、质量来考虑的,这么做可以在数组table的length比较小的时候,也能保证考虑到高低Bit都参与到Hash的计算中,同时不会有太大的开销。

下面举例说明下,n为table的长度。

HashMap的put方法

①.判断键值对数组table[i]是否为空或为null,否则执行resize()进行扩容;

②.根据键值key计算hash值得到插入的数组索引i,如果table[i]==null,直接新建节点添加,转向⑥,如果table[i]不为空,转向③;

③.判断table[i]的首个元素是否和key一样,如果相同直接覆盖value,否则转向④,这里的相同指的是hashCode以及equals;

④.判断table[i] 是否为treeNode,即table[i] 是否是红黑树,如果是红黑树,则直接在树中插入键值对,否则转向⑤;

⑤.遍历table[i],判断链表长度是否大于8,大于8的话把链表转换为红黑树,在红黑树中执行插入操作,否则进行链表的插入操作;遍历过程中若发现key已经存在直接覆盖value即可;

⑥.插入成功后,判断实际存在的键值对数量size是否超多了最大容量threshold,如果超过,进行扩容。

JDK1.8HashMap的put方法源码如下:

public V put(K key, V value) {
    // 对key的hashCode()做hash
    return putVal(hash(key), key, value, false, true);
}
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        // 步骤①:tab为空则创建
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
        // 步骤②:计算index,并对null做处理
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            // 步骤③:节点key存在,直接覆盖value
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            // 步骤④:判断该链为红黑树
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            // 步骤⑤:该链为链表
            else {
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                         //链表长度大于8转换为红黑树进行处理
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    // key已经存在直接覆盖value
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
    // 步骤⑥:超过最大容量 就扩容
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }

HashMap扩容机制

扩容(resize)就是重新计算容量,向HashMap对象里不停的添加元素,而HashMap对象内部的数组无法装载更多的元素时,对象就需要扩大数组的长度,以便能装入更多的元素。当然Java里的数组是无法自动扩容的,方法是使用一个新的数组代替已有的容量小的数组,就像我们用一个小桶装水,如果想装更多的水,就得换大水桶。

我们分析下resize的源码,鉴于JDK1.8融入了红黑树,较复杂,为了便于理解我们仍然使用JDK1.7的代码,好理解一些,本质上区别不大,具体区别后文再说。

什么时候扩容:当向容器添加元素的时候,会判断当前容器的元素个数,如果大于等于阈值(知道这个阈字怎么念吗?不念fa值,念yu值四声)---即当前数组的长度乘以加载因子的值的时候,就要自动扩容啦。

    void resize(int newCapacity) {   //传入新的容量
        Entry[] oldTable = table;    //引用扩容前的Entry数组
        int oldCapacity = oldTable.length;
        if (oldCapacity == MAXIMUM_CAPACITY) {  //扩容前的数组大小如果已经达到最大(2^30)了
            threshold = Integer.MAX_VALUE; //修改阈值为int的最大值(2^31-1),这样以后就不会扩容了
            return;
        }

        Entry[] newTable = new Entry[newCapacity];  //初始化一个新的Entry数组
        transfer(newTable);                         //!!将数据转移到新的Entry数组里
        table = newTable;                           //HashMap的table属性引用新的Entry数组
        threshold = (int) (newCapacity * loadFactor);//修改阈值
    }

这里就是使用一个容量更大的数组来代替已有的容量小的数组,transfer()方法将原有Entry数组的元素拷贝到新的Entry数组里。

    void transfer(Entry[] newTable) {
        Entry[] src = table;                   //src引用了旧的Entry数组
        int newCapacity = newTable.length;
        for (int j = 0; j < src.length; j++) { //遍历旧的Entry数组
            Entry<K, V> e = src[j];             //取得旧Entry数组的每个元素
            if (e != null) {
                src[j] = null;//释放旧Entry数组的对象引用(for循环后,旧的Entry数组不再引用任何对象)
                do {
                    Entry<K, V> next = e.next;
                    int i = indexFor(e.hash, newCapacity); //!!重新计算每个元素在数组中的位置
                    e.next = newTable[i]; //标记[1]
                    newTable[i] = e;      //将元素放在数组上
                    e = next;             //访问下一个Entry链上的元素
                } while (e != null);
            }
        }
    }

newTable[i]的引用赋给了e.next,也就是使用了单链表的头插入方式,同一位置上新元素总会被放在链表的头部位置;这样先放在一个索引上的元素终会被放到Entry链的尾部(如果发生了hash冲突的话),这一点和Jdk1.8有区别,下文详解。在旧数组中同一条Entry链上的元素,通过重新计算索引位置后,有可能被放到了新数组的不同位置上。

下面举个例子说明下扩容过程。假设了我们的hash算法就是简单的用key mod 一下表的大小(也就是数组的长度)。其中的哈希桶数组table的size=2, 所以key = 3、7、5,put顺序依次为 5、7、3。在mod 2以后都冲突在table[1]这里了。这里假设负载因子 loadFactor=1,即当键值对的实际大小size 大于 table的实际大小时进行扩容。接下来的三个步骤是哈希桶数组 resize成4,然后所有的Node重新rehash的过程。

下面我们讲解下JDK1.8做了哪些优化。经过观测可以发现,我们使用的是2次幂的扩展(指长度扩为原来2倍),所以,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置。看下图可以明白这句话的意思,n为table的长度,图(a)表示扩容前的key1和key2两种key确定索引位置的示例,图(b)表示扩容后key1和key2两种key确定索引位置的示例,其中hash1是key1对应的哈希与高位运算结果。

元素在重新计算hash之后,因为n变为2倍,那么n-1的mask范围在高位多1bit(红色),因此新的index就会发生这样的变化:

因此,我们在扩充HashMap的时候,不需要像JDK1.7的实现那样重新计算hash,只需要看看原来的hash值新增的那个bit是1还是0就好了,是0的话索引没变,是1的话索引变成“原索引+oldCap”,可以看看下图为16扩充为32的resize示意图:

这个设计确实非常的巧妙,既省去了计算hash值得到索引的时间,而且同时,由于新增的1bit是0还是1可以认为是随机的,因此resize的过程,均匀的把之前的冲突的节点分散到新的bucket了。这一块就是JDK1.8新增的优化点。有一点注意区别,JDK1.7中rehash的时候,旧链表迁移新链表的时候,如果在新表的数组索引位置相同,则链表元素会倒置,但是从上图可以看出,JDK1.8不会倒置。有兴趣的同学可以研究下JDK1.8的resize源码,写的很赞,如下:

final Node<K,V>[] resize() {
    Node<K,V>[] oldTab = table;
    int oldCap = (oldTab == null) ? 0 : oldTab.length;
    int oldThr = threshold;
    int newCap, newThr = 0;
    if (oldCap > 0) {
        // 超过最大值就不再扩充了,就只好随你碰撞去吧
        if (oldCap >= MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return oldTab;
        }
        // 没超过最大值,就扩充为原来的2倍
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                  oldCap >= DEFAULT_INITIAL_CAPACITY)
             newThr = oldThr << 1; // double threshold
     }
     else if (oldThr > 0) // initial capacity was placed in threshold
         newCap = oldThr;
     else {               // zero initial threshold signifies using defaults
         newCap = DEFAULT_INITIAL_CAPACITY;
         newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
     }
     // 计算新的resize上限
     if (newThr == 0) {

         float ft = (float)newCap * loadFactor;
         newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                   (int)ft : Integer.MAX_VALUE);
     }
     threshold = newThr;
     @SuppressWarnings({"rawtypes","unchecked"})
         Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
    table = newTab;
    if (oldTab != null) {
        // 把每个bucket都移动到新的buckets中
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            if ((e = oldTab[j]) != null) {
                oldTab[j] = null;
                if (e.next == null)
                    newTab[e.hash & (newCap - 1)] = e;
                else if (e instanceof TreeNode) //如果是红黑树节点
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { // 链表优化重hash的代码块
                    Node<K,V> loHead = null, loTail = null;
                    Node<K,V> hiHead = null, hiTail = null;
                    Node<K,V> next;
                    do {
                        next = e.next;
                        // 原索引
                        if ((e.hash & oldCap) == 0) {
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        // 原索引+oldCap
                        else {
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    // 原索引放到bucket里
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead;
                    }
                    // 原索引+oldCap放到bucket里
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    return newTab;
}

HashMap为什么线程不安全

在多线程使用场景中,应该尽量避免使用线程不安全的HashMap,而使用线程安全的ConcurrentHashMap。那么为什么说HashMap是线程不安全的,下面举例子说明在并发的多线程使用场景中使用HashMap可能造成死循环。代码例子如下(便于理解,仍然使用JDK1.7的环境):

public class HashMapInfiniteLoop {  

    private static HashMap<Integer,String> map = new HashMap<Integer,String>(2,0.75f);  
    public static void main(String[] args) {  
        map.put(5, "C");  

        new Thread("Thread1") {  
            public void run() {  
                map.put(7, "B");  
                System.out.println(map);  
            };  
        }.start();  
        new Thread("Thread2") {  
            public void run() {  
                map.put(3, "A);  
                System.out.println(map);  
            };  
        }.start();        
    }  
}

其中,map初始化为一个长度为2的数组,loadFactor=0.75,threshold=2*0.75=1,也就是说当put第二个key的时候,map就需要进行resize。

通过设置断点让线程1和线程2同时debug到transfer方法(3.3小节代码块)的首行。注意此时两个线程已经成功添加数据。放开thread1的断点至transfer方法的“Entry next = e.next;” 这一行;然后放开线程2的的断点,让线程2进行resize。结果如下图。

注意,Thread1的 e 指向了key(3),而next指向了key(7),其在线程二rehash后,指向了线程二重组后的链表。

线程一被调度回来执行,先是执行 newTalbe[i] = e, 然后是e = next,导致了e指向了key(7),而下一次循环的next = e.next导致了next指向了key(3)。

e.next = newTable[i] 导致 key(3).next 指向了 key(7)。注意:此时的key(7).next 已经指向了key(3), 环形链表就这样出现了

于是,当我们用线程一调用map.get(11)时,悲剧就出现了——Infinite Loop。

在JDK1.8 的 HashMap 改为采用尾插法,已经不存在死循环的问题了

换个说法讲为什么线程不安全:

假如现在有两个线程都执行到了上图中的划线处。当线程一判断为空之后,CPU 时间片到了,被挂起。线程二也执行到此处判断为空,继续执行下一句,创建了一个新节点,插入到此下标位置。然后,线程一解挂,同样认为此下标的元素为空,因此也创建了一个新节点放在此下标处,因此造成了元素的覆盖。

线程安全的方案,由于并发度较好选择方案3(1和2锁住整张表,效率低下)

  • 使用Collections.synchronizedMap(Map)创建线程安全的map集合;
  • Hashtable
  • ConcurrentHashMap

所以,思考一下,既然锁住整张表的话,并发效率低下,那我把整张表分成 N 个部分,并使元素尽量均匀的分布到每个部分中,分别给他们加锁,互相之间并不影响,这种方式岂不是更好 。这就是在 JDK1.7 中 ConcurrentHashMap 采用的方案,被叫做锁分段技术,每个部分就是一个 Segment(段)

在JDK1.8中,完全重构了,采用的是 Synchronized + CAS ,把锁的粒度进一步降低,而放弃了 Segment 分段。(此时的 Synchronized 已经升级了,效率得到了很大提升,锁升级可以了解一下)

JDK1.8与JDK1.7的性能对比

HashMap中,如果key经过hash算法得出的数组索引位置全部不相同,即Hash算法非常好,那样的话,getKey方法的时间复杂度就是O(1),如果Hash算法技术的结果碰撞非常多,假如Hash算极其差,所有的Hash算法结果得出的索引位置一样,那样所有的键值对都集中到一个桶中,或者在一个链表中,或者在一个红黑树中,时间复杂度分别为O(n)和O(lgn)。 鉴于JDK1.8做了多方面的优化,总体性能优于JDK1.7,下面我们从两个方面用例子证明这一点。

Hash较均匀的情况

通过观测测试结果可知,JDK1.8的性能要高于JDK1.7 15%以上,在某些size的区域上,甚至高于100%。由于Hash算法较均匀,JDK1.8引入的红黑树效果不明显,下面我们看看Hash不均匀的的情况。

Hash极不均匀的情况

从表中结果中可知,随着size的变大,JDK1.7的花费时间是增长的趋势,而JDK1.8是明显的降低趋势,并且呈现对数增长稳定。当一个链表太长的时候,HashMap会动态的将它替换成一个红黑树,这话的话会将时间复杂度从O(n)降为O(logn)。hash算法均匀和不均匀所花费的时间明显也不相同,这两种情况的相对比较,可以说明一个好的hash算法的重要性。

HashMap小结

(1) 扩容是一个特别耗性能的操作,所以当程序员在使用HashMap的时候,估算map的大小,初始化的时候给一个大致的数值,避免map进行频繁的扩容。

(2) 负载因子是可以修改的,也可以大于1,但是建议不要轻易修改,除非情况非常特殊。

(3) HashMap是线程不安全的,不要在并发的环境中同时操作HashMap,建议使用ConcurrentHashMap。

(4) JDK1.8引入红黑树大程度优化了HashMap的性能。

(5) 还没升级JDK1.8的,现在开始升级吧。HashMap的性能提升仅仅是JDK1.8的冰山一角。

HashTable

跟HashMap相比Hashtable是线程安全的,适合在多线程的情况下使用,但是效率可不太乐观。

他在对数据操作的时候都会上synchronized锁,所以效率比较低下。

Hashtable 是不允许键或值为 null 的,HashMap 的键值则都可以为 null,Hashtable在我们put 空值的时候会直接抛空指针异常,但是HashMap却做了特殊处理:

static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

因为Hashtable使用的是安全失败机制(fail-safe),这种机制会使你此次读到的数据不一定是最新的数据。如果你使用null值,就会使得其无法判断对应的key是不存在还是为空,因为你无法再调用一次contain(key)来对key是否存在进行判断,ConcurrentHashMap同理。

ConcurrentHashMap

ConcurrentHashMap 是 HashMap 的线程安全版本,其内部和 HashMap 一样,也是采用了数组 + 链表 + 红黑树的方式来实现。

如何实现线程的安全性?加锁。但是这个锁应该怎么加呢?在 HashTable 中,是直接在 put 和 get 方法上加上了 synchronized,理论上来说 ConcurrentHashMap 也可以这么做,但是这么做锁的粒度太大,会非常影响并发性能,所以在 ConcurrentHashMap 中并没有采用这么直接简单粗暴的方法,其内部采用了非常精妙的设计,大大减少了锁的竞争,提升了并发性能。ConcurrentHashMap 中的初始化和 HashMap 中一样,而且容量也会调整为 2 的 N 次幂.

1.7底层结构

在 JDK1.7 版本中,ConcurrentHashMap 由数组 + Segment + 分段锁实现,其内部分为一个个段(Segment)数组,Segment 通过继承ReentrantLock来进行加锁,通过每次锁住一个segment来降低锁的粒度而且保证了每个segment内的操作的线程安全性,从而实现全局线程安全。在1.7中的数据结构:

Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {

    private static final long serialVersionUID = 2249069246763182397L;

    // 和 HashMap 中的 HashEntry 作用一样,真正存放数据的桶
    transient volatile HashEntry<K,V>[] table;

    transient int count;
        // 记得快速失败(fail—fast)么?
    transient int modCount;
        // 大小
    transient int threshold;
        // 负载因子
    final float loadFactor;

}

HashEntry跟HashMap差不多的,但是不同点是,他使用volatile去修饰了他的数据Value还有下一个节点next。

以下是volatile特性:

  • 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。(实现可见性
  • 禁止进行指令重排序。(实现有序性
  • volatile 只能保证对单次读/写的原子性。i++ 这种操作不能保证原子性

1.7中这样的结构缺陷就是每次通过 hash 确认位置时需要 2 次才能定位到当前 key 应该落在哪个槽:

  1. 通过 hash 值和 段数组长度-1 进行位运算确认当前 key 属于哪个段,即确认其在 segments 数组的位置。
  2. 再次通过 hash 值和 table 数组(即 ConcurrentHashMap 底层存储数据的数组)长度 - 1进行位运算确认其所在桶。

为了进一步优化性能,在 jdk1.8 版本中,对 ConcurrentHashMap 做了优化,取消了分段锁的设计,取而代之的是通过 cas 操作和 synchronized 关键字来实现优化,而扩容的时候也利用了一种分而治之的思想来提升扩容效率,在 JDK1.8 中 ConcurrentHashMap 的存储结构和 HashMap 基本一致:

image-20210506154147726

1.7常用变量,内部类,方法

//默认初始化容量,这个和 HashMap中的容量是一个概念,表示的是整个 Map的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;

//默认加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

//默认的并发级别,这个参数决定了 Segment 数组的长度
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//最大的容量
static final int MAXIMUM_CAPACITY = 1 << 30;

//每个Segment中table数组的最小长度为2,且必须是2的n次幂。
//由于每个Segment是懒加载的,用的时候才会初始化,因此为了避免使用时立即调整大小,设定了最小容量2
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

//用于限制Segment数量的最大值,必须是2的n次幂
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

//在size方法和containsValue方法,会优先采用乐观的方式不加锁,直到重试次数达到2,才会对所有Segment加锁
//这个值的设定,是为了避免无限次的重试。后边size方法会详讲怎么实现乐观机制的。
static final int RETRIES_BEFORE_LOCK = 2;

//segment掩码值,用于根据元素的hash值定位所在的 Segment 下标。后边会细讲
final int segmentMask;

//和 segmentMask 配合使用来定位 Segment 的数组下标,后边讲。
final int segmentShift;

// Segment 组成的数组,每一个 Segment 都可以看做是一个特殊的 HashMap
final Segment<K,V>[] segments;

//Segment 对象,继承自 ReentrantLock 可重入锁。
//其内部的属性和方法和 HashMap 神似,只是多了一些拓展功能。
static final class Segment<K,V> extends ReentrantLock implements Serializable {

 //这是在 scanAndLockForPut 方法中用到的一个参数,用于计算最大重试次数
 //获取当前可用的处理器的数量,若大于1,则返回64,否则返回1。
 static final int MAX_SCAN_RETRIES =
  Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

 //用于表示每个Segment中的 table,是一个用HashEntry组成的数组。
 transient volatile HashEntry<K,V>[] table;

 //Segment中的元素个数,每个Segment单独计数(下边的几个参数同样的都是单独计数)
 transient int count;

 //每次 table 结构修改时,如put,remove等,此变量都会自增
 transient int modCount;

 //当前Segment扩容的阈值,同HashMap计算方法一样也是容量乘以加载因子
 //需要知道的是,每个Segment都是单独处理扩容的,互相之间不会产生影响
 transient int threshold;

 //加载因子
 final float loadFactor;

 //Segment构造函数
 Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
  this.loadFactor = lf;
  this.threshold = threshold;
  this.table = tab;
 }

 ...
 // put(),remove(),rehash() 方法都在此类定义
}

// HashEntry,存在于每个Segment中,它就类似于HashMap中的Node,用于存储键值对的具体数据和维护单向链表的关系
static final class HashEntry<K,V> {
 //每个key通过哈希运算后的结果,用的是 Wang/Jenkins hash 的变种算法,此处不细讲,感兴趣的可自行查阅相关资料
 final int hash;
 final K key;
 //value和next都用 volatile 修饰,用于保证内存可见性和禁止指令重排序
 volatile V value;
 //指向下一个节点
 volatile HashEntry<K,V> next;

 HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
  this.hash = hash;
  this.key = key;
  this.value = value;
  this.next = next;
 }
}

构造函数

并发度高的原因

ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。

不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。

每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。

就是说如果容量大小是16他的并发度就是16,可以同时允许16个线程操作16个Segment而且还是线程安全的。

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();//这就是为啥他不可以put null值的原因
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          
         (segments, (j << SSHIFT) + SBASE)) == null) 
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

他先定位到Segment,然后再进行put操作。

我们看看他的put源代码:

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
          // 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
 // 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                 // 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
               //释放锁
                unlock();
            }
            return oldValue;
        }

首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁。

  1. 尝试自旋获取锁。
  2. 如果重试的次数达到了 MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。

1.7get操作

只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。

由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。

ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁

1.8底层结构

其中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。

跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next采用了volatile去修饰,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。

1.8的put方法

从 put 方法开始,我们看下,它在插入新元素的时候,是怎么保证线程安全的吧。

ConcurrentHashMap在进行put操作的还是比较复杂的,大致可以分为以下步骤:

  1. 根据 key 计算出 hashcode 。
  2. 判断是否需要进行初始化。
  3. 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
  4. 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
  5. 如果都不满足,则利用 synchronized 锁写入数据。
  6. 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树
public V put(K key, V value) {
 return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
 //可以看到,在并发情况下,key 和 value 都是不支持为空的。
 if (key == null || value == null) throw new NullPointerException();
 //这里和1.8 HashMap 的hash 方法大同小异,只是多了一个操作,如下
 //( h ^ (h >>> 16)) & HASH_BITS;  HASH_BITS = 0x7fffffff;
 // 0x7fffffff ,二进制为 0111 1111 1111 1111 1111 1111 1111 1111 。
 //所以,hash值除了做了高低位异或运算,还多了一步,保证最高位的 1 个 bit 位总是0。
 //这里,我并没有明白它的意图,仅仅是保证计算出来的hash值不超过 Integer 最大值,且不为负数吗。
 //同 HashMap 的hash 方法对比一下,会发现连源码注释都是相同的,并没有多说明其它的。
 //我个人认为意义不大,因为最后 hash 是为了和 capacity -1 做与运算,而 capacity 最大值为 1<<30,
 //即 0100 0000 0000 0000 0000 0000 0000 0000 ,减1为 0011 1111 1111 1111 1111 1111 1111 1111。
 //即使 hash 最高位为 1(无所谓0),也不影响最后的结果,最高位也总会是0.
 int hash = spread(key.hashCode());
 //用来计算当前链表上的元素个数
 int binCount = 0;
 for (Node<K,V>[] tab = table;;) {
  Node<K,V> f; int n, i, fh;
  //如果表为空,则说明还未初始化。
  if (tab == null || (n = tab.length) == 0)
   //初始化表,只有一个线程可以初始化成功。
   tab = initTable();
  //若表已经初始化,则找到当前 key 所在的桶,并且判断是否为空
  else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
   //若当前桶为空,则通过 CAS 原子操作,把新节点插入到此位置,
   //这保证了只有一个线程可以 CAS 成功,其它线程都会失败。
   if (casTabAt(tab, i, null,
       new Node<K,V>(hash, key, value, null)))
    break;                   // no lock when adding to empty bin
  }
  //若所在桶不为空,则判断节点的 hash 值是否为 MOVED(值是-1)
  else if ((fh = f.hash) == MOVED)
   //若为-1,说明当前数组正在进行扩容,则需要当前线程帮忙迁移数据
   tab = helpTransfer(tab, f);
  else {
   V oldVal = null;
   //这里用加同步锁的方式,来保证线程安全,给桶中第一个节点对象加锁
   synchronized (f) {
    //recheck 一下,保证当前桶的第一个节点无变化,后边很多这样类似的操作,不再赘述
    if (tabAt(tab, i) == f) {
     //如果hash值大于等于0,说明是正常的链表结构
     if (fh >= 0) {
      binCount = 1;
      //从头结点开始遍历,每遍历一次,binCount计数加1
      for (Node<K,V> e = f;; ++binCount) {
       K ek;
       //如果找到了和当前 key 相同的节点,则用新值替换旧值
       if (e.hash == hash &&
        ((ek = e.key) == key ||
         (ek != null && key.equals(ek)))) {
        oldVal = e.val;
        if (!onlyIfAbsent)
         e.val = value;
        break;
       }
       Node<K,V> pred = e;
       //若遍历到了尾结点,则把新节点尾插进去
       if ((e = e.next) == null) {
        pred.next = new Node<K,V>(hash, key,
                value, null);
        break;
       }
      }
     }
     //否则判断是否是树节点。这里提一下,TreeBin只是头结点对TreeNode的再封装
     else if (f instanceof TreeBin) {
      Node<K,V> p;
      binCount = 2;
      if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                value)) != null) {
       oldVal = p.val;
       if (!onlyIfAbsent)
        p.val = value;
      }
     }
    }
   }
   //注意下,这个判断是在同步锁外部,因为 treeifyBin内部也有同步锁,并不影响
   if (binCount != 0) {
    //如果节点个数大于等于 8,则转化为红黑树
    if (binCount >= TREEIFY_THRESHOLD)
     treeifyBin(tab, i);
    //把旧节点值返回
    if (oldVal != null)
     return oldVal;
    break;
   }
  }
 }
 //给元素个数加 1,并有可能会触发扩容,比较复杂,稍后细讲
 addCount(1L, binCount);
 return null;
}

initTable()方法

先看下当数组为空时,是怎么初始化表的。

private final Node<K,V>[] initTable() {
 Node<K,V>[] tab; int sc;
 //循环判断表是否为空,直到初始化成功为止。
 while ((tab = table) == null || tab.length == 0) {
  //sizeCtl 这个值有很多情况,默认值为0,
  //当为 -1 时,说明有其它线程正在对表进行初始化操作
  //当表初始化成功后,又会把它设置为扩容阈值
  //当为一个小于 -1 的负数,用来表示当前有几个线程正在帮助扩容(后边细讲)
  if ((sc = sizeCtl) < 0)
   //若 sc 小于0,其实在这里就是-1,因为此时表是空的,不会发生扩容,sc只能为正数或者-1
   //因此,当前线程放弃 CPU 时间片,只是自旋。
   Thread.yield(); // lost initialization race; just spin
  //通过 CAS 把 sc 的值设置为-1,表明当前线程正在进行表的初始化,其它失败的线程就会自旋
  else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
   try {
    //重新检查一下表是否为空
    if ((tab = table) == null || tab.length == 0) {
     //如果sc大于0,则为sc,否则返回默认容量 16。
     //当调用有参构造创建 Map 时,sc的值是大于0的。
     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
     @SuppressWarnings("unchecked")
     //创建数组
     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
     table = tab = nt;
     //n减去 1/4 n ,即为 0.75n ,表示扩容阈值
     sc = n - (n >>> 2);
    }
   } finally {
    //更新 sizeCtl 为扩容阈值
    sizeCtl = sc;
   }
   //若当前线程初始化表成功,则跳出循环。其它自旋的线程因为判断数组不为空,也会停止自旋
   break;
  }
 }
 return tab;
}

addCount()方法

put 方法元素插入成功之后,则会调用此方法,传入参数为 addCount(1L, binCount)。这个方法的目的很简单,就是把整个 table 的元素个数加 1 。但是,实现比较难。

我们先思考一下,如果让我们自己去实现这样的统计元素个数,怎么实现?

类比 1.8 的 HashMap ,我们可以搞一个 size 变量来存储个数统计。但是,这是在多线程环境下,需要考虑并发的问题。因此,可以把 size 设置为 volatile 的,保证可见性,然后通过 CAS 乐观锁来自增 1。

这样虽然也可以实现。但是,设想一下现在有非常多的线程,都在同一时间操作这个 size 变量,将会造成特别严重的竞争。所以,基于此,这里做了更好的优化。让这些竞争的线程,分散到不同的对象里边,单独操作它自己的数据(计数变量),用这样的方式尽量降低竞争。到最后需要统计 size 的时候,再把所有对象里边的计数相加就可以了。

上边提到的 size ,在此用 baseCount 表示。分散到的对象用 CounterCell 表示,对象里边的计数变量用 value 表示。注意这里的变量都是 volatile 修饰的。

当需要修改元素数量时,线程会先去 CAS 修改 baseCount 加1,若成功即返回。若失败,则线程被分配到某个 CounterCell ,然后操作 value 加1。若成功,则返回。否则,给当前线程重新分配一个 CounterCell,再尝试给 value 加1。(这里简略的说,实际更复杂)

CounterCell 会组成一个数组,也会涉及到扩容问题。所以,先画一个示意图帮助理解一下。

//线程被分配到的格子
@sun.misc.Contended static final class CounterCell {
 //此格子内记录的 value 值
    volatile long value;
    CounterCell(long x) { value = x; }
}

//用来存储线程和线程生成的随机数的对应关系
static final int getProbe() {
 return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

// x为1,check代表链表上的元素个数
private final void addCount(long x, int check) {
 CounterCell[] as; long b, s;
 //此处要进入if有两种情况
 //1.数组不为空,说明数组已经被创建好了。
 //2.若数组为空,说明数组还未创建,很有可能竞争的线程非常少,因此就直接 CAS 操作 baseCount
 //若 CAS 成功,则方法跳转到 (2)处,若失败,则需要考虑给当前线程分配一个格子(指CounterCell对象)
 if ((as = counterCells) != null ||
  !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
  CounterCell a; long v; int m;
  //字面意思,是无竞争,这里先标记为 true,表示还没有产生线程竞争
  boolean uncontended = true;
  //这里有三种情况,会进入 fullAddCount 方法
  //1.若数组为空,进方法 (1)
  //2.ThreadLocalRandom.getProbe() 方法会给当前线程生成一个随机数(可以简单的认为也是一个hash值)
  //然后用随机数与数组长度取模,计算它所在的格子。若当前线程所分配到的格子为空,进方法 (1)。
  //3.若数组不为空,且线程所在格子不为空,则尝试 CAS 修改此格子对应的 value 值加1。
  //若修改成功,则跳转到 (3),若失败,则把 uncontended 值设为 fasle,说明产生了竞争,然后进方法 (1)
  if (as == null || (m = as.length - 1) < 0 ||
   (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
   !(uncontended =
     U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
   //方法(1), 这个方法的目的是让当前线程一定把 1 加成功。情况更多,更复杂,稍后讲。
   fullAddCount(x, uncontended);
   return;
  }
  //(3)能走到这,说明数组不为空,且修改 baseCount失败,
  //且线程被分配到的格子不为空,且修改 value 成功。
  //但是这里没明白为什么小于等于1,就直接返回了,这里我怀疑之前的方法漏掉了binCount=0的情况。
  //而且此处若返回了,后边怎么判断扩容?(存疑)
  if (check <= 1)
   return;
  //计算总共的元素个数
  s = sumCount();
 }
 //(2)这里用于检查是否需要扩容(下边这部分很多逻辑不懂的话,等后边讲完扩容,再回来看就理解了)
 if (check >= 0) {
  Node<K,V>[] tab, nt; int n, sc;
  //若元素个数达到扩容阈值,且tab不为空,且tab数组长度小于最大容量
  while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
      (n = tab.length) < MAXIMUM_CAPACITY) {
   //这里假设数组长度n就为16,这个方法返回的是一个固定值,用于当做一个扩容的校验标识
   //可以跳转到最后,看详细计算过程,0000 0000 0000 0000 1000 0000 0001 1011
   int rs = resizeStamp(n);
   //若sc小于0,说明正在扩容
   if (sc < 0) {
       //sc的结构类似这样,1000 0000 0001 1011 0000 0000 0000 0001
    //sc的高16位是数据校验标识,低16位代表当前有几个线程正在帮助扩容,RESIZE_STAMP_SHIFT=16
    //因此判断校验标识是否相等,不相等则退出循环
    //sc == rs + 1,sc == rs + MAX_RESIZERS 这两个应该是用来判断扩容是否已经完成,但是计算方法存疑
    //感兴趣的可以看这个地址,应该是一个 bug ,
    // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427
    //nextTable=null 说明需要扩容的新数组还未创建完成
    //transferIndex这个参数小于等于0,说明已经不需要其它线程帮助扩容了,
    //但是并不说明已经扩容完成,因为有可能还有线程正在迁移元素。稍后扩容细讲就明白了。
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
     sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
     transferIndex <= 0)
     break;
    //到这里说明当前线程可以帮助扩容,因此sc值加一,代表扩容的线程数加1
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
     transfer(tab, nt);
   }
   //当sc大于0,说明sc代表扩容阈值,因此第一次扩容之前肯定走这个分支,用于初始化新表 nextTable
   //rs<<16
   //1000 0000 0001 1011 0000 0000 0000 0000
   //+2
   //1000 0000 0001 1011 0000 0000 0000 0010
   //这个值,转为十进制就是 -2145714174,用于标识,这是扩容时,初始化新表的状态,
   //扩容时,需要用到这个参数校验是否所有线程都全部帮助扩容完成。
   else if (U.compareAndSwapInt(this, SIZECTL, sc,
           (rs << RESIZE_STAMP_SHIFT) + 2))
    //扩容,第二个参数代表新表,传入null,则说明是第一次初始化新表(nextTable)
    transfer(tab, null);
   s = sumCount();
  }
 }
}

//计算表中的元素总个数
final long sumCount() {
 CounterCell[] as = counterCells; CounterCell a;
 //baseCount,以这个值作为累加基准
 long sum = baseCount;
 if (as != null) {
  //遍历 counterCells 数组,得到每个对象中的value值
  for (int i = 0; i < as.length; ++i) {
   if ((a = as[i]) != null)
    //累加 value 值
    sum += a.value;
  }
 }
 //此时得到的就是元素总个数
 return sum;
} 

//扩容时的校验标识
static final int resizeStamp(int n) {
 return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

//Integer.numberOfLeadingZeros方法的作用是返回 n 的最高位为1的前面的0的个数
//n=16,
0000 0000 0000 0000 0000 0000 0001 0000
//前面有27个0,即27
0000 0000 0000 0000 0000 0000 0001 1011
//RESIZE_STAMP_BITS为16,然后 1<<(16-1),即 1<<15
0000 0000 0000 0000 1000 0000 0000 0000
//它们做或运算,得到 rs 的值
0000 0000 0000 0000 1000 0000 0001 1011

fullAddCount()方法

上边的 addCount 方法还没完,别忘了有可能元素个数加 1 的操作还未成功,就走到 fullAddCount 这个方法了。看方法名,就知道了,全力增加计数值,一定要成功(奥利给)。这个方法和扩容迁移方法是最难的,保持耐心~

//传过来的参数分别为 1 , false
private final void fullAddCount(long x, boolean wasUncontended) {
 int h;
 //如果当前线程的随机数为0,则强制初始化一个值
 if ((h = ThreadLocalRandom.getProbe()) == 0) {
  ThreadLocalRandom.localInit();      // force initialization
  h = ThreadLocalRandom.getProbe();
  //此时把 wasUncontended 设为true,认为无竞争
  wasUncontended = true;
 }
 //用来表示比 contend(竞争)更严重的碰撞,若为true,表示可能需要扩容,以减少碰撞冲突
 boolean collide = false;                // True if last slot nonempty
 //循环内,外层if判断分三种情况,内层判断又分为六种情况
 for (;;) {
  CounterCell[] as; CounterCell a; int n; long v;
  //1. 若counterCells数组不为空。  建议先看下边的2和3两种情况,再回头看这个。 
  if ((as = counterCells) != null && (n = as.length) > 0) {
   // (1) 若当前线程所在的格子(CounterCell对象)为空
   if ((a = as[(n - 1) & h]) == null) {
    if (cellsBusy == 0) {    
     //若无锁,则乐观的创建一个 CounterCell 对象。
     CounterCell r = new CounterCell(x); 
     //尝试加锁
     if (cellsBusy == 0 &&
      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
      boolean created = false;
      //加锁成功后,再 recheck 一下数组是否不为空,且当前格子为空
      try {               
       CounterCell[] rs; int m, j;
       if ((rs = counterCells) != null &&
        (m = rs.length) > 0 &&
        rs[j = (m - 1) & h] == null) {
        //把新创建的对象赋值给当前格子
        rs[j] = r;
        created = true;
       }
      } finally {
       //手动释放锁
       cellsBusy = 0;
      }
      //若当前格子创建成功,且上边的赋值成功,则说明加1成功,退出循环
      if (created)
       break;
      //否则,继续下次循环
      continue;           // Slot is now non-empty
     }
    }
    //若cellsBusy=1,说明有其它线程抢锁成功。或者若抢锁的 CAS 操作失败,都会走到这里,
    //则当前线程需跳转到(9)重新生成随机数,进行下次循环判断。
    collide = false;
   }
   /**
   *后边这几种情况,都是数组和当前随机到的格子都不为空的情况。
   *且注意每种情况,若执行成功,且不break,continue,则都会执行(9),重新生成随机数,进入下次循环判断
   */
   // (2) 到这,说明当前方法在被调用之前已经 CAS 失败过一次,若不明白可回头看下 addCount 方法,
   //为了减少竞争,则跳转到⑨处重新生成随机数,并把 wasUncontended 设置为true ,认为下一次不会产生竞争
   else if (!wasUncontended)       // CAS already known to fail
    wasUncontended = true;      // Continue after rehash
   // (3) 若 wasUncontended 为 true 无竞争,则尝试一次 CAS。若成功,则结束循环,若失败则判断后边的 (4)(5)(6)。
   else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
    break;
   // (4) 结合 (6) 一起看,(4)(5)(6)都是 wasUncontended=true,且CAS修改value失败的情况。
   //若数组有变化,或者数组长度大于等于当前CPU的核心数,则把 collide 改为 false
   //因为数组若有变化,说明是由扩容引起的;长度超限,则说明已经无法扩容,只能认为无碰撞。
   //这里很有意思,认真思考一下,当扩容超限后,则会达到一个平衡,即 (4)(5) 反复执行,直到 (3) 中CAS成功,跳出循环。
   else if (counterCells != as || n >= NCPU)
    collide = false;            // At max size or stale
   // (5) 若数组无变化,且数组长度小于CPU核心数时,且 collide 为 false,就把它改为 true,说明下次循环可能需要扩容
   else if (!collide)
    collide = true;
   // (6) 若数组无变化,且数组长度小于CPU核心数时,且 collide 为 true,说明冲突比较严重,需要扩容了。
   else if (cellsBusy == 0 &&
      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
    try {
     //recheck
     if (counterCells == as) {// Expand table unless stale
      //创建一个容量为原来两倍的数组
      CounterCell[] rs = new CounterCell[n << 1];
      //转移旧数组的值
      for (int i = 0; i < n; ++i)
       rs[i] = as[i];
      //更新数组
      counterCells = rs;
     }
    } finally {
     cellsBusy = 0;
    }
    //认为扩容后,下次不会产生冲突了,和(4)处逻辑照应
    collide = false;
    //当次扩容后,就不需要重新生成随机数了
    continue;                   // Retry with expanded table
   }
   // (9),重新生成一个随机数,进行下一次循环判断
   h = ThreadLocalRandom.advanceProbe(h);
  }
  //2.这里的 cellsBusy 参数非常有意思,是一个volatile的 int值,用来表示自旋锁的标志,
  //可以类比 AQS 中的 state 参数,用来控制锁之间的竞争,并且是独占模式。简化版的AQS。
  //cellsBusy 若为0,说明无锁,线程都可以抢锁,若为1,表示已经有线程拿到了锁,则其它线程不能抢锁。
  else if (cellsBusy == 0 && counterCells == as &&
     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
   boolean init = false;
   try {    
    //这里再重新检测下 counterCells 数组引用是否有变化
    if (counterCells == as) {
     //初始化一个长度为 2 的数组
     CounterCell[] rs = new CounterCell[2];
     //根据当前线程的随机数值,计算下标,只有两个结果 0 或 1,并初始化对象
     rs[h & 1] = new CounterCell(x);
     //更新数组引用
     counterCells = rs;
     //初始化成功的标志
     init = true;
    }
   } finally {
    //别忘了,需要手动解锁。
    cellsBusy = 0;
   }
   //若初始化成功,则说明当前加1的操作也已经完成了,则退出整个循环。
   if (init)
    break;
  }
  //3.到这,说明数组为空,且 2 抢锁失败,则尝试直接去修改 baseCount 的值,
  //若成功,也说明加1操作成功,则退出循环。
  else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
   break;                          // Fall back on using base
 }
}

transfer()方法

需要说明的一点是,虽然我们一直在说帮助扩容,其实更准确的说应该是帮助迁移元素。因为扩容的第一次初始化新表(扩容后的新表)这个动作,只能由一个线程完成。其他线程都是在帮助迁移元素到新数组。

这里还是先看下迁移的示意图,帮助理解。

为了方便,上边以原数组长度 8 为例。在元素迁移的时候,所有线程都遵循从后向前推进的规则,即如图A线程是第一个进来的线程,会从下标为7的位置,开始迁移数据。

而且当前线程迁移时会确定一个范围,限定它此次迁移的数据范围,如图 A 线程只能迁移 bound=6到 i=7 这两个数据。

此时,其它线程就不能迁移这部分数据了,只能继续向前推进,寻找其它可以迁移的数据范围,且每次推进的步长为固定值 stride(此处假设为2)。如图中 B线程发现 A 线程正在迁移6,7的数据,因此只能向前寻找,然后迁移 bound=4 到 i=5 的这两个数据。

当每个线程迁移完成它的范围内数据时,都会继续向前推进。那什么时候是个头呢?

这就需要维护一个全局的变量 transferIndex,来表示所有线程总共推进到的元素下标位置。如图,线程 A 第一次迁移成功后又向前推进,然后迁移2,3 的数据。此时,若没有其他线程在帮助迁移,则 transferIndex 即为2。

剩余部分等待下一个线程来迁移,或者有任何的 A 和B线程已经迁移完成,也可以推进到这里帮助迁移。直到 transferIndex=0 。(会做一些其他校验来判断是否迁移全部完成,看代码)。

//这个类是一个标志,用来代表当前桶(数组中的某个下标位置)的元素已经全部迁移完成
static final class ForwardingNode<K,V> extends Node<K,V> {
 final Node<K,V>[] nextTable;
 ForwardingNode(Node<K,V>[] tab) {
  //把当前桶的头结点的 hash 值设置为 -1,表明已经迁移完成,
  //这个节点中并不存储有效的数据
  super(MOVED, null, null, null);
  this.nextTable = tab;
 }
}

//迁移数据
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
 int n = tab.length, stride;
 //根据当前CPU核心数,确定每次推进的步长,最小值为16.(为了方便我们以2为例)
 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  stride = MIN_TRANSFER_STRIDE; // subdivide range
 //从 addCount 方法,只会有一个线程跳转到这里,初始化新数组
 if (nextTab == null) {            // initiating
  try {
   @SuppressWarnings("unchecked")
   //新数组长度为原数组的两倍
   Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
   nextTab = nt;
  } catch (Throwable ex) {      // try to cope with OOME
   sizeCtl = Integer.MAX_VALUE;
   return;
  }
  //用 nextTable 指代新数组
  nextTable = nextTab;
  //这里就把推进的下标值初始化为原数组长度(以16为例)
  transferIndex = n;
 }
 //新数组长度
 int nextn = nextTab.length;
 //创建一个标志类
 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 //是否向前推进的标志
 boolean advance = true;
 //是否所有线程都全部迁移完成的标志
 boolean finishing = false; // to ensure sweep before committing nextTab
 //i 代表当前线程正在迁移的桶的下标,bound代表它本次可以迁移的范围下限
 for (int i = 0, bound = 0;;) {
  Node<K,V> f; int fh;
  //需要向前推进
  while (advance) {
   int nextIndex, nextBound;
   //(1) 先看 (3) 。i每次自减 1,直到 bound。若超过bound范围,或者finishing标志为true,则不用向前推进。
   //若未全部完成迁移,且 i 并未走到 bound,则跳转到 (7),处理当前桶的元素迁移。
   if (--i >= bound || finishing)
    advance = false;
   //(2) 每次执行,都会把 transferIndex 最新的值同步给 nextIndex
   //若 transferIndex小于等于0,则说明原数组中的每个桶位置,都有线程在处理迁移了,
   //于是,需要跳出while循环,并把 i设为 -1,以跳转到④判断在处理的线程是否已经全部完成。
   else if ((nextIndex = transferIndex) <= 0) {
    i = -1;
    advance = false;
   }
   //(3) 第一个线程会先走到这里,确定它的数据迁移范围。(2)处会更新 nextIndex为 transferIndex 的最新值
   //因此第一次 nextIndex=n=16,nextBound代表当次迁移的数据范围下限,减去步长即可,
   //所以,第一次时,nextIndex=16,nextBound=16-2=14。后续,每次都会间隔一个步长。
   else if (U.compareAndSwapInt
      (this, TRANSFERINDEX, nextIndex,
       nextBound = (nextIndex > stride ?
           nextIndex - stride : 0))) {
    //bound代表当次数据迁移下限
    bound = nextBound;
    //第一次的i为15,因为长度16的数组,最后一个元素的下标为15
    i = nextIndex - 1;
    //表明不需要向前推进,只有当把当前范围内的数据全部迁移完成后,才可以向前推进
    advance = false;
   }
  }
  //(4)
  if (i < 0 || i >= n || i + n >= nextn) {
   int sc;
   //若全部线程迁移完成
   if (finishing) {
    nextTable = null;
    //更新table为新表
    table = nextTab;
    //扩容阈值改为原来数组长度的 3/2 ,即新长度的 3/4,也就是新数组长度的0.75倍
    sizeCtl = (n << 1) - (n >>> 1);
    return;
   }
   //到这,说明当前线程已经完成了自己的所有迁移(无论参与了几次迁移),
   //则把 sc 减1,表明参与扩容的线程数减少 1。
   if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
    //在 addCount 方法最后,我们强调,迁移开始时,会设置 sc=(rs << RESIZE_STAMP_SHIFT) + 2
    //每当有一个线程参与迁移,sc 就会加 1,每当有一个线程完成迁移,sc 就会减 1。
    //因此,这里就是去校验当前 sc 是否和初始值是否相等。相等,则说明全部线程迁移完成。
    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
     return;
    //只有此处,才会把finishing 设置为true。
    finishing = advance = true;
    //这里非常有意思,会把 i 从 -1 修改为16,
    //目的就是,让 i 再从后向前扫描一遍数组,检查是否所有的桶都已被迁移完成,参看 (6)
    i = n; // recheck before commit
   }
  }
  //(5) 若i的位置元素为空,则说明当前桶的元素已经被迁移完成,就把头结点设置为fwd标志。
  else if ((f = tabAt(tab, i)) == null)
   advance = casTabAt(tab, i, null, fwd);
  //(6) 若当前桶的头结点是 ForwardingNode ,说明迁移完成,则向前推进 
  else if ((fh = f.hash) == MOVED)
   advance = true; // already processed
  //(7) 处理当前桶的数据迁移。
  else {
   synchronized (f) {  //给头结点加锁
    if (tabAt(tab, i) == f) {
     Node<K,V> ln, hn;
     //若hash值大于等于0,则说明是普通链表节点
     if (fh >= 0) {
      int runBit = fh & n;
      //这里是 1.7 的 CHM 的 rehash 方法和 1.8 HashMap的 resize 方法的结合体。
      //会分成两条链表,一条链表和原来的下标相同,另一条链表是原来的下标加数组长度的位置
      //然后找到 lastRun 节点,从它到尾结点整体迁移。
      //lastRun前边的节点则单个迁移,但是需要注意的是,这里是头插法。
      //另外还有一点和1.7不同,1.7 lastRun前边的节点是复制过去的,而这里是直接迁移的,没有复制操作。
      //所以,最后会有两条链表,一条链表从 lastRun到尾结点是正序的,而lastRun之前的元素是倒序的,
      //另外一条链表,从头结点开始就是倒叙的。看下图。
      Node<K,V> lastRun = f;
      for (Node<K,V> p = f.next; p != null; p = p.next) {
       int b = p.hash & n;
       if (b != runBit) {
        runBit = b;
        lastRun = p;
       }
      }
      if (runBit == 0) {
       ln = lastRun;
       hn = null;
      }
      else {
       hn = lastRun;
       ln = null;
      }
      for (Node<K,V> p = f; p != lastRun; p = p.next) {
       int ph = p.hash; K pk = p.key; V pv = p.val;
       if ((ph & n) == 0)
        ln = new Node<K,V>(ph, pk, pv, ln);
       else
        hn = new Node<K,V>(ph, pk, pv, hn);
      }
      setTabAt(nextTab, i, ln);
      setTabAt(nextTab, i + n, hn);
      setTabAt(tab, i, fwd);
      advance = true;
     }
     //树节点
     else if (f instanceof TreeBin) {
      TreeBin<K,V> t = (TreeBin<K,V>)f;
      TreeNode<K,V> lo = null, loTail = null;
      TreeNode<K,V> hi = null, hiTail = null;
      int lc = 0, hc = 0;
      for (Node<K,V> e = t.first; e != null; e = e.next) {
       int h = e.hash;
       TreeNode<K,V> p = new TreeNode<K,V>
        (h, e.key, e.val, null, null);
       if ((h & n) == 0) {
        if ((p.prev = loTail) == null)
         lo = p;
        else
         loTail.next = p;
        loTail = p;
        ++lc;
       }
       else {
        if ((p.prev = hiTail) == null)
         hi = p;
        else
         hiTail.next = p;
        hiTail = p;
        ++hc;
       }
      }
      ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
       (hc != 0) ? new TreeBin<K,V>(lo) : t;
      hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
       (lc != 0) ? new TreeBin<K,V>(hi) : t;
      setTabAt(nextTab, i, ln);
      setTabAt(nextTab, i + n, hn);
      setTabAt(tab, i, fwd);
      advance = true;
     }
    }
   }
  }
 }
}

迁移后的新数组链表方向示意图,以 runBit =0 为例。

helpTransfer()方法

最后再看 put 方法中的这个方法,就比较简单了。

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
 Node<K,V>[] nextTab; int sc;
 //头结点为 ForwardingNode ,并且新数组已经初始化
 if (tab != null && (f instanceof ForwardingNode) &&
  (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  int rs = resizeStamp(tab.length);
  while (nextTab == nextTable && table == tab &&
      (sc = sizeCtl) < 0) {
   //若校验标识失败,或者已经扩容完成,或推进下标到头,则退出
   if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || transferIndex <= 0)
    break;
   //当前线程需要帮助迁移,sc值加1
   if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
    transfer(tab, nextTab);
    break;
   }
  }
  return nextTab;
 }
 return table;
}

相对于 1.7 来说,锁的粒度降低了,效率也提高了。

CAS是什么

CAS 是乐观锁的一种实现方式,是一种轻量级锁,JUC 中很多工具类的实现就是基于 CAS 的。

CAS 操作的流程如下图所示,线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否修改,若未被其他线程修改则写回,若已被修改,则重新执行读取流程。

这是一种乐观策略,认为并发操作并不总会发生。

乐观锁在实际开发场景中非常常见,就比如我现在要修改数据库的一条数据,修改之前我先拿到他原来的值,然后在SQL里面还会加个判断,原来的值和我手上拿到的他的原来的值是否一样,一样我们就可以去修改了,不一样就证明被别的线程修改了你就return错误就好了。

update a set value = newValue where value = #{oldValue}//oldValue就是我们执行前查询出来的值

CAS就一定能保证数据没被别的线程修改过么?

并不是的,比如很经典的ABA问题,CAS就无法判断了。就是说来了一个线程把值改回了B,又来了一个线程把值又改回了A,对于这个时候判断的线程,就发现他的值还是A,所以他就不知道这个值到底有没有被人改过,其实很多场景如果只追求最后结果正确,这是没关系的。但是实际过程中还是需要记录修改过程的,比如资金修改什么的,你每次修改的都应该有记录,方便回溯。

那怎么解决ABA问题?用版本号去保证就好了,就比如说,我在修改前去查询他原来的值的时候再带一个版本号,每次判断就连值和版本号一起判断,判断成功就给版本号加1。

update a set value = newValue ,vision = vision + 1 where value = #{oldValue} and vision = #{vision} // 判断原来的值和版本号是否匹配,中间有别的线程修改,值可能相等,但是版本号100%不一样

时间戳也可以,查询的时候把时间戳一起查出来,对的上才修改并且更新值的时候一起修改更新时间,这样也能保证,方法很多但是跟版本号都是异曲同工之妙,看场景大家想怎么设计吧。

CAS性能很高,但是synchronized性能可不咋地,为啥jdk1.8升级之后反而多了synchronized?

synchronized之前一直都是重量级的锁,但是后来java官方是对他进行过升级的,他现在采用的是锁升级的方式去做的。

针对 synchronized 获取锁的方式,JVM 使用了锁升级的优化方式,就是先使用偏向锁优先同一线程然后再次获取锁,如果失败,就升级为 CAS 轻量级锁,如果失败就会短暂自旋,防止线程被系统挂起。最后如果以上都失败就升级为重量级锁

所以是一步步升级上去的,最初也是通过很多轻量级的方式锁定的。

1.8的get操作

  • 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
  • 如果是红黑树那就按照树的方式获取值。
  • 就不满足那就按照链表的方式遍历获取值。

1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率(O(logn)),甚至取消了 ReentrantLock 改为了 synchronized,这样可以看出在新版的 JDK 中对 synchronized 优化是很到位的。

时间复杂度:

  • 在理想状态下,及未发生任何hash碰撞,数组中的每一个链表都只有一个节点,那么get方法可以通过hash直接定位到目标元素在数组中的位置,时间复杂度为O(1)。
  • 若发生hash碰撞,则可能需要进行遍历寻找,n个元素的情况下,链表时间复杂度为O(n)、红黑树为O(logn)

Hashtable&ConcurrentHashMap跟HashMap基本上就是一套连环组合,我在面试的时候经常能吹上很久,经常被面试官说:好了好了,我们继续下一个话题吧哈哈。

是的因为提到HashMap你肯定会聊到他的线程安全性这一点,那你总不能加锁一句话就搞定了吧,java的作者们也不想,所以人家写开发了对应的替代品,那就是线程安全的Hashtable&ConcurrentHashMap。

两者都有特点,但是线程安全场景还是后者用得多一点,原因我在文中已经大篇幅全方位的介绍了,这里就不再过多赘述了。

你们发现了面试就是一个个的坑,你说到啥面试官可能就怼到你啥,别问我为啥知道嘿嘿。

你知道不确定能不能为这场面试加分,但是不知道肯定是减分的,文中的快速失败(fail—fast)问到,那对应的安全失败(fail—safe)也是有可能知道的,我想读者很多都不知道吧,因为我问过很多仔哈哈。

还有提到CAS乐观锁,你要知道ABA,你要知道解决方案,因为在实际的开发场景真的不要太常用了,sync的锁升级你也要知道。

为什么 key 和 value 不允许为 null

HashMap 中,keyvalue 都是可以为 null 的,但是在 ConcurrentHashMap 中却不允许,这是为什么呢?

作者 Doug Lea 本身对这个问题有过回答,在并发编程中,null 值容易引来歧义, 假如先调用 get(key) 返回的结果是 null,那么我们无法确认是因为当时这个 key 对应的 value 本身放的就是 null,还是说这个 key 值根本不存在,这会引起歧义,如果在非并发编程中,可以进一步通过调用 containsKey 方法来进行判断,但是并发编程中无法保证两个方法之间没有其他线程来修改 key 值,所以就直接禁止了 null 值的存在。

而且作者 Doug Lea 本身也认为,假如允许在集合,如 mapset 等存在 null 值的话,即使在非并发集合中也有一种公开允许程序中存在错误的意思,这也是 Doug LeaJosh BlochHashMap作者之一) 在设计问题上少数不同意见之一,而 ConcurrentHashMapDoug Lea 一个人开发的,所以就直接禁止了 null 值的存在。

发表评论

匿名网友