并发编程-并发容器
CopyOnWriteArrayList(jdk8)
CopyOnWriteArrayList是一种线程安全的ArrayList,底层是基于数组实现,该数组使用了volatile关键字修饰。
使用ReentrantLock加锁,用来保证线程安全。使用数组存储元素,数组使用volatile修饰,用来保证内存可见性。
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
// 加锁,用来保证线程安全
final transient ReentrantLock lock = new ReentrantLock();
// 存储元素的数组,使用了volatile修饰
private transient volatile Object[] array;
// 数组的get/set方法
final Object[] getArray() {
return array;
}
final void setArray(Object[] a) {
array = a;
}
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}
}
构造方法-初始化
List<Integer> list = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList初始化的时候,不支持指定数组长度。
初始化过程即 创建了一个长度为0的数组。
添加元素-add(E e)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 直接加锁,保证同一时间只有一个线程进行添加操作
lock.lock();
try {
// 获取当前存储元素的数组
Object[] elements = getArray();
int len = elements.length;
// 直接复制一份数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 修改复制出来的数组
newElements[len] = e;
// 将元素数组设定为复制出来的数组
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
添加元素的流程:
- 先使用ReentrantLock加锁,保证线程安全。
- 再创建一个新数组,长度是原数组长度+1,并把原数组元素拷贝到新数组里面。
- 然后在新数组末尾位置赋值
- 使用新数组替换掉原数组
- 最后释放锁
add() 方法添加元素的时候,并没有在原数组上进行赋值,而是创建一个新数组,在新数组上赋值后,再用新数组替换原数组。这是为了利用volatile关键字的特性,如果直接在原数组上进行修改,其他线程是感知不到的。只有重新对原数组对象进行赋值,其他线程才能感知到
。 还有一个需要注意的点是,每次添加元素的时候都会创建一个新数组,并涉及数组拷贝,相当于每次都进行扩容操作。当数组较大,性能消耗较为明显。所以CopyOnWriteArrayList适用于读多写少的场景,如果存在较多的写操作场景,性能也是一个需要考虑的因素。
获取元素-get(int index)
public E get(int index) {
return get(getArray(), index);
}
对于读操作不加锁。
删除元素-remove()
// 按照下标删除元素
public E remove(int index) {
// 加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取原数组
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
// 计算需要移动的元素个数
int numMoved = len - index - 1;
if (numMoved == 0) {
// 0表示删除的是数组末尾的元素
setArray(Arrays.copyOf(elements, len - 1));
} else {
// 创建一个新数组,长度是原数组长度-1
Object[] newElements = new Object[len - 1];
// 把原数组下标前后两段的元素都拷贝到新数组里面
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
// 替换原数组
setArray(newElements);
}
return oldValue;
} finally {
// 释放锁
lock.unlock();
}
}
删除元素的流程:
- 先使用ReentrantLock加锁,保证线程安全。
- 再创建一个新数组,长度是原数组长度-1,并把原数组中剩余元素(不包含需要删除的元素)拷贝到新数组里面。
- 使用新数组替换掉原数组
- 最后释放锁
批量删除-removeAll()
// 批量删除元素
public boolean removeAll(Collection<?> c) {
// 参数判空
if (c == null) {
throw new NullPointerException();
}
// 加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取原数组
Object[] elements = getArray();
int len = elements.length;
if (len != 0) {
// 创建一个新数组,长度暂时使用原数组的长度,因为不知道要删除多少个元素。
Object[] temp = new Object[len];
// newlen表示新数组中元素个数
int newlen = 0;
// 遍历原数组,把需要保留的元素放到新数组中
for (int i = 0; i < len; ++i) {
Object element = elements[i];
if (!c.contains(element)) {
temp[newlen++] = element;
}
}
// 如果新数组没有满,就释放空白位置,并覆盖原数组
if (newlen != len) {
setArray(Arrays.copyOf(temp, newlen));
return true;
}
}
return false;
} finally {
// 释放锁
lock.unlock();
}
}
批量删除元素的流程:
- 先使用ReentrantLock加锁,保证线程安全。
- 再创建一个新数组,长度暂时使用原数组的长度,因为不知道要删除多少个元素。
- 然后遍历原数组,把需要保留的元素放到新数组中。
- 释放掉新数组中空白位置,再使用新数组替换掉原数组。
- 最后释放锁
如果遇到需要一次删除多个元素的场景,尽量使用 removeAll()
方法,因为 removeAll()
方法只涉及一次数组拷贝,性能比单个删除元素更好。
Q&A
当遍历CopyOnWriteArrayList的过程中,同时增删CopyOnWriteArrayList中的元素,会发生什么情况?
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class Test {
public static void main(String[] args) {
List<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(2);
list.add(3);
// 遍历ArrayList
for (Integer key : list) {
// 判断如果元素等于2,则删除
if (key.equals(2)) {
list.remove(key);
}
}
// 输出结果: [1, 3]
System.out.println(list);
}
}
没有抛出异常,还把CopyOnWriteArrayList中重复的元素也都删除了。
原因是CopyOnWriteArrayList重新实现迭代器,拷贝了一份原数组的快照,在快照数组上进行遍历。
这样做的优点是其他线程对数组的并发修改,不影响对快照数组的遍历,但是遍历过程中无法感知其他线程对数组修改。
jdk 11 对 CopyOnWriteArrayList 源码做了修改,lock采用 synchronized
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
final transient Object lock = new Object();
private transient volatile Object[] array;
final void setArray(Object[] a) {
array = a;
}
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es);
return true;
}
}
ConcurrentHashMap()
JDK7之前,ConcurrentHashMap将所有数据分为一段一段地存储,先分很多段出来,每一段都给一把锁,当一个线程占锁访问时,只会占用其中一把锁,也就是仅仅锁了一小段数据,而其他段的数据依然可以被其他线程正常访问。
JDK8之后,采用 CAS 算法配合 synchronized 锁机制实现。
public class ConcurrentHashMap<K, V>
extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
/**
* 数组最大容量
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 数组初始容量
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 负载系数(超过就要扩容)
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 树化阈值(节点数超过阈值,需要把链表转换成红黑树)
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 反树化阈值(当红黑树节点数量小于此阈值,重新转换成链表)
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 转换成红黑树的最少元素数量
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/*
* 特殊节点的哈希值
*/
static final int MOVED = -1; // 转移节点的哈希值
static final int TREEBIN = -2; // 红黑树根节点的哈希值
static final int RESERVED = -3; // 临时节点的哈希值
/**
* 节点数组
*/
transient volatile Node<K, V>[] table;
/**
* 备用数组,用于扩容
*/
private transient volatile Node<K, V>[] nextTable;
/**
* 元素个数(没有统计并发的操作)
*/
private transient volatile long baseCount;
/**
* 并发的元素个数(想要统计总数量,需要加上baseCount)
*/
private transient volatile CounterCell[] counterCells;
/**
* 表大小调整的控件
* 1. 是负数时,表示正在调整初始化或者扩容,值为 -(1 + 并发线程数)
* 2. 是正数时,表示扩容的阈值(第一次是12)
*/
private transient volatile int sizeCtl;
}
ConcurrentHashMap
跟HashMap一样,都是继承自AbstractMap,并实现了Map接口。
ConcurrentHashMap
实现的是ConcurrentMap接口,而ConcurrentMap接口继承自Map接口。所以ConcurrentHashMap
跟HashMap拥有的方法是一样的,出现的地方可以相互替换。
ConcurrentHashMap
类里面的属性跟HashMap类似,但也有一些不同:
链表与红黑树的相互转换,当链表大于等于TREEIFY_THRESHOLD(树化阈值,默认是8),并且元素个数大于MIN_TREEIFY_CAPACITY(默认是64),就会把链表转换成红黑树。当红黑树节点个数小于等于UNTREEIFY_THRESHOLD(反树化阈值,默认是6),就会把红黑树转换成链表。
比如特殊节点的哈希值,MOVED(-1)、TREEBIN(-2)、RESERVED(-3),都是负数。一般key的哈希值都是正数,
ConcurrentHashMap
把这些节点的哈希值设置成负数,表示这些节点正处于某种状态,相当于复用了节点的哈希值。table数组用于存储元素节点,nextTable数组用在扩容期间临时存储元素。
ConcurrentHashMap
并没有用于记录元素个数的size变量,而是使用baseCount和counterCells相加之和,计算元素数量。baseCount用于记录正常情况下的元素个数,counterCells用于记录多个线程并发的元素个数。sizeCtl变量,负数时表示
ConcurrentHashMap
正在初始化或者扩容,正数时表示扩容的阈值,也是一值多用,大家在做架构时也可以参考这种设计。ConcurrentHashMap
还定义了4个节点内部类,分别是链表节点Node、红黑树节点TreeNode、管理红黑树的容器TreeBin、扩容时的临时转移节点ForwardingNode。javapublic class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { /** * 链表节点类 */ static class Node<K, V> implements Map.Entry<K, V> { final int hash; final K key; volatile V val; volatile Node<K, V> next; Node(int hash, K key, V val, Node<K, V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } } /** * 红黑树节点类 */ static final class TreeNode<K, V> extends Node<K, V> { TreeNode<K, V> parent; TreeNode<K, V> left; TreeNode<K, V> right; TreeNode<K, V> prev; boolean red; TreeNode(int hash, K key, V val, Node<K, V> next, TreeNode<K, V> parent) { super(hash, key, val, next); this.parent = parent; } } /** * 管理红黑树的容器 */ static final class TreeBin<K, V> extends Node<K, V> { TreeNode<K, V> root; volatile TreeNode<K, V> first; volatile Thread waiter; volatile int lockState; } /** * 转移节点(扩容时使用) */ static final class ForwardingNode<K, V> extends Node<K, V> { final Node<K, V>[] nextTable; ForwardingNode(Node<K, V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } } }
构造方法源码
ConcurrentHashMap
常见的构造方法有四个:
- 无参构造方法
- 指定容量大小的构造方法
- 指定容量大小、负载系数的构造方法
- 指定容量大小、负载系数、并发度的构造方法
/**
* 无参初始化
*/
Map<Integer, Integer> map1 = new ConcurrentHashMap<>();
/**
* 指定容量大小的初始化
*/
Map<Integer, Integer> map2 = new ConcurrentHashMap<>(16);
/**
* 指定容量大小、负载系数的初始化
*/
Map<Integer, Integer> map3 = new ConcurrentHashMap<>(16, 0.75f);
/**
* 指定容量大小、负载系数、并发度的初始化
*/
Map<Integer, Integer> map4 = new ConcurrentHashMap<>(16, 0.75f, 1);
对应的构造方法的源码实现:
/**
* 无参初始化
*/
public ConcurrentHashMap() {
}
/**
* 指定容量大小的初始化
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0) {
throw new IllegalArgumentException();
}
// 计算合适的初始容量(必须是2的倍数)
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
/**
* 指定容量大小、负载系数的初始化
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
// 调用下面的构造方法,并发度默认是1
this(initialCapacity, loadFactor, 1);
}
/**
* 指定容量大小、负载系数、并发度的初始化
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
if (initialCapacity < concurrencyLevel) {
initialCapacity = concurrencyLevel;
}
// 计算合适的初始容量(必须是2的倍数)
long size = (long) (1.0 + (long) initialCapacity / loadFactor);
int cap = (size >= (long) MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int) size);
this.sizeCtl = cap;
}
ConcurrentHashMap
跟HashMap一样,初始化的时候并没有初始化数组table,只是记录了数组的大小sizeCtl,sizeCtl的大小必须是2的倍数,原因是可以使用与运算计算key所在数组下标位置,比求余更快
。
ConcurrentHashMap
初始化的时候数组大小是多少?答案是0,因为ConcurrentHashMap
初始化的时候,并没有初始化数组,而是在第一次调用put()
方法时,才初始化数组。
put方法源码
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 键值不能为空
if (key == null || value == null) throw new NullPointerException();
// 计算键的hash值,用于确定在哈希表中的位置
int hash = spread(key.hashCode());
// 用来记录链表长度
int binCount = 0;
// CAS自旋锁
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 如果数组(哈希表)为空肯定是要进行初始化的,然后再重新进下一轮循环
tab = initTable();
// 如果哈希表该位置为null,直接CAS插入结点作为头结即可(注意这里会将f设置当前哈希表位置上的头结点)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
// 如果CAS成功,直接break结束put方法,失败那就继续下一轮循环
break;
// 头结点哈希值为-1,这里只需要知道是因为正在扩容即可
} else if ((fh = f.hash) == MOVED)
// 帮助进行迁移,完事之后再来下一次循环
tab = helpTransfer(tab, f);
// 正常流程走这里
else {
V oldVal = null;
// 前面的循环中f肯定是被设定为了哈希表某个位置上的头结点,这里把它作为锁加锁了,防止同一时间其他线程也在操作哈希表中这个位置上的链表或是红黑树
synchronized (f) {
if (tabAt(tab, i) == f) {
// 头结点的哈希值大于等于0说明是链表,下面就是针对链表的一些列操作
if (fh >= 0) {
...实现细节略
// 肯定不大于0,肯定也不是-1,还判断是不是TreeBin,所以不用猜了,肯定是红黑树,下面就是针对红黑树的情况进行操作
} else if (f instanceof TreeBin) {
// 在ConcurrentHashMap并不是直接存储的TreeNode,而是TreeBin
...实现细节略
}
}
}
// 根据链表长度决定是否要进化为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 注意这里只是可能会进化为红黑树,如果当前哈希表的长度小于64,它会优先考虑对哈希表进行扩容
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
put()方法的源码主要步骤如下:
- 先计算key的哈希值。
- 然后使用死循环遍历数组,保证put操作必须成功。
- 判断数组是否为空,如果为空,就执行初始化数组的操作,这时候才进行数组初始化。
- 如果数组不为空,就计算key所在的数组下标位置,也就是使用哈希值对
数组长度-1
进行与运算,((n - 1) & hash)
,相当于对数组长度-1
求余,这就是为什么要求数组长度必须是2的倍数。如果数组下标位置为空,直接插入元素即可,使用CAS保证插入成功。 - 判断如果当前节点哈希值等于MOVED(-1,表示正在扩容),就帮忙扩容。
- 如果下标位置不为空,使用synchronized锁住当前节点,防止其他线程并发修改。
- 再次检查当前节点是否被修改过(双检锁,配合synchronized使用,防止加锁之前刚好被修改)。
- 判断当前节点哈希值,如果大于0,表示是链表
- 循环遍历链表
- 如果在链表中找到相同的key,直接返回
- 如果遍历链表结束了还没找到,就创建新节点,并追加到链表末尾。
- 判断如果当前节点类型是红黑树,就执行红黑树的插入逻辑。
- 如果插入了新节点,就判断链表长度,是否转换成红黑树。
- 如果链表长度大于等于树化阈值,就把链表转换成红黑树
- 把本次操作添加到元素计数器中,并判断是否需要扩容,如果需要则执行扩容逻辑。
put 方法初始化数组
/**
* 初始化数组
*/
private final Node<K, V>[] initTable() {
Node<K, V>[] tab;
int sc;
// 1. 使用自旋保证初始化成功
while ((tab = table) == null || tab.length == 0) {
// 2. 如果sizeCtl小于零,表示有其他线程正在执行初始化,则让出CPU调度权
if ((sc = sizeCtl) < 0) {
Thread.yield();
} else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 3. 使用CAS更新sizeCtl值为-1,表示正在初始化
try {
// 4. 再次检查数组是否为空
if ((tab = table) == null || tab.length == 0) {
// 5. 执行初始化数组,容量为默认容量16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
初始化数组的时候,充分考虑了并发的情况,如果一个正在执行初始化数组操作,其他线程该怎么办?
首先使用while死循环保证可以初始化成功,然后判断sizeCtl
大小,如果小于零,表示其他线程正在执行初始化,则调用Thread.yield()
让出CPU调度权,而获得CPU调度权线程使用CAS把sizeCtl设置成-1,最后使用双重检查锁,并初始化数组容量是默认大小16。
put 方法 扩容
**扩容的流程用一句话概括就是:**先创建一个新数组,然后遍历原数组,拷贝元素到新数组中,最后把新数组直接赋值给原数组。 扩容具体流程如下:
创建一个新数组,容量是原数组的2倍
遍历原数组,从后向前遍历。
每遍历一个元素,就使用
synchronized
锁住当前下标位置节点,防止其他线程修改。如果当前下标节点是链表,就执行链表的拷贝方法。如果是红黑树,就执行红黑树的拷贝方法。至于为什么要拆分成低位和高位两段数据,这是个取巧的方法,具体原因可以看一下前面剖析HashMap源码的文章。
把元素拷贝到新数组后,再把原数组下标位置设置成转移节点类型ForwardingNode
遍历完成后,直接把新数组直接赋值给原数组。
/**
* 扩容方法
*
* @param tab 元素数组
* @param nextTab 临时数组(用来临时存储扩容后的元素),此时传参为null
*/
private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
stride = MIN_TRANSFER_STRIDE;
}
// 1. 初始化临时数组,容量是原数组的2倍
if (nextTab == null) {
try {
Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
boolean advance = true;
boolean finishing = false;
// 2. 循环遍历原数组,并将元素拷贝临时数组
for (int i = 0, bound = 0; ; ) {
Node<K, V> f;
int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) {
advance = false;
} else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
} else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
// 3. 从后向前遍历
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
// 4. 遍历完成后,将临时数组赋值给原数组
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
return;
}
finishing = advance = true;
i = n;
}
} else if ((f = tabAt(tab, i)) == null) {
advance = casTabAt(tab, i, null, fwd);
} else if ((fh = f.hash) == MOVED) {
advance = true;
} else {
// 5. 锁住当前下标位置节点,防止其他线程修改
synchronized (f) {
if (tabAt(tab, i) == f) {
// 6. 哈希值大于0,表示是链表,就执行链表的拷贝方式(拆分成低位链表ln和高位链表hn)
Node<K, V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K, V> lastRun = f;
for (Node<K, V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
} else {
hn = lastRun;
ln = null;
}
for (Node<K, V> p = f; p != lastRun; p = p.next) {
int ph = p.hash;
K pk = p.key;
V pv = p.val;
if ((ph & n) == 0) {
ln = new Node<K, V>(ph, pk, pv, ln);
} else {
hn = new Node<K, V>(ph, pk, pv, hn);
}
}
// 7. 将低位链表和高位链表拷贝到临时数组,
// 并把原数组的下标位置设置成转移节点ForwardingNode
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
} else if (f instanceof TreeBin) {
// 8. 如果节点类型是红黑树,就执行红黑树的拷贝方式
// 跟链表一样,拆分成低位红黑树和高位红黑树
TreeBin<K, V> t = (TreeBin<K, V>) f;
TreeNode<K, V> lo = null, loTail = null;
TreeNode<K, V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K, V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K, V> p = new TreeNode<K, V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null) {
lo = p;
} else {
loTail.next = p;
}
loTail = p;
++lc;
} else {
if ((p.prev = hiTail) == null) {
hi = p;
} else {
hiTail.next = p;
}
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K, V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K, V>(hi) : t;
// 9. 将低位红黑树和高位红黑树拷贝到临时数组,
// 并把原数组的下标位置设置成转移节点ForwardingNode
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
综上,ConcurrentHashMap的put操作,实际上是对哈希表上的所有头结点元素分别加锁,理论上来说哈希表的长度很大程度上决定了ConcurrentHashMap在同一时间能够处理的线程数量,这也是为什么treeifyBin()
会优先考虑为哈希表进行扩容的原因。显然,这种加锁方式比JDK7的分段锁机制性能更好。
get方法源码
/**
* get方法入口
*/
public V get(Object key) {
Node<K, V>[] tab;
Node<K, V> e, p;
int n, eh;
K ek;
// 1. 计算哈希值
int h = spread(key.hashCode());
// 2. 判断如果数组不为空,下标位置也不为空,就进行查找
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 3. 如果找到对应的key,直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek))) {
return e.val;
}
} else if (eh < 0) {
// 4. 如果哈希值小于0,表示是转移节点或者红黑树,则执行特殊查找逻辑
return (p = e.find(h, key)) != null ? p.val : null;
}
// 5. 否则是链表,则循环遍历链表
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
return e.val;
}
}
}
return null;
}
remove方法源码
/**
* 删除方法入口
*/
public V remove(Object key) {
// 调用替换节点的方法
return replaceNode(key, null, null);
}
/**
* 替换节点(只是把节点值替换成null,并不删除节点)
*/
final V replaceNode(Object key, V value, Object cv) {
// 1. 计算哈希值
int hash = spread(key.hashCode());
// 2. 遍历数组
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f;
int n, i, fh;
// 3. 如果数组为空,或者数组所在下标位置为null,则退出
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null) {
break;
} else if ((fh = f.hash) == MOVED) {
// 4. 判断如果当前节点哈希值等于MOVED(表示正在扩容),就帮忙扩容
tab = helpTransfer(tab, f);
} else {
V oldVal = null;
boolean validated = false;
// 5. 如果下标位置不为空,先锁住当前节点
synchronized (f) {
// 6. 再次检查当前节点是否被修改过
if (tabAt(tab, i) == f) {
// 7. 判断当前节点哈希值,如果大于0,表示是链表
if (fh >= 0) {
validated = true;
// 8. 循环遍历链表
for (Node<K, V> e = f, pred = null; ; ) {
K ek;
// 9. 如果找到相同的key,就重置value值
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null) {
e.val = value;
} else if (pred != null) {
pred.next = e.next;
} else {
setTabAt(tab, i, e.next);
}
}
break;
}
pred = e;
// 10. 如果遍历结束了还没找到,就退出
if ((e = e.next) == null) {
break;
}
}
} else if (f instanceof TreeBin) {
// 11. 判断如果当前节点是红黑树,就执行红黑树的遍历逻辑
validated = true;
TreeBin<K, V> t = (TreeBin<K, V>) f;
TreeNode<K, V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
// 12. 如果找到相同的key,就重置value值
if (cv == null || cv == pv || (pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null) {
p.val = value;
} else if (t.removeTreeNode(p)) {
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
}
if (validated) {
// 13. 如果删除成功了,就把元素个数减一
if (oldVal != null) {
if (value == null) {
addCount(-1L, -1);
}
return oldVal;
}
break;
}
}
}
return null;
}
Q&A
ConcurrentHashMap是怎么保证线程安全的?
核心就是使用synchronized和cas。
put和扩容的时候,使用synchronized锁住头节点,防止其他线程修改。
初始化的时候使用while死循环+cas,保证初始化成功。
ConcurrentHashMap的分段锁有哪些缺点?
ConcurrentHashMap的分段锁不全是优点。首先,ConcurrentHashMap分段锁的设计非常复杂,增加了理解难度和维护成本。由于ConcurrentHashMap使用的是分段锁,而不是整体加锁,导致需要需要访问所有元素的方法无法保证强一致性,比如contains()、size()方法等。
ConcurrentHashMap在Java8版本对分段锁做了哪些改进?
在Java8之前,调用put方法的时候,使用ReentrantLock锁住数组整个下标位置。如果需要扩容,还要等扩容结束,才会释放锁,加锁粒度更大、耗时更长。 在Java8中,调用put方法的时候,使用synchronized锁住下标位置的头节点,put结束后立即释放锁。如果需要扩容,再重新每个下标位置的头节点加锁。与Java8之前相比,加锁粒度更小,耗时更短,而且还引入了cas无锁操作,进一步提升了并发性能。