# 基础API
# Thread.sleep()
while(true) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {}
}
此处可避免while() {}
循环空转消耗cpu性能,使用sleep()
或yield()
适当让出cpu使用权
适用于无须锁同步的场景
# <thread1>.join()
调用线程进行阻塞,同步等待thread1执行结束
Thread t1; // 休眠1s
Thread t2; // 休眠2s
t1.join();
t2.join(); // 只需额外等待1s(两线程与主线程同时开始执行,主线程分别只对两线程的结束时机进行同步等待)
# <thread1>.interrupt()
- 正常运行的线程被打断后,打断标记
isInterrupted()
为true
(意味着可通过该标记进行后续处理) - 通过调用
sleep()
、wait()
、join()
等方法处于阻塞状态的线程被打断后,打断标记将被自动清空,置为false
(意味着线程已进入异常,该标记已失去作用,可在catch
块中重新设置标记为true
) - 通过调用
park()
方法处于阻塞状态的线程被打断后,打断标记为true
;若不手动重置该标记为false
(静态方法Thread.interrupt()
),则后续的park()
操作均会失效
二阶段终止模式实现监控流程
graph TD w("while(true)") --> a a("isInterrupted()") -- true --> b("释放资源") b --> c("结束循环") a -- false --> d("sleep()") d -- 无异常 --> e("执行监控业务") d -- 出现异常 --> i("设置打断标记") i --> w e --> w
class TwoPhaseTermination { private Thread monitor; public void startMonitor() { monitor = new Thread(() -> { while (true) { Thread currentThread = Thread.currentThread(); if (currentThread.isInterrupted()) { /* 释放资源 */ break; } try { Thread.sleep(2000); /* 执行监控业务 */ } catch(InterruptedException e) { currentThread.interrupt(); // 阻塞过程中被打断,需手动再次设置打断标记(重新打断) } } }); monitor.start(); } public void stopMonitor() { monitor.interrupt(); } }
# 共享内存模型:管程(悲观锁,阻塞式)
避免临界区竞态条件发生的多种方式
# synchronized
- 通过对象锁保证临界区内代码的原子性
synchronized
锁为可重入锁- 对象模型:
- 重入计数器❓
- Contention List
- Entry List
- Wait Set
- On Deck
# 局部变量和共享变量的线程安全问题
- 不同线程对共享变量进行并发增删引发线程安全问题;而对于局部变量,不同线程分别拥有一份自己的局部变量实例,此时不引发线程安全问题
- 若在父类public方法/非final方法/abstract方法中传入或返回对象(局部变量引用了对象,并逃离该栈帧,即发生引用泄漏),则子类可对父类方法进行重写(外星方法),以实现对该局部变量的并发增删(方法内部开启线程对变量进行操作),此时局部变量成为共享变量,仍将引发线程安全问题
- Java Web环境
public class UserServiceImpl implements UserService {
// 无状态,线程安全
private UserDao userDao = new UserDao();
// 非线程安全
private Integer count = 0;
public void update() {
// ...
count++;
}
}
@Aspect
@Component
public class CustomAspect { // 单例
// 非线程安全
private Long start = 0L;
@Before("execution(* *(..))")
public void before() {
start = System.nanoTime();
}
@After("execution(* *(..))")
public void after() {
Long end = System.nanoTime();
// ...
}
}
解决方式:采用环绕通知,将计时器用作局部变量
# synchronized
锁的优化
# 轻量级锁
多线程访问没有竞争现象,优先使用轻量级锁
创建锁记录(Lock Record)对象
锁记录对象中Object Reference指向加锁对象,并尝试通过CAS将锁记录对象中地址字段(
ptr_to_lock_record:30
|00
)与加锁对象的Mark Word(非加锁状态下末两位为01
)替换(若CAS失败,进行锁的膨胀;或进行轻量级锁重入,额外维护一个新的锁记录对象信息)
(解锁)通过CAS进行解锁,还原加锁对象的Mark Word(若CAS失败,执行重量级锁的解锁流程)
# 偏向锁(轻量级锁的优化)
在加锁对象的Mark Word字段中加入当前线程的ID信息,当同一线程再次对该对象进行加锁时避免反复的CAS操作;当其他线程对该对象进行加锁时撤销该对象的偏向状态
- 批量重偏向:当反复出现新线程对该对象进行加锁的情况时,不再直接撤销该对象的偏向状态,而是修改该类所有新实例对象的偏向状态为偏向新线程(偏向撤销阈值:20)
- 批量偏向撤销:当再次反复出现另一新线程对该对象进行加锁的情况时,不再修改该对象的偏向状态,而是直接禁用该类所有新实例对象的偏向资格(偏向撤销阈值:40)
因对象的Mark Word字段在不可偏向时才包含hashCode信息(在可偏向时替换为偏向线程ID与epoch信息),故显式调用加锁对象的
hashCode()
方法将撤销该对象的偏向状态(1 | 01
->0 | 01
,倒数第三位为偏向状态,1为可偏向,0为不可偏向)
# 重量级锁
- 为加锁对象申请Monitor锁,将加锁对象的轻量级锁记录地址字段再次替换为Monitor锁地址(
ptr_to_monitor:30
|10
) - 当前线程进入Monitor锁的Entry List,进入**
BLOCKED
**状态 - (解锁)通过加锁对象的Monitor锁地址信息定位Monitor锁,设置Monitor锁的Owner对象为空,并唤醒Monitor锁Entry List中阻塞的线程(非公平锁)
自旋优化
# 锁消除
当加锁对象未逃逸局部代码块的作用范围(不可能成为共享变量)时,JIT将锁消除
# 线程等待与通知
# (Object
)wait()
/notify()
# 底层原理
在锁对象上调用wait()
前必须确保获取对象的锁(在同步代码块中进行),并发生锁的膨胀(即只使用重量级锁),当前线程进入Monitor
锁的Wait Set进行等待(park()
)并释放锁;在锁对象上调用notify()
前也必须确保获取对象的锁(在同步代码块中进行),在退出同步块时对应线程被真正唤醒(unpark()
)后回到Entry List中进行阻塞,直至再次竞争锁成功,继续执行wait()
后代码
notifyAll()
按照LIFO顺序唤醒Entry List中阻塞的线程
// 线程A,消费者
synchronized(lock) {
while (condition) { // 使用while循环而非if,从而在线程被唤醒后保证等待条件不满足时才继续执行,以避免虚假唤醒
lock.wait(timeout);
}
// 执行正常流程
}
// 线程B,生产者
synchronized(lock) {
/*
lock.notify()将随机唤醒Entry List中的线程,故可能造成虚假唤醒
*/
lock.notifyAll();
}
# 同步模式之保护性暂停(Guarded Suspension)模式
当线程t1等待线程t1的执行结果,可分别关联同一个
GuardedObject
Object lock; Object response; public Object get(long timeout) { synchronized(lock) { // 记录初始时间 while (response == null) { // 判断消耗时间passedTime是否超过timeout,若是则退出循环 /* 当被虚假唤醒后重新进入循环时,应该继续等待的时间为timeout - passedTime */ this.wait(timeout - passedTime); } } return response; } public void produce(Object rawObject) { synchronized(lock) { response = rawObject; this.notifyAll(); } }
- 一个生产者通过
GuardedObject
对象严格对应一个消费者- 相比于
join()
(原理一致),上述模式中消费者无需等待生产者线程完全结束就能消费资源# 异步模式之异步队列模式(生产者/消费者)
LinkedList list; int capacity; public Object get() { synchronized(list) { while (list.isEmpty()) { list.wait(); } Object o = list.removeFirst(); list.notifyAll(); return o; } return null; } public void put(Object rawObject) { synchronized(list) { while (list.size() == capacity) { list.wait(); } list.addLast(); list.notifyAll(); } }
# (ConditionObject
)await
/signal
// 借助await/signal机制实现生产者/消费者模型
ReentrantLock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
// 线程A,消费者
new Thread(() -> {
lock.lock();
try {
while (/*执行条件A不满足*/) {
condition1.await();
}
// 执行业务操作
} finally {
lock.unlock();
}
}).start();
// 线程B,生产者
new Thread(() -> {
lock.lock();
try {
// 更改执行条件A
condition1.signal();
} finally {
lock.unlock();
}
}).start();
# 底层原理
在ConditionObject
条件对象上(对当前线程)调用await()
,当前线程若竞争lock成功,则尾插入进AQS等待队列中进行等待(park()
)并释放lock;在ConditionObject
条件对象上调用signal()
将该线程转移至AQS阻塞队列中进行阻塞,直至再次竞争lock成功后被唤醒(unpark()
),继续执行await()
后代码
signalAll()
按照FIFO顺序唤醒AQS阻塞队列中阻塞的线程
# await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到AQS等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的程中会唤醒AQS阻塞队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取同步状态/lock(成功获取lock是退出await()的充要条件)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
# signal()
将AQS等待队列中等待时间最长的节点移动到AQS阻塞队列中
public final void signal() {
//1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 获取AQS等待队列中头节点,将其移动至AQS阻塞队列中等待再次获取lock从而被唤醒
Node first = firstWaiter;
if (first != null)
// doSignal(Node)中的transferForSignal(Node)方法实现对头节点的状态检查、队列转移与(后续的)线程unpark
doSignal(first);
}
Object.wait()
/Object.notify()
与Condition.await()
/Condition.signal()
区别前者与
Monitor
配合完成线程间的等待/通知机制,是java底层级别的;而后者与Lock
配合完成等待/通知机制,是语言级别的,具有更高的可控性和扩展性
后者支持不响应中断,而前者不支持
后者支持多个等待队列(借助多个
ConditionObject
对象实现),而前者只支持一个后者支持超时时间的设置,而前者不支持
# Lock
锁(java.util.concurrent.locks.*
)
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
Condition newCondition();
- 无参构造器默认构造非公平锁
- 支持在多个条件变量上进行等待,在唤醒时可按照不同的条件变量进行精确唤醒
# AbstractQueuedSynchronizer
(AQS)
Lock等同步组件主要专注于实现同步语义,而AQS负责同步状态管理,线程排队、等待与唤醒等底层操作
# 核心组件
State
同步状态通过
State
体现加锁状态与重入次数子类通过
getState()
/setState()
/compareAndSetState()
重写AQS用来改变同步状态的若干protected方法对同步状态的获取/释放可分为独占模式与共享模式
#
AbstractQueuedSynchronizer$Node
双向阻塞队列阻塞队列/同步队列(双向链表,FIFO队列)
volatile int waitStatus; // 节点状态 volatile Node prev; // 当前节点/线程的前驱节点 volatile Node next; // 当前节点/线程的后继节点 volatile Thread thread; // 加入同步队列的线程引用 transient volatile Node head; transient volatile Node tail;
等待队列/条件队列(单向链表):
ConditionObject
transient volatile Node firstWaiter; // 等待队列中的头节点 transient volatile Node lastWaiter; // 等待队列中的尾节点 Node nextWaiter; // 等待队列中的下一个节点
*Lock
->Lock
->AbstractQueuedSynchronizer
->Sync
# 模板方法设计模式
tryAcquire(int)
等方法被*Sync
子类重写(其中*Sync
子类为*Lock
子类的静态内部类),当*Sync
子类调用acquire(int)
等AQS模板方法时就会调用已被本类重写的方法;同步组件通过AQS提供的模板方法实现同步语义
tryAcquire(int)
:独占式获取与释放同步状态tryRelease(int)
tryAcquireShared(int)
:共享式获取与释放同步状态tryReleaseShared(int)
isHeldExclusively()
:查询同步队列中等待线程情况同步器是实现锁的关键:利用同步器可实现锁的语义。锁面向使用者,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队,等待和唤醒等底层操作。锁和同步器针对使用者和实现者实现了关注点分离。
# 独占锁/共享锁的获取与释放
# acquire(int)
public final void acquire(int arg) {
// 尝试获取State,获取失败时进入阻塞队列
if (!tryAcquire() &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
// ...
private Node addWaiter(Node mode) {
// 1. 基于当前线程构建Node节点
Node node = new Node(Thread.currentThread(), mode);
// 2. 查看当前尾节点是否为空
Node pred = tail;
if (pred != null) {
// 将当前节点以“CAS尾插”的方式插入阻塞队列中
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 当前阻塞队列尾节点为null,说明当前线程是第一个加入阻塞队列进行等待的线程:
// 1. 调用compareAndSetHead(new Node())方法,完成阻塞队列头结点的初始化
// 2. 自旋不断尝试CAS尾插入直至成功为止
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
// 若前驱节点为头节点,再次尝试获取State
if (p == head && tryAcquire(arg)) {
// 获取State成功,队列头指针指向当前节点并释放前驱节点
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 获取State失败,线程进入等待状态(通过CAS将节点状态由INITIAL设置成SIGNAL)
if (shouldParkAfterFailedAcquire(p, node))
// parkAndCheckInterrupt()借助LockSupport.park()真正实现线程阻塞
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
# release(int)
public final boolean release(int arg) {
// 尝试释放State
if (tryRelease(arg)) {
Node h = head;
// 释放State成功,唤醒队列头节点后继节点所引用线程(FIFO队列),其中借助LockSupport.unpark()真正实现线程唤醒
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
acquireInterruptibly(int)
tryAcquireNanos(int)
# acquireShared(int)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
# releaseShared(int)
# ReentrantLock
及其实现原理
# 锁的竞争
# 非公平锁(默认)
线程第一次获取锁时不对Node
阻塞队列进行检查,直接通过CAS尝试争抢State
// Class NonFairSync
final void lock() {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
} else {
acquire(1);
}
}
若前驱节点为头节点,
tryAcquire()
再次尝试获取锁获取锁成功
将前驱节点置为空,当前节点置为头节点;
waitStatus
设置为1,Sync对象的exclusiveOwnerThread
设置为当前节点所关联线程
获取锁失败
将前驱节点的
waitStatus
设置为-1(前驱节点有责任对本节点进行唤醒操作);tryAcquire()
再次尝试获取锁,随后调用LockSupport.park()
进行阻塞
释放锁:
waitStatus
设置为0,(若waitStatus
为-1)对阻塞队列中下一节点进行唤醒操作
# 公平锁
线程第一次获取锁时检查Node
阻塞队列中是否有前驱节点(检查虚拟头节点的next节点所关联线程是否为本线程),若没有则通过CAS尝试争抢State
# 线程的打断
acquire()
线程被打断时,仍停留在Node
阻塞队列中,获取锁时能主动执行selfInterrupt()
自我中断
acquireInterruptibly()
线程被打断时,仍停留在Node
阻塞队列中,获取锁时将直接抛出InterruptedException
# 线程的等待与唤醒
持锁线程调用await()
进行等待时:
addConditionWaiter
进入等待队列尾部,节点waitStatus
改为-2,进行park阻塞fullyRelease()
完全释放AQS上的可重入锁- 对阻塞队列下一个节点进行unpark
持锁线程调用signal()
进行唤醒时:
doSignal(first)
对等待队列队首线程进行唤醒transferForSignal()
将唤醒线程加入阻塞队列尾部,节点waitStatus
改为0,而其前驱节点的waitStatus
设置为-1若操作失败(如该线程状态已被设置为取消),则尝试唤醒等待队列下一节点所关联线程
对所唤醒线程进行unpark
# ReentrantReadWriteLock
及其实现原理
写锁支持条件变量,而读锁不支持
升级与降级
- 读锁在重入时不可升级为写锁,此时会发生死锁
- 写锁在重入时可降级为读锁
class CachedData { Object data; volatile boolean isCacheValid; final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); void processCachedData() { lock.readLock().lock(); try (!isCacheValid) { // 本地缓存失效,改为获取写锁更新缓存 // 获取写锁前需释放读锁 lock.readLock().unlock(); lock.writeLock().lock(); try (!isCacheValid) { // 双重检查 // set data isCacheValid = true; } // 将写锁降级为读锁 lock.readLock().lock(); } finally { lock.writeLock().unlock(); } // 真正使用数据 try { // use data } finally { lock.readLock().unlock(); } } }
# 实现原理
# LockSupport.park()
/LockSupport.unpark()
AQS底层通过
park()
/unpark()
实现线程的状态切换
# park()
- 检查
Parker
对象的_counter
字段 - 若
_counter
字段值为0,获取mutex,线程进入_condition
条件变量进行阻塞
# unpark()
- 将
Parker
对象的_counter
字段置为1 - 尝试唤醒
_condition
条件变量中的指定线程
线程开始运行时,设置
_counter
字段为0
# 共享内存模型:无锁(乐观锁,非阻塞式)
# CAS
CAS操作保证原子性,CAS实现类中借助volatile
获取最新值进行比较,以实现线程安全
CAS操作适合短时间内即可获取到锁的场景,若线程需长时间等待锁释放,优先使用悲观锁
# 原子变量(java.util.concurrent.atomic.*
)
# AtomicInteger
/AtomicLong
/AtomicBoolean
# AtomicReference
AtomicReference<BigDecimal> balance;
// constructor
BigDecimal getBalance() {
return balance.get();
}
void withdrawCash(BigDecimal amount) {
while(true) { // 使用乐观锁实现多个组合操作的原子性(BigDecimal单个方法为线程安全)
BigDecimal prev = balance.get();
BigDecimal next = prev.subtract(amount);
if (balance.compareAndSet(prev, next)) {
break;
}
}
}
ABA问题
AtomicStampedReference
AtomicMarkableReference
# AtomicIntegerArray
AtomicIntegerArray states = new AtomicIntegerArray(size);
// ...
while (true) {
for (int i = 0; i < size; i++) {
if (states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
// 获取数组中某元素锁,进行操作(如用于自定义连接池返回可用连接)
}
}
}
// 获取数组中某元素锁,等待,避免循环空转消耗性能(需长时间等待锁释放,使用悲观锁)
synchronized(this) {
this.wait();
// ...
}
}
# AtomicFieldUpdator
# LongAdder
LongAdder
源码分析
- CAS锁
# Unsafe
# 不可变类
DateTimeFormatter
String
# 不可变类对象的设计
String
类的保护性(防御性)拷贝:在有可能对本对象发生修改时,创建新对象返回,因此是线程安全的
String#substring()
未解决对象实例创建过于频繁的问题,引入享元模式
基本类型包装类
在包装类对象创建之前,内部Cache对象已经将需缓存的对象全部创建完成
// Use the archived cache if it exists and is large enough if (archivedCache == null || size > archivedCache.length) { Integer[] c = new Integer[size]; int j = low; for(int i = 0; i < c.length; i++) { c[i] = new Integer(j++); } archivedCache = c; } cache = archivedCache; // range [-128, 127] must be interned (JLS7 5.1.7) assert IntegerCache.high >= 127;
Byte
、Short
、Long
:-128~127Integer
:-128~127(最大值可通过-Djava.lang.Integer.IntegerCache.high
调整)Character
:0~127Boolean
:true、false字符串常量池
BigDecimal
/BigInteger
# JMM
Java内存模型(Java Memory Model,JMM)定义在Java虚拟机规范中,用于屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的并发效果。JMM规范了Java虚拟机与计算机内存是如何协同工作的:规定了一个线程如何和何时可以看到由其他线程修改过后的共享变量的值,以及在必须时如何同步访问共享变量。
JVM内存分区与硬件内存架构之间存在差异。硬件内存架构没有区分线程栈和堆:线程栈和堆可能都分布于主内存中,部分线程栈和堆还可能出现在CPU寄存器与高速缓存中:
因此,JMM还定义了JVM内存分区与硬件内存架构之间的抽象关系:
线程
|
工作内存(对应CPU寄存器与高速缓存)
|
主内存(对应主存)
针对主存、高速缓存及CPU寄存器之间的具体交互细节,常用的缓存一致性协议:MSI、MESI(IllinoisProtocol)、MOSI、Synapse、Firefly及DragonProtocol
针对主内存与工作内存之间的具体交互细节,即一个变量如何从主内存拷贝到工作内存,如何从工作内存同步到主内存,JMM定义了八种操作:
(
lock
-unlock
)->read
->load
->use
->assign
->store
->write
# 可见性(读同步问题)
# volatile
保证代码块内共享变量的可见性(每次只从主存,而非工作内存中读取),但不能保证代码块执行的原子性,故多用于“一写多读”的场景
LOAD
前缀指令实现读写屏障- MESI缓存一致性与总线嗅探
synchronized
可保证代码块的执行原子性及变量可见性(强制刷新工作内存),但一般为重量级操作,性能较低System.out.print()
语句内部有加锁操作,在IO操作后强制刷新工作内存,因此若代码块内有输出操作可不将共享变量声明为volatile
二阶段终止模式实现监控流程
// 非监控线程调用stopMonitor()方法,两共享变量必须在线程间保证可见性 private volatile boolean isStarted; private volatile boolean isStopped; private Thread monitor; public void startMonitor() { if (!isStarted) { synchronized(this) { // 只需将读写共享变量部分纳入同步代码块(缩小同步范围,提升并发性能) if (isStarted) { // balking(犹豫)模式 return; } isStarted = true; } } /* 以下代码不会被多个线程并发执行,故无需加锁,因此所用到的共享变量需声明为volatile */ monitor = new Thread(() -> { while (!isStopped) { try { Thread.sleep(2000); /* 执行监控业务 */ } catch(InterruptedException e) { // 借助isStopped共享变量实现循环优雅退出,不再依赖打断标记 } } /* 释放资源 */ isStarted = false; }); monitor.start(); } public void stopMonitor() { isStopped = true; monitor.interrupt(); // 显式打断,以更快结束可能存在的sleep() }
# ThreadLocal
提供线程内部的局部变量,不同的线程间访问自身变量不会互相干扰(“空间(变量副本)换时间(并行执行)”);作用于线程的整个生命周期,降低同一线程内多方法/组件间公共变量传递的复杂度
- 数据传递:独立保存各个线程绑定的数据,可在同一线程内多方法/组件间安全传递,避免额外传递参数所造成的代码耦合
- 线程隔离:各个线程绑定的数据相互隔离,但又支持并发存取,避免引入同步代码块所造成的性能损失
#
ThreadLocal
使用场景
多线程场景下,跨service-dao层传递原生
JDBC Connection
,使用ThreadLocal
保证同一线程内两层间传递同一JDBC Connection
(数据库连接池实现思路)static ThreadLocal<Connection> threadLocal = new ThreadLocal<>(); private static final DataSource ds = new SomeDataSource(); public static Connection getConnection() throws Exception { Connection conn = threadLocal.get(); if (conn == null) { conn = ds.newConnection(); threadLocal.set(conn); } return conn; }
# ThreadLocalMap
k:ThreadLocal
对象 - v:该对象对应的变量副本
# 以上kv键值对设计的优势
- 以
ThreadLocal
对象而非Thread
对象本身作为key,每个Map存储的Entry数量降低 ThreadLocalMap
对象由Thread
对象本身进行维护,当Thread
销毁时,ThreadLocalMap
对象随之销毁
classDiagram
class ThreadLocalMap
ThreadLocalMap : +set(ThreadLocal<?> key, T value)
ThreadLocalMap : +remove(ThreadLocal<?> key)
ThreadLocalMap : +getEntry(ThreadLocal<?> key)
ThreadLocalMap : +int INITIAL_CAPACITY
ThreadLocalMap : +int size
ThreadLocalMap : +int threshold
ThreadLocalMap : +Entry[] table
# ThreadLocal
常用方法
# ThreadLocal.set(T value)
- 获取当前线程对象t与线程对象内Map(t.threadLocals,即
ThreadLocalMap
对象) - 若所获取的Map不为空,调用
ThreadLocalMap.set(ThreadLocal<?> key, T value)
为以当前ThreadLocal
对象作为key的ThreadLocalMap.Entry
赋值;否则调用createMap(Thread t, T firstValue)
创建线程内Map并赋予初始值
# ThreadLocal.get()
- 获取当前线程对象t与线程对象内Map
- 若所获取的Map不为空,调用
ThreadLocalMap.getEntry(ThreadLocal<?> key)
获取以当前ThreadLocal
对象作为key的ThreadLocalMap.Entry
,进而获取ThreadLocalMap.Entry.value
返回 - 若获取的Map或Entry为空,调用
ThreadLocal.setInitialValue()
进行Map赋值或Map初始化(同ThreadLocal.set(T value)
流程第二步)后返回
# ThreadLocal.remove()
- 获取当前线程对象t与线程对象内Map
- 若所获取的Map不为空,调用
ThreadLocalMap.remove(ThreadLocal<?> key)
移除以当前ThreadLocal
对象作为key的ThreadLocalMap.Entry
引用链:Thread -> ThreadLocalMap -> ThreadLocalMap.Entry => ThreadLocal
ThreadLocalMap.Entry
继承WeakReference<ThreadLocal<?>>
,以便实现**ThreadLocal
生命周期与线程生命周期的解绑**#
ThreadLocal
可能产生的内存泄漏问题(某线程运行时)若不及时对该线程中不再使用的
ThreadLocal
变量threadLocalA
进行显式remove()
操作,虽然该线程对应ThreadLocalMap
的key值(即threadLocalA
变量本身)将在本次GC时被置为空(弱引用),但ThreadLocalMap.Entry
及ThreadLocalMap.Entry.value
均无法立即得到释放(强引用),且后者再无主动访问机会(key为空),造成内存泄漏(即使未进行显式
remove()
操作)下一次执行ThreadLocalMap.getEntry(..)
/set(..)
方法时将对空key进行检查与清理(expungeStaleEntry(int staleSlot)
),故在一定情况下,所发生的内存泄漏将被自动解决(弱引用的好处)因此,需在
ThreadLocal
变量使用结束后显式调用ThreadLocal.remove()
对应Entry进行手动删除,以便实现**ThreadLocalMap
生命周期与线程生命周期的解绑**,彻底杜绝内存泄漏(显式remove()
操作的好处)
# Hash冲突
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY]; // INITIAL_CAPACITY = 16,可看作环形数组
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); // hash函数(开放定址法❓):借助AtomicInteger实现,每次计算hash时自增(getAndAdd)0x61c88647,以维持均匀哈希
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY); // 扩容threshold设置为INITIAL_CAPACITY的2/3
}
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
// 相比于getEntry(..)方法直接根据key值定位Entry后采取快速失败策略,set(..)方法采用开放定址·线性探测(环形)法解决hash冲突,直至触发Entry清理与扩容
}
# 有序性
# 使用volatile
修饰单例变量以防止指令重排序
synchronized
不能阻止指令重排序,但只要所使用的局部变量不逃离同步代码块的作用范围,则虽有重排序却不会造成有序性问题
# as-if-serial语义
不论如何重排序,程序的执行结果不能与单线程执行结果发生冲突。(编译器、runtime和处理器都必须遵守as-if-serial语义)
# happens-before规则
在JMM中,如果一个操作执行的结果需要对另一个操作可见(两个操作既可以是在一个线程之内,也可以是在不同线程之间),那么两操作之间必须满足happens-before规则:
- 程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作
- 监视器锁定规则:对一个锁的解锁操作,happens-before于任意后续对这个锁的加锁操作
- volatile域规则:对一个volatile域的写操作,happens-before于任意后续对这个volatile域的读操作
- 传递性规则:如果A happens-before B,且B happens-before C,那么A happens-before C
- 线程启动、中断、终结规则
- 对象终结规则
以下场景,对共享变量的写操作对其读操作(本线程或其他线程)可见:
- 使用
synchronized
分别对写操作和读操作加锁 - 对
volatile
共享变量分别进行写操作和读操作 - 对共享变量的写操作发生在读线程启动之前
- 读操作阻塞直至写操作线程执行结束(
Thread.join()
)或被打断
# 单例模式最佳实践
# balking(犹豫)模式(配合同步代码块)实现懒汉式单例
public final Class Singleton { private Singleton() {} private static Singleton INSTANCE = null; public static synchronized Singleton getInstance() { if (INSTANCE != null) { return INSTANCE: } INSTANCE = new Singleton(); return INSTANCE; } }
# 锁粒度优化
缩小同步代码块范围,使用
volatile
修饰单例对象避免同步代码块重排序,同时使用double-checked locking(DCL,双重检查、双检锁)实现懒汉式单例private static volatile Singleton INSTANCE = null; public static Singleton getInstance() { // 同步代码之外的共享变量可能存在有序性问题 if (INSTANCE == null) { synchronized(Singleton.class) { if (INSTANCE == null) { INSTANCE = new Singleton(); return INSTANCE; } } } return INSTANCE; }
其他单例模式
# 通过反序列化过程实现的饿汉式单例
- 返回单例使用
final
防止子类继承破坏单例- 实现
Serializable
接口,可重写Object readResolve()
方法防止反序列化过程返回多例public final class Singleton implements Serializable { private Singleton() {} private static final Singleton INSTANCE = new Singleton(); public static Singleton getInstance() { return INSTANCE; } public Object readResolve() { return instance; } }
# 通过枚举实现的饿汉式单例
在类加载阶段保证线程安全,能保证在反序列化仍然返回单例,或在反射过程中因无法获取
<init>
而无法破坏单例enum Singleton { INSTANCE; }
# 借助静态内部类实现的懒汉式单例
静态内部类在使用时才进行类加载,为懒加载过程
public final Class Singleton { private Singleton() {} private static Class InstanceHolder { static final Singleton INSTANCE = new Singleton(); // 懒汉式,非饿汉式 } public Singleton getInstance() { return InstanceHolder.INSTANCE; // 调用时执行静态内部类的类加载,类加载阶段JVM保证对<clinit>过程加锁,且将加载的静态成员进行缓存(保证<clinit>方法只执行一次),从而实现线程安全的单例 } }
# 内存屏障与final
变量
- LoadLoad
- StoreStore
- LoadStore
- StoreLoad:在Load2及其后续的读取操作被执行前,保证Store1的写入对所有处理器可见(全能屏障,开销最大)
针对final
域的重排序规则:
final
域构造规则:在构造函数内对一个final
域进行构造,随后一个变量对其进行引用,这两个操作之间不能重排序在
final
域的写操作之后,构造函数返回之前,编译器插入一个storestore屏障,这个屏障禁止处理器把final
域的写重排序到构造函数之外初次读一个包含
final
域的对象的引用,与随后初次读这个final
域,这两个操作之间不能重排序(❓)
# 原子性(写同步问题)
- 普通变量:基本数据类型变量、引用类型变量、声明为volatile的任何类型变量的访问读写都具备原子性
- 代码块:使用
synchronized
关键字实现同步代码块
# 线程池
# Executors
实际生产环境中,使用ThreadPoolExecutor(int corePoolSize, // 1 int maximumPoolSize, // 2 long keepAliveTime, // 3 TimeUnit unit, // 4 BlockingQueue<Runnable> workQueue, // 5 ThreadFactory threadFactory, // 6 RejectedExecutionHandler handler // 7)
手动创建线程池
- 核心线程:开辟新线程执行任务
- 救急线程(非核心线程):任务数大于
corePoolSize
时,新任务加入任务队列进行阻塞。若任务队列已满,则临时开辟救急线程执行任务,存活时间为keepAliveTime
(达到存活时间时,被回收的可能是任一线程,不一定是该线程);若总线程数超过maximumPoolSize
,执行拒绝策略
Executors.newFixedThreadPool()
使用
LinkedBlockingQueue
(无界),适用于任务数量固定,执行时间较长的场景请求队列的长度为
Integer.MAX_VALUE
,可能导致大量请求堆积引发OOMExecutors.newCachedThreadPool()
使用
SynchronousQueue
,适用于任务数量不定,执行时间较短的场景允许创建的线程数量为
Integer.MAX_VALUE
,可能导致大量线程堆积引发OOMExecutors.newSingleThreadExecutor()
使用
FinalizableDelegatedExecutorService
对返回的ThreadPoolExecutor
对象进行包装,保证外界不能修改ThreadPoolExecutor
对象的线程数Executors.newScheduledThreadExecutor()
原生定时任务
# ThreadPoolExecutor
生命周期方法
execute()
/submit()
使用
submit()
方法接收Future
对象,可通过返回值(future.get()
)对任务中的异常进行捕获invokeAll()
/invokeAny()
shutdown
/shutdownNow()
prestartAllCoreThreads()
提前创建等于核心线程数的线程数量(线程池预热)
工作线程模式(异步模式、分工模式、享元模式)
饥饿现象
使用固定大小线程池,当线程数不足时导致的阻塞现象
解决方案:不同任务类型交给不同线程池进行处理,避免不同任务对线程的循环依赖
# 线程池线程数量选取
线程数量过多,可能导致系统资源竞争激烈、上下文切换频繁
线程数量过少,可能导致系统无法充分利用计算机资源
CPU密集型
$\large{CPU核心数+1}$
IO密集型
$\Large{\frac{CPU核心数\times{CPU期望利用率}\times{总时间\normalsize{(即CPU耗时+IO耗时)}}}{CPU耗时}}$
# 线程池持久化
将队列中的任务信息落库,并维护任务状态
# ForkJoinPool
适用于任务可被拆分的场景
# 并发集合类
# HashTable
/Vector
put()
/get()
方法均为synchronized
同步方法,并发度低
# Collections.Synchronized*
put()
/get()
方法内均对同一mutex
对象加入synchronized
同步锁,并发度低
# Blocking*
# LinkedBlockingQueue
# ArrayBlockingQueue
定长环形数组
# CopyOnWrite*
# CopyOnWriteArrayList
无锁读,有锁写
# Concurrent*
# ConcurrentHashMap
# 7︎⃣ “分段锁”
Segment
数组每一元素(Segment
分段,继承自ReentrantLock
)对应一把锁,每一分段可视作小型HashMap
,内部维护多个HashEntry
数组
# put()
通过哈希函数计算出当前key的hash值
通过hash值计算出所对应的
Segment
数组下标//计算 Segment 下标 (hash >>> segmentShift) & segmentMask
再通过hash值计算出所对应的
Segment
中HashEntry
数组下标//计算 HashEntry 数组下标 (tab.length - 1) & hash
尽量避免当前hash值所计算出来的
Segment
数组下标与HashEntry
数组下标趋于相同,导致分配到同一个Segment
中的元素都被分配到同一条链表上,导致链表过长,并降低并发度插入元素
ensureSegment()
scanAndLockForPut()
rehash()
# get()
# size()
基于“统计元素个数时Segment
结构未发生变化”的假设进行三次尝试,若统计失败,对所有Segment
进行强制加锁
# 8︎⃣ synchronized
+CAS
弃用Segment
分段锁,每个链表头节点(Bucket)对应一把(可优化的)同步锁,进一步提高并发度
- 使用CAS优化加锁操作,提高吞吐量
- 弱一致性(fail-safe)
# 构造方法
在构造器中进行懒惰初始化,只计算数组容量,在第一次使用时才真正创建table数组
initialCapacity
concurrencyLevel
:initialCapacity应至少与concurrencyLevel相等loadFactor
:取第一个大于initialCapacity/loadFactor值的2幂值作为数组的初始容量,最终存入SIZECTL
(默认值为16)
# put()
put(K key, V value)
内部调用putVal(key, value, false)
,每次设置值时用新值替换旧值
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 若目标键值为空抛出NPE
if (key == null || value == null) throw new NullPointerException();
// hash函数:(h ^ (h >>> 16)) & HASH_BITS; HASH_BITS = 0x7fffffff;
// 与1.8HashMap中hash函数基本一致
int hash = spread(key.hashCode());
int binCount = 0;
for(Node<K,V>[] tab = table;;) {
// Table数组为空,进行初始化
if (tab == null || tab.length == 0) {
// 使用CAS(尝试将SIZECTL设置为-1)创建Table数组,并将SIZECTL设置为下一次扩容的阈值大小
initTable();
}
// Table数组不为空,但key所在位置链表头节点(桶)为空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null/*key位置无头节点*/) {
// 使用CAS创建链表头节点(no lock when adding to empty bin)
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
// Table数组及key所在位置链表头节点(桶)均不为空
else if ((fh = f.hash) == MOVED/*key位置有头节点,但hash值为-1(正在扩容)*/) {
tab = helpTransfer(tab, f); // 调用多个工作线程协同进行元素迁移
} else {
synchronized (f) {
if (tabAt(tab, i) == f) {
// 如果hash值大于等于0,说明是正常的链表结构
if (fh >= 0) {
binCount = 1;
// 从头结点开始遍历,每遍历一次,binCount计数加1
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 成功根据hash/key/value值找到key,进行值(新值替换旧值)
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 若遍历到了尾结点(未找到key),则把新节点尾插进去
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 判断是否为树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 操作红黑树
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 若binCount达到treeify阈值(8),尝试将链表转为红黑树(检查table容量是否超过64)
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD/*8*/)
treeifyBin(tab, i);
//把旧节点值返回
if (oldVal != null)
return oldVal;
break;
}
}
}
// 通过CAS进行哈希表元素个数计数,当binCount超过SIZECTL时进行扩容,在真正进行节点迁移时加同步锁
addCount(1L, binCount);
return null;
}
在链表/红黑树更新/插入值、链表转化为红黑树、及扩容迁移节点时加同步锁,减小锁粒度
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 待补充
}
/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据 length 得到一个标识符号
int rs = resizeStamp(tab.length);
// 如果 tab 与 nextTab 都没有被并发修改
// 且 sizeCtl < 0 (扩容中)
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 如果 sizeCtl >>> 16 不等于 rs ( sc 前 16 位如果不等于标识符,说明标识符变化了)
// 或者 sizeCtl == rs + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc == rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。此时 sc == rs + 1)
// 或者 sizeCtl == rs + 65535 (如果达到最大帮助线程的数量,即 65535)
// 或者转移下标正在调整 (扩容结束)
// 结束循环,返回 table
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 如果以上都不是, 将 sizeCtl 加一 (表示增加了一个线程帮助其扩容)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 进行转移
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
# get()
无加锁操作,性能较高,但不能保证读取到的是最新数据(弱一致性)
public V get(Object key) {
// ...
// 1. h确保为正整数
int h = spread(key.hashCode());
// ...
// e = tabAt(tab, (n - 1) & h)寻址
// 若hash为负值表明数组正在扩容(-1,forwardingNode)或节点为红黑树(-2,Treebin),调用e.find(h, key)方法在newTable或红黑树中继续寻找
// 若hash为正值,根据hash/key/value值遍历链表节点寻找目标数据
}
# ConcurrentLinkedQueue
# 不同场景下的最优并发集合
# JUC并发工具
底层基于AQS实现
# StampedLock
# Semaphore
# CountdownLatch
# CyclicBarrier
# CompletableFuture
# 非共享模型
JVM →