第六章 Java 并发容器和框架
6.1 ConcurrentHashMap
ConcurrentHashMap 是线程安全且高效的 HashMap。
6.1.1 为什么使用 ConcurrentHashMap
1. HashMap 线程不安全
在多线程环境下,使用 HashMap 进行 put 操作会引起死循环,导致 CPU 利用率接近 100%,所以在并发情况下不能使用 HashMap。
HashMap 在并发执行 put 操作时会引起死循环,是因为多线程会导致 HashMap 的 Entry 链表形成环形数据结构,一旦形成环形数据结构,Entry 的 next 节点永不为空,就会产生死循环获取 Entry。
2. HashTable 效率低下
HashTable 容器使用 synchronized 来保证线程安全,但在线程竞争激烈的情况下 HashTable 效率低下。
因为当一个线程访问 HashTable 的同步方法,其他线程再访问时,会进入阻塞或轮询状态。
3. ConcurrentHashMap 的锁分段技术
HashTable 在线程竞争激烈时的效率低下原因是所有访问 HashTable 的线程都必须竞争同一把锁。
而ConcurrentHashMap 的锁分段技术,将容器中的数据分段存储起来,为每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问。
6.1.2 ConcurrentHashMap 的结构
ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。
Segment 是一种可重入锁(ReentrantLock),扮演锁的角色。Segment与HashMap 类似,是一种数组和链表结构。
HashEntry 则用于存储键值对数据。每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须先获得对应的 Segment 锁。
6.1.3 ConcurrentHashMap 初始化
通过 initalCapacity、loadFactor 和 concurrencyLevel 等几个参数来初始化 segment 数组、段偏移量 segmentShift、段掩码 segmentMask 和每个 segment 里的 HashEntry 数组,以此来初始化 ConcurrentHashMap。
/**
* Creates a new, empty map with an initial table size based on
* the given number of elements ({@code initialCapacity}), table
* density ({@code loadFactor}), and number of concurrently
* updating threads ({@code concurrencyLevel}).
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements,
* given the specified load factor.
* @param loadFactor the load factor (table density) for
* establishing the initial table size
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation may use this value as
* a sizing hint.
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
1. 初始化 Segments 数组
if(concurrencyLevel > MAX_SEGMENTs)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
while(ssize < concurrencyLevel){ // 必须保证 segments 数组长度是 2的N次方,如 concurrencyLevel = 14,ssize = 16
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
Segments 数组的程度是由 ssize 决定的,而ssize 是通过 concurrencyLevel 计算得出的 ,原因是必须保证 ssize 是 2的N次方。
2. 初始化 SegmentShift 和 SegmentMask
段偏移量 SegmentShift:segmentShift = 32 - sshift;
段掩码:segmentMask = ssize - 1; 是散列运算的掩码,等于 ssize - 1,因为 ssize 为2的N次方,因此 segmentMask 掩码的二进制各个位都是 1.
3. 初始化每个 segment
initialCapacity 是 ConcurrentHashMap 的初始化容量,loadFactor 是每个 segment 的负载因子。
4. 定位 Segment
ConcurrentHashMap 在插入和获取元素时,会通过散列算法定位 Segment,并会对元素的 hashCode 进行一次再散列。
在散列的目的是减少散列冲突,使元素能均匀地分布在不同的 Segment 上,从而提高容器的存取效率。
如果不进行再散列的话,无论散列值的高位是多少,只要低位相同,都会被存储到一个Segment 上。
6.1.5 ConcurrentHashMap 操作
1. get操作
先进行再散列,然后使用散列值进行散列运算,定位 Segment,再通过散列算法定位到元素。
public V get(Object key){
int hash = hash(key.hash);
return segmentFor(hash).get(key,hash);
}
整个 get 操作不需要加锁,除非读到的值是空才会加锁重读。原因是 ConcurrentHashMap 将需要共享的变量都定义为 volatile 类型。
2. put操作
put操作会对共享变量进行写操作,所以必须加锁。
put 方法首先定位到 Segment,然后在 Segment 里进行插入操作。在插入之前,先判断是否需要对 Segment 里的 HashEntry 进行扩容,然后定位添加元素的位置,将其放入 HashEntry 数组中。
3. size 操作
统计 ConcurrentHashMap 里元素的大小,就必须统计所有 Segment 里元素的大小后求和。 Segment 里的count 是一个 volatile 变量,但是多线程场景下,某个 segment 的 count 发生改变后,也可能会使结果不准确。
因此 size 操作的具体过程为:先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计过程中,容器的 count 发生了变化(容器 count 会在 put、remove、clean 时会使 modCount + 1),再采用加锁的方式(将所有 Segment 的 put、remove、clean 方法全部锁住)来统计 Segment 的大小。
6.2 ConcurrentLinkedQueue
ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,采用 FIFO 的规则对节点进行排序。
6.2.1 ConcurrentLinkedQueue 结构
ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个 next 关联,组成一张链表结果的队列。
6.2.2 入队列
/**
* 插入元素到当前队列的尾部,且因为队列是无界的,结果总是true
*/
public boolean offer(E e) {
checkNotNull(e);
// 1. 将插入元素构建为 node
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t)
casTail(t, newNode);
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
1. 入队列
入队列就是将入队节点添加到队列的尾部。在入队时,先将入队节点设置成当前队列尾节点的下一个节点,然后更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设置为 tail 的 next 节点。
在设置入队节点为 tail 的 next 节点时,会使用 CAS 来保证多线程下的安全性。
2. 定位尾节点
/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
3. 设置入队节点为尾节点
p.casNext(null,n) 将入队节点设置为当前队列尾节点的 next 节点,如果 p 为null,表示 p 是当前队列的尾节点,不为空则表示其他线程更新了尾节点,需要重新获取当前队列的尾节点。
6.2.3 出队列
public E poll() {
restartFromHead:
for (;;) {
// 1. 获取头节点
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 2. 头节点不为空,则用 CAS 将头节点的引用置null;为空则表示头节点被另一线程的出队操作取出
if (item != null && p.casItem(item, null)) {
// 3. CAS 操作成功,直接返回头节点
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 4. 如果取出失败,则继续进行循环取值
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
6.3 阻塞队列
6.3.1 什么是阻塞队列
阻塞队列是一个支持阻塞插入和阻塞移除的方法。
插入和移除操作的4中处理方式:
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove(e) | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | / | / |
6.3.2 Java 阻塞队列
JDK 中提供了 7个阻塞队列:
- ArrayBlockingQueue:由数组结构组成的有界阻塞队列
- 队列按照FOFO原则对元素进行排序
- 默认不保证线程访问队列时的公平性
- LinkedBlockingQueue:由链表结构组成的有界阻塞队列
- FIFO原则对元素进行排序
- 队列默认和最大长度为 Integer.MAX_VALUE
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列
- 支持优先级、无界阻塞
- 元素采用自然顺序升序排列,也可以自定义 compareTo 来排序
- 不能保证同优先级元素的顺序
- DelayQueue:使用优先级队列实现的无界阻塞队列
- 支持延迟获取元素
- 队列元素必须实现 Delayed 接口
- 队列使用 PriorityQueue 实现
- 创建元素时可指定多久后才能获取当前元素
- DelayQueue常用于:
- 缓存系统的设计:用 DelayQueue 保存缓存元素的有效期
- 定时任务调度:使用 DelayQueue 保存执行的任务和执行时间
- SynchronousQueue:不存储元素的阻塞队列
- 不存储元素,每个put操作必须等待一个 take 操作
- 支持公平访问队列,默认是非公平的
- 队列本身不存储任何元素,适合传递性场景
- 吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- 提供 transfer 方法,立即将元素 transfer 给消费者,如果无消费者等待则将元素放在队列的 tail 节点,等待消费者消费
- 提供 tryTransfer 方法,试探生产者传入的元素能够直接传给消费者,如果无消费者等待接收,则返回false。
- LinkedBlockingQueue:由链表结构组成的双向阻塞队列
- 双向队列,使多线程同时入队时,减少了一般竞争,因为多了一个入队口
- 增加了 addFirst、addLast、offerFirst、offerLast、peekFirst、peekLast等方法
- add 等同于 addLast,remove 等同于 removeFirst