Java并发

# 基础API

# Thread.sleep()
while(true) {
  try {
    Thread.sleep(50);
  } catch (InterruptedException e) {}
}

此处可避免while() {}循环空转消耗cpu性能,使用sleep()yield()适当让出cpu使用权

适用于无须锁同步的场景

# <thread1>.join()

调用线程进行阻塞,同步等待thread1执行结束

Thread t1; // 休眠1s
Thread t2; // 休眠2s

t1.join();
t2.join(); // 只需额外等待1s(两线程与主线程同时开始执行,主线程分别只对两线程的结束时机进行同步等待)
# <thread1>.interrupt()
  1. 正常运行的线程被打断后,打断标记isInterrupted()true(意味着可通过该标记进行后续处理)
  2. 通过调用sleep()wait()join()等方法处于阻塞状态的线程被打断后,打断标记将被自动清空,置为false(意味着线程已进入异常,该标记已失去作用,可在catch块中重新设置标记为true
  3. 通过调用park()方法处于阻塞状态的线程被打断后,打断标记为true;若不手动重置该标记为false(静态方法Thread.interrupt()),则后续的park()操作均会失效

二阶段终止模式实现监控流程

graph TD
w("while(true)") --> a
a("isInterrupted()") -- true --> b("释放资源")
b --> c("结束循环")
a -- false --> d("sleep()")
d -- 无异常 --> e("执行监控业务")
d -- 出现异常 --> i("设置打断标记")
i --> w
e --> w
class TwoPhaseTermination {
  	private Thread monitor;
  
  	public void startMonitor() {
   		monitor = new Thread(() -> {
     		while (true) {
        		Thread currentThread = Thread.currentThread();
        		if (currentThread.isInterrupted()) {
          		/*
          		释放资源
          		*/
          		break;
        		}
        
        		try {
          		Thread.sleep(2000);
          		/*
          		执行监控业务
          		*/
        		} catch(InterruptedException e) {
          		currentThread.interrupt(); // 阻塞过程中被打断,需手动再次设置打断标记(重新打断)
        		}
      		}
    		});
    		monitor.start();
  	}
  
  	public void stopMonitor() {
    		monitor.interrupt();
  	}
}

# 共享内存模型:管程(悲观锁,阻塞式)

避免临界区竞态条件发生的多种方式

# synchronized

  • 通过对象锁保证临界区内代码的原子性
  • synchronized锁为可重入锁
  • 对象模型:
    • 重入计数器❓
    • Contention List
    • Entry List
    • Wait Set
    • On Deck

# 局部变量和共享变量的线程安全问题

  • 不同线程对共享变量进行并发增删引发线程安全问题;而对于局部变量,不同线程分别拥有一份自己的局部变量实例,此时不引发线程安全问题
  • 若在父类public方法/非final方法/abstract方法传入或返回对象局部变量引用了对象,并逃离该栈帧,即发生引用泄漏),则子类可对父类方法进行重写(外星方法),以实现对该局部变量的并发增删(方法内部开启线程对变量进行操作),此时局部变量成为共享变量,仍将引发线程安全问题
  • Java Web环境
public class UserServiceImpl implements UserService {
  // 无状态,线程安全
  private UserDao userDao = new UserDao();
  
  // 非线程安全
  private Integer count = 0;
  
  public void update() {
    // ...
    count++;
  }
}
@Aspect
@Component
public class CustomAspect { // 单例
  // 非线程安全
  private Long start = 0L;
  
  @Before("execution(* *(..))")
  public void before() {
    start = System.nanoTime();
  }
  
  @After("execution(* *(..))")
  public void after() {
    Long end = System.nanoTime();
    // ...
  }
}

解决方式:采用环绕通知,将计时器用作局部变量

# synchronized锁的优化

# 轻量级锁

多线程访问没有竞争现象,优先使用轻量级锁

  1. 创建锁记录(Lock Record)对象

  2. 锁记录对象中Object Reference指向加锁对象,并尝试通过CAS将锁记录对象中地址字段ptr_to_lock_record:30 | 00)与加锁对象的Mark Word(非加锁状态下末两位为01)替换

    (若CAS失败,进行锁的膨胀;或进行轻量级锁重入,额外维护一个新的锁记录对象信息)

  3. (解锁)通过CAS进行解锁,还原加锁对象的Mark Word(若CAS失败,执行重量级锁的解锁流程)

# 偏向锁(轻量级锁的优化)

在加锁对象的Mark Word字段中加入当前线程的ID信息,当同一线程再次对该对象进行加锁时避免反复的CAS操作;当其他线程对该对象进行加锁时撤销该对象的偏向状态

  • 批量重偏向:当反复出现新线程对该对象进行加锁的情况时,不再直接撤销该对象的偏向状态,而是修改该类所有新实例对象的偏向状态为偏向新线程(偏向撤销阈值:20)
  • 批量偏向撤销:当再次反复出现另一新线程对该对象进行加锁的情况时,不再修改该对象的偏向状态,而是直接禁用该类所有新实例对象的偏向资格(偏向撤销阈值:40)

因对象的Mark Word字段在不可偏向时才包含hashCode信息(在可偏向时替换为偏向线程ID与epoch信息),故显式调用加锁对象的hashCode()方法将撤销该对象的偏向状态(1 | 01 -> 0 | 01,倒数第三位为偏向状态,1为可偏向,0为不可偏向)

# 重量级锁

  1. 为加锁对象申请Monitor锁,将加锁对象的轻量级锁记录地址字段再次替换为Monitor锁地址(ptr_to_monitor:30 | 10
  2. 当前线程进入Monitor锁的Entry List,进入**BLOCKED**状态
  3. (解锁)通过加锁对象的Monitor锁地址信息定位Monitor锁,设置Monitor锁的Owner对象为空,并唤醒Monitor锁Entry List中阻塞的线程(非公平锁)

自旋优化

# 锁消除

当加锁对象未逃逸局部代码块的作用范围(不可能成为共享变量)时,JIT将锁消除

# 线程等待与通知

# (Object)wait()/notify()

# 底层原理

在锁对象上调用wait()前必须确保获取对象的锁(在同步代码块中进行),并发生锁的膨胀(即只使用重量级锁),当前线程进入Monitor锁的Wait Set进行等待(park())并释放锁;在锁对象上调用notify()前也必须确保获取对象的锁(在同步代码块中进行),在退出同步块时对应线程被真正唤醒(unpark())后回到Entry List中进行阻塞,直至再次竞争锁成功,继续执行wait()后代码

  • notifyAll()按照LIFO顺序唤醒Entry List中阻塞的线程
// 线程A,消费者
synchronized(lock) {
  while (condition) { // 使用while循环而非if,从而在线程被唤醒后保证等待条件不满足时才继续执行,以避免虚假唤醒
  	lock.wait(timeout);
	}
  // 执行正常流程
}

// 线程B,生产者
synchronized(lock) {
  /*
  lock.notify()将随机唤醒Entry List中的线程,故可能造成虚假唤醒
  */
  lock.notifyAll();
}
# 同步模式之保护性暂停(Guarded Suspension)模式

当线程t1等待线程t1的执行结果,可分别关联同一个GuardedObject

Object lock;
Object response;

public Object get(long timeout) {
	synchronized(lock) {
		// 记录初始时间
		while (response == null) {
			// 判断消耗时间passedTime是否超过timeout,若是则退出循环
			/*
			当被虚假唤醒后重新进入循环时,应该继续等待的时间为timeout - passedTime
			*/
			this.wait(timeout - passedTime);
		}
	}
	return response;
}

public void produce(Object rawObject) {
	synchronized(lock) {
		response = rawObject;
		this.notifyAll();
	}
}
  • 一个生产者通过GuardedObject对象严格对应一个消费者
  • 相比于join()(原理一致),上述模式中消费者无需等待生产者线程完全结束就能消费资源
# 异步模式之异步队列模式(生产者/消费者)
LinkedList list;
int capacity;

public Object get() {
	synchronized(list) {
 		while (list.isEmpty()) {
      list.wait();
 		}

    Object o = list.removeFirst();
    list.notifyAll();
    return o;
	}
	return null;
}

public void put(Object rawObject) {
	synchronized(list) {
 		while (list.size() == capacity) {
      list.wait();
 		}
    
    list.addLast();
    list.notifyAll();
	}
}

# (ConditionObject)await/signal

img

// 借助await/signal机制实现生产者/消费者模型

ReentrantLock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();

// 线程A,消费者
new Thread(() -> {
	lock.lock();
	try {
  	while (/*执行条件A不满足*/) {
			condition1.await();
		}
  	// 执行业务操作
	} finally {
  	lock.unlock();
	}
}).start();

// 线程B,生产者
new Thread(() -> {
	lock.lock();
	try {
  	// 更改执行条件A
  	condition1.signal();
	} finally {
  	lock.unlock();
	}
}).start();

# 底层原理

ConditionObject条件对象上(对当前线程)调用await(),当前线程若竞争lock成功,则尾插入进AQS等待队列中进行等待(park())并释放lock;在ConditionObject条件对象上调用signal()将该线程转移至AQS阻塞队列中进行阻塞,直至再次竞争lock成功后被唤醒(unpark()),继续执行await()后代码

  • signalAll()按照FIFO顺序唤醒AQS阻塞队列中阻塞的线程
# await()
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1. 将当前线程包装成Node,尾插入到AQS等待队列中
    Node node = addConditionWaiter();
    // 2. 释放当前线程所占用的lock,在释放的程中会唤醒AQS阻塞队列中的下一个节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 3. 当前线程进入到等待状态
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4. 自旋等待获取同步状态/lock(成功获取lock是退出await()的充要条件)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 5. 处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
# signal()

将AQS等待队列中等待时间最长的节点移动到AQS阻塞队列中

public final void signal() {
    //1. 先检测当前线程是否已经获取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 获取AQS等待队列中头节点,将其移动至AQS阻塞队列中等待再次获取lock从而被唤醒
    Node first = firstWaiter;
    if (first != null)
        // doSignal(Node)中的transferForSignal(Node)方法实现对头节点的状态检查、队列转移与(后续的)线程unpark
        doSignal(first);
}

Object.wait()/Object.notify()Condition.await()/Condition.signal()区别

前者与Monitor配合完成线程间的等待/通知机制,是java底层级别的;而后者与Lock配合完成等待/通知机制,是语言级别的,具有更高的可控性和扩展性

  • 后者支持不响应中断,而前者不支持

  • 后者支持多个等待队列(借助多个ConditionObject对象实现),而前者只支持一个

  • 后者支持超时时间的设置,而前者不支持

# Lock锁(java.util.concurrent.locks.*

void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
Condition newCondition();
  • 无参构造器默认构造非公平锁
  • 支持在多个条件变量上进行等待,在唤醒时可按照不同的条件变量进行精确唤醒

# AbstractQueuedSynchronizer(AQS)

Lock等同步组件主要专注于实现同步语义,而AQS负责同步状态管理,线程排队、等待与唤醒等底层操作

# 核心组件

  • State同步状态

    通过State体现加锁状态与重入次数

    子类通过getState()/setState()/compareAndSetState()重写AQS用来改变同步状态的若干protected方法

    对同步状态的获取/释放可分为独占模式共享模式

  • # AbstractQueuedSynchronizer$Node双向阻塞队列

    阻塞队列/同步队列(双向链表,FIFO队列)

    volatile int waitStatus; // 节点状态
    volatile Node prev; // 当前节点/线程的前驱节点
    volatile Node next; // 当前节点/线程的后继节点
    volatile Thread thread; // 加入同步队列的线程引用
    
    transient volatile Node head;
    transient volatile Node tail;
    
    • 等待队列/条件队列(单向链表):ConditionObject

      transient volatile Node firstWaiter; // 等待队列中的头节点
      transient volatile Node lastWaiter; // 等待队列中的尾节点
      Node nextWaiter; // 等待队列中的下一个节点
      

*Lock -> Lock -> AbstractQueuedSynchronizer -> Sync

# 模板方法设计模式

tryAcquire(int)等方法被*Sync子类重写(其中*Sync子类为*Lock子类的静态内部类),当*Sync子类调用acquire(int)等AQS模板方法时就会调用已被本类重写的方法;同步组件通过AQS提供的模板方法实现同步语义

  • tryAcquire(int):独占式获取与释放同步状态
  • tryRelease(int)
  • tryAcquireShared(int):共享式获取与释放同步状态
  • tryReleaseShared(int)
  • isHeldExclusively():查询同步队列中等待线程情况

同步器是实现锁的关键:利用同步器可实现锁的语义。锁面向使用者,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态的管理,线程的排队,等待和唤醒等底层操作。锁和同步器针对使用者和实现者实现了关注点分离。

# 独占锁/共享锁的获取与释放

# acquire(int)
public final void acquire(int arg) {
  // 尝试获取State,获取失败时进入阻塞队列
  if (!tryAcquire() &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
    selfInterrupt();
  }
}

// ...

private Node addWaiter(Node mode) {
    // 1. 基于当前线程构建Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // 2. 查看当前尾节点是否为空
    Node pred = tail;
    if (pred != null) {
        // 将当前节点以“CAS尾插”的方式插入阻塞队列中
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 当前阻塞队列尾节点为null,说明当前线程是第一个加入阻塞队列进行等待的线程:
    // 1. 调用compareAndSetHead(new Node())方法,完成阻塞队列头结点的初始化
		// 2. 自旋不断尝试CAS尾插入直至成功为止
    enq(node);
    return node;
}

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
          	// 若前驱节点为头节点,再次尝试获取State
            if (p == head && tryAcquire(arg)) {
              	// 获取State成功,队列头指针指向当前节点并释放前驱节点
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
          	// 获取State失败,线程进入等待状态(通过CAS将节点状态由INITIAL设置成SIGNAL)
            if (shouldParkAfterFailedAcquire(p, node))
                // parkAndCheckInterrupt()借助LockSupport.park()真正实现线程阻塞
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

img

# release(int)
public final boolean release(int arg) {
    // 尝试释放State
    if (tryRelease(arg)) {
        Node h = head;
        // 释放State成功,唤醒队列头节点后继节点所引用线程(FIFO队列),其中借助LockSupport.unpark()真正实现线程唤醒
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

acquireInterruptibly(int)

tryAcquireNanos(int)

# acquireShared(int)
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
# releaseShared(int)

# ReentrantLock及其实现原理

# 锁的竞争

# 非公平锁(默认)

线程第一次获取锁时不对Node阻塞队列进行检查,直接通过CAS尝试争抢State

// Class NonFairSync

final void lock() {
  if (compareAndSetState(0, 1)) {
    setExclusiveOwnerThread(Thread.currentThread());
  } else {
    acquire(1);
  }
}

  1. 若前驱节点为头节点,tryAcquire()再次尝试获取锁

    1. 获取锁成功

      1. 将前驱节点置为空,当前节点置为头节点;

        waitStatus设置为1,Sync对象的exclusiveOwnerThread设置为当前节点所关联线程

    2. 获取锁失败

      1. 将前驱节点的waitStatus设置为-1(前驱节点有责任对本节点进行唤醒操作);

        tryAcquire()再次尝试获取锁,随后调用LockSupport.park()进行阻塞

  2. 释放锁:waitStatus设置为0,(若waitStatus为-1)对阻塞队列中下一节点进行唤醒操作

# 公平锁

线程第一次获取锁时检查Node阻塞队列中是否有前驱节点(检查虚拟头节点的next节点所关联线程是否为本线程),若没有则通过CAS尝试争抢State

# 线程的打断

acquire()

线程被打断时,仍停留在Node阻塞队列中,获取锁时能主动执行selfInterrupt()自我中断

acquireInterruptibly()

线程被打断时,仍停留在Node阻塞队列中,获取锁时将直接抛出InterruptedException

# 线程的等待与唤醒

持锁线程调用await()进行等待时:

  1. addConditionWaiter进入等待队列尾部,节点waitStatus改为-2,进行park阻塞
  2. fullyRelease()完全释放AQS上的可重入锁
  3. 对阻塞队列下一个节点进行unpark

持锁线程调用signal()进行唤醒时:

  1. doSignal(first)对等待队列队首线程进行唤醒

  2. transferForSignal()将唤醒线程加入阻塞队列尾部,节点waitStatus改为0,而其前驱节点的waitStatus设置为-1

    若操作失败(如该线程状态已被设置为取消),则尝试唤醒等待队列下一节点所关联线程

  3. 对所唤醒线程进行unpark

# ReentrantReadWriteLock及其实现原理

  • 写锁支持条件变量,而读锁不支持

  • 升级与降级

    • 读锁在重入时不可升级为写锁,此时会发生死锁
    • 写锁在重入时可降级为读锁
    class CachedData {
      Object data;
      volatile boolean isCacheValid;
      final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      
      void processCachedData() {
        lock.readLock().lock();
        try (!isCacheValid) {
          // 本地缓存失效,改为获取写锁更新缓存
          // 获取写锁前需释放读锁
          lock.readLock().unlock();
          lock.writeLock().lock();
          try (!isCacheValid) { // 双重检查
            // set data
            isCacheValid = true;
          }
          
          // 将写锁降级为读锁
          lock.readLock().lock();
        } finally {
          lock.writeLock().unlock();
        }
        
        // 真正使用数据
        try {
          // use data
        } finally {
          lock.readLock().unlock();
        }
      }
    }
    

# 实现原理

# LockSupport.park()/LockSupport.unpark()

AQS底层通过park()/unpark()实现线程的状态切换

# park()

  1. 检查Parker对象的_counter字段
  2. _counter字段值为0,获取mutex,线程进入_condition条件变量进行阻塞

# unpark()

  1. Parker对象的_counter字段置为1
  2. 尝试唤醒_condition条件变量中的指定线程

线程开始运行时,设置_counter字段为0


# 共享内存模型:无锁(乐观锁,非阻塞式)

# CAS

CAS操作保证原子性,CAS实现类中借助volatile获取最新值进行比较,以实现线程安全

CAS操作适合短时间内即可获取到锁的场景,若线程需长时间等待锁释放,优先使用悲观锁

# 原子变量(java.util.concurrent.atomic.*

# AtomicInteger/AtomicLong/AtomicBoolean
# AtomicReference
AtomicReference<BigDecimal> balance;

// constructor

BigDecimal getBalance() {
  return balance.get();
}

void withdrawCash(BigDecimal amount) {
  while(true) { // 使用乐观锁实现多个组合操作的原子性(BigDecimal单个方法为线程安全)
    BigDecimal prev = balance.get();
    BigDecimal next = prev.subtract(amount);
    if (balance.compareAndSet(prev, next)) {
      break;
    }
  }
}

ABA问题

  • AtomicStampedReference
  • AtomicMarkableReference
# AtomicIntegerArray
AtomicIntegerArray states = new AtomicIntegerArray(size);

// ...
while (true) {
  for (int i = 0; i < size; i++) {
    if (states.get(i) == 0) {
      if (states.compareAndSet(i, 0, 1)) {
        // 获取数组中某元素锁,进行操作(如用于自定义连接池返回可用连接)
      }
    }
  }
  
  // 获取数组中某元素锁,等待,避免循环空转消耗性能(需长时间等待锁释放,使用悲观锁)
  synchronized(this) {
    this.wait();
    // ...
  }
}
# AtomicFieldUpdator
# LongAdder

LongAdder源码分析

  • CAS锁

# Unsafe

# 不可变类

  • DateTimeFormatter
  • String
# 不可变类对象的设计

String类的保护性(防御性)拷贝:在有可能对本对象发生修改时,创建新对象返回,因此是线程安全的

  • String#substring()

    未解决对象实例创建过于频繁的问题,引入享元模式

    • 基本类型包装类

      在包装类对象创建之前,内部Cache对象已经将需缓存的对象全部创建完成

      // Use the archived cache if it exists and is large enough
      if (archivedCache == null || size > archivedCache.length) {
      	Integer[] c = new Integer[size];
      	int j = low;
      	for(int i = 0; i < c.length; i++) {
      		c[i] = new Integer(j++);
      	}
      	archivedCache = c;
      }
      cache = archivedCache;
      
      // range [-128, 127] must be interned (JLS7 5.1.7)
      assert IntegerCache.high >= 127;
      
      • ByteShortLong:-128~127
      • Integer:-128~127(最大值可通过-Djava.lang.Integer.IntegerCache.high调整)
      • Character:0~127
      • Boolean:true、false
    • 字符串常量池

    • BigDecimal/BigInteger


# JMM

Java内存模型(Java Memory Model,JMM)定义在Java虚拟机规范中,用于屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的并发效果。JMM规范了Java虚拟机与计算机内存是如何协同工作的:规定了一个线程如何和何时可以看到由其他线程修改过后的共享变量的值,以及在必须时如何同步访问共享变量。

JVM内存分区与硬件内存架构之间存在差异。硬件内存架构没有区分线程栈和堆:线程栈和堆可能都分布于主内存中,部分线程栈和堆还可能出现在CPU寄存器与高速缓存中:

img

因此,JMM还定义了JVM内存分区与硬件内存架构之间的抽象关系:

线程

​ |

工作内存(对应CPU寄存器与高速缓存)

​ |

主内存(对应主存)

  1. 针对主存、高速缓存及CPU寄存器之间的具体交互细节,常用的缓存一致性协议:MSI、MESI(IllinoisProtocol)、MOSI、Synapse、Firefly及DragonProtocol

  2. 针对主内存与工作内存之间的具体交互细节,即一个变量如何从主内存拷贝到工作内存,如何从工作内存同步到主内存,JMM定义了八种操作:

    (lock-unlock)->read->load->use->assign->store->write

# 可见性(读同步问题)

# volatile

保证代码块内共享变量的可见性(每次只从主存,而非工作内存中读取),但不能保证代码块执行的原子性,故多用于“一写多读”的场景

  • LOAD前缀指令实现读写屏障
  • MESI缓存一致性与总线嗅探
  • synchronized可保证代码块的执行原子性及变量可见性(强制刷新工作内存),但一般为重量级操作,性能较低
  • System.out.print()语句内部有加锁操作,在IO操作后强制刷新工作内存,因此若代码块内有输出操作可不将共享变量声明为volatile

二阶段终止模式实现监控流程

// 非监控线程调用stopMonitor()方法,两共享变量必须在线程间保证可见性
private volatile boolean isStarted;
private volatile boolean isStopped;
private Thread monitor;

public void startMonitor() {
if (!isStarted) {
 synchronized(this) { // 只需将读写共享变量部分纳入同步代码块(缩小同步范围,提升并发性能)
		if (isStarted) { // balking(犹豫)模式
 		return;
		}
		isStarted = true;
		} 
}

/*
以下代码不会被多个线程并发执行,故无需加锁,因此所用到的共享变量需声明为volatile
*/
	monitor = new Thread(() -> {
	while (!isStopped) {
  	try {
    	Thread.sleep(2000);
    	/*
    	执行监控业务
    	*/
  	} catch(InterruptedException e) {
     // 借助isStopped共享变量实现循环优雅退出,不再依赖打断标记
   }
	}

 /*
 释放资源
 */
 isStarted = false;
	});
	monitor.start();
}

public void stopMonitor() {
isStopped = true;
	monitor.interrupt(); // 显式打断,以更快结束可能存在的sleep()
}

# ThreadLocal

提供线程内部的局部变量,不同的线程间访问自身变量不会互相干扰(“空间(变量副本)换时间(并行执行)”);作用于线程的整个生命周期,降低同一线程内多方法/组件间公共变量传递的复杂度

  • 数据传递:独立保存各个线程绑定的数据,可在同一线程内多方法/组件间安全传递,避免额外传递参数所造成的代码耦合
  • 线程隔离:各个线程绑定的数据相互隔离,但又支持并发存取,避免引入同步代码块所造成的性能损失
# ThreadLocal使用场景
  • 多线程场景下,跨service-dao层传递原生JDBC Connection,使用ThreadLocal保证同一线程内两层间传递同一JDBC Connection数据库连接池实现思路

    static ThreadLocal<Connection> threadLocal = new ThreadLocal<>();
    
    private static final DataSource ds = new SomeDataSource();
    
    public static Connection getConnection() throws Exception {
      Connection conn = threadLocal.get();
      if (conn == null) {
        conn = ds.newConnection();
        threadLocal.set(conn);
      }
      return conn;
    }
    
# ThreadLocalMap

k:ThreadLocal对象 - v:该对象对应的变量副本

# 以上kv键值对设计的优势
  • ThreadLocal对象而非Thread对象本身作为key,每个Map存储的Entry数量降低
  • ThreadLocalMap对象由Thread对象本身进行维护,当Thread销毁时,ThreadLocalMap对象随之销毁
classDiagram
  class ThreadLocalMap
  ThreadLocalMap : +set(ThreadLocal<?> key, T value)
  ThreadLocalMap : +remove(ThreadLocal<?> key)
  ThreadLocalMap : +getEntry(ThreadLocal<?> key)
  ThreadLocalMap : +int INITIAL_CAPACITY
  ThreadLocalMap : +int size
  ThreadLocalMap : +int threshold
  ThreadLocalMap : +Entry[] table
# ThreadLocal常用方法
# ThreadLocal.set(T value)
  1. 获取当前线程对象t与线程对象内Map(t.threadLocals,即ThreadLocalMap对象)
  2. 若所获取的Map不为空,调用ThreadLocalMap.set(ThreadLocal<?> key, T value)为以当前ThreadLocal对象作为key的ThreadLocalMap.Entry赋值;否则调用createMap(Thread t, T firstValue)创建线程内Map并赋予初始值
# ThreadLocal.get()
  1. 获取当前线程对象t与线程对象内Map
  2. 若所获取的Map不为空,调用ThreadLocalMap.getEntry(ThreadLocal<?> key)获取以当前ThreadLocal对象作为key的ThreadLocalMap.Entry,进而获取ThreadLocalMap.Entry.value返回
  3. 若获取的Map或Entry为空,调用ThreadLocal.setInitialValue()进行Map赋值或Map初始化(同ThreadLocal.set(T value)流程第二步)后返回
# ThreadLocal.remove()
  1. 获取当前线程对象t与线程对象内Map
  2. 若所获取的Map不为空,调用ThreadLocalMap.remove(ThreadLocal<?> key)移除以当前ThreadLocal对象作为key的ThreadLocalMap.Entry

引用链:Thread -> ThreadLocalMap -> ThreadLocalMap.Entry => ThreadLocal

ThreadLocalMap.Entry继承WeakReference<ThreadLocal<?>>,以便实现**ThreadLocal生命周期与线程生命周期的解绑**

# ThreadLocal可能产生的内存泄漏问题

(某线程运行时)若不及时对该线程中不再使用的ThreadLocal变量threadLocalA进行显式remove()操作,虽然该线程对应ThreadLocalMap的key值(即threadLocalA变量本身)将在本次GC时被置为空(弱引用),但ThreadLocalMap.EntryThreadLocalMap.Entry.value均无法立即得到释放(强引用),且后者再无主动访问机会(key为空),造成内存泄漏

(即使未进行显式remove()操作)下一次执行ThreadLocalMap.getEntry(..)/set(..)方法时将对空key进行检查与清理(expungeStaleEntry(int staleSlot)),故在一定情况下,所发生的内存泄漏将被自动解决弱引用的好处

因此,需在ThreadLocal变量使用结束后显式调用ThreadLocal.remove()对应Entry进行手动删除,以便实现**ThreadLocalMap生命周期与线程生命周期的解绑**,彻底杜绝内存泄漏(显式remove()操作的好处

# Hash冲突
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY]; // INITIAL_CAPACITY = 16,可看作环形数组
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); // hash函数(开放定址法❓):借助AtomicInteger实现,每次计算hash时自增(getAndAdd)0x61c88647,以维持均匀哈希
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY); // 扩容threshold设置为INITIAL_CAPACITY的2/3
}

private void set(ThreadLocal<?> key, Object value) {
    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.
  // 相比于getEntry(..)方法直接根据key值定位Entry后采取快速失败策略,set(..)方法采用开放定址·线性探测(环形)法解决hash冲突,直至触发Entry清理与扩容
}

# 有序性

# 使用volatile修饰单例变量以防止指令重排序
  • synchronized不能阻止指令重排序,但只要所使用的局部变量不逃离同步代码块的作用范围,则虽有重排序却不会造成有序性问题
# as-if-serial语义

不论如何重排序,程序的执行结果不能与单线程执行结果发生冲突。(编译器、runtime和处理器都必须遵守as-if-serial语义)

# happens-before规则

在JMM中,如果一个操作执行的结果需要对另一个操作可见(两个操作既可以是在一个线程之内,也可以是在不同线程之间),那么两操作之间必须满足happens-before规则:

  1. 程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作
  2. 监视器锁定规则:对一个锁的解锁操作,happens-before于任意后续对这个锁的加锁操作
  3. volatile域规则:对一个volatile域的写操作,happens-before于任意后续对这个volatile域的读操作
  4. 传递性规则:如果A happens-before B,且B happens-before C,那么A happens-before C
  5. 线程启动、中断、终结规则
  6. 对象终结规则

以下场景,对共享变量的写操作对其读操作(本线程或其他线程)可见:

  • 使用synchronized分别对写操作和读操作加锁
  • volatile共享变量分别进行写操作和读操作
  • 对共享变量的写操作发生在读线程启动之前
  • 读操作阻塞直至写操作线程执行结束(Thread.join())或被打断

# 单例模式最佳实践

# balking(犹豫)模式(配合同步代码块)实现懒汉式单例
public final Class Singleton {
	private Singleton() {}

	private static Singleton INSTANCE = null;

	public static synchronized Singleton getInstance() {
		if (INSTANCE != null) {
   			return INSTANCE:
 		}
 		INSTANCE = new Singleton();
 		return INSTANCE;
	}
}
# 锁粒度优化

缩小同步代码块范围,使用volatile修饰单例对象避免同步代码块重排序,同时使用double-checked locking(DCL,双重检查、双检锁)实现懒汉式单例

private static volatile Singleton INSTANCE = null;

public static Singleton getInstance() {
 	// 同步代码之外的共享变量可能存在有序性问题
 	if (INSTANCE == null) {
		synchronized(Singleton.class) {
     		if (INSTANCE == null) {
       		INSTANCE = new Singleton();
           return INSTANCE;
     		}
   		}
 	}
 	return INSTANCE;
}

其他单例模式

  • # 通过反序列化过程实现的饿汉式单例
    • 返回单例使用final防止子类继承破坏单例
    • 实现Serializable接口,可重写Object readResolve()方法防止反序列化过程返回多例
    public final class Singleton implements Serializable {
     	private Singleton() {}
    
      private static final Singleton INSTANCE = new Singleton();
    
      public static Singleton getInstance() {
        return INSTANCE;
      }
    
      public Object readResolve() {
      	return instance;
    	}
    }
    
  • # 通过枚举实现的饿汉式单例

    在类加载阶段保证线程安全,能保证在反序列化仍然返回单例,或在反射过程中因无法获取<init>而无法破坏单例

    enum Singleton {
      INSTANCE;
    }
    
  • # 借助静态内部类实现的懒汉式单例

    静态内部类在使用时才进行类加载,为懒加载过程

    public final Class Singleton {
    	private Singleton() {}
    
    	private static Class InstanceHolder {
     		static final Singleton INSTANCE = new Singleton(); // 懒汉式,非饿汉式
    	}
    
    	public Singleton getInstance() {
     		return InstanceHolder.INSTANCE; // 调用时执行静态内部类的类加载,类加载阶段JVM保证对<clinit>过程加锁,且将加载的静态成员进行缓存(保证<clinit>方法只执行一次),从而实现线程安全的单例
    	}
    }
    

# 内存屏障与final变量

  • LoadLoad
  • StoreStore
  • LoadStore
  • StoreLoad:在Load2及其后续的读取操作被执行前,保证Store1的写入对所有处理器可见(全能屏障,开销最大)

针对final域的重排序规则:

  • final域构造规则:在构造函数内对一个final域进行构造,随后一个变量对其进行引用,这两个操作之间不能重排序

    final域的写操作之后,构造函数返回之前,编译器插入一个storestore屏障,这个屏障禁止处理器把final域的写重排序到构造函数之外

  • 初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序(❓)

# 原子性(写同步问题)

  • 普通变量:基本数据类型变量、引用类型变量、声明为volatile的任何类型变量的访问读写都具备原子性
  • 代码块:使用synchronized关键字实现同步代码块

# 线程池

# Executors

实际生产环境中,使用ThreadPoolExecutor(int corePoolSize, // 1 int maximumPoolSize, // 2 long keepAliveTime, // 3 TimeUnit unit, // 4 BlockingQueue<Runnable> workQueue, // 5 ThreadFactory threadFactory, // 6 RejectedExecutionHandler handler // 7)手动创建线程池

  • 核心线程:开辟新线程执行任务
  • 救急线程(非核心线程):任务数大于corePoolSize时,新任务加入任务队列进行阻塞。若任务队列已满,则临时开辟救急线程执行任务,存活时间为keepAliveTime达到存活时间时,被回收的可能是任一线程,不一定是该线程);若总线程数超过maximumPoolSize,执行拒绝策略
  • Executors.newFixedThreadPool()

    使用LinkedBlockingQueue(无界),适用于任务数量固定,执行时间较长的场景

    请求队列的长度为Integer.MAX_VALUE,可能导致大量请求堆积引发OOM

  • Executors.newCachedThreadPool()

    使用SynchronousQueue,适用于任务数量不定,执行时间较短的场景

    允许创建的线程数量为Integer.MAX_VALUE,可能导致大量线程堆积引发OOM

  • Executors.newSingleThreadExecutor()

    使用FinalizableDelegatedExecutorService对返回的ThreadPoolExecutor对象进行包装,保证外界不能修改ThreadPoolExecutor对象的线程数

  • Executors.newScheduledThreadExecutor()

    原生定时任务

# ThreadPoolExecutor生命周期方法

  • execute()/submit()

    使用submit()方法接收Future对象,可通过返回值(future.get())对任务中的异常进行捕获

  • invokeAll()/invokeAny()

  • shutdown/shutdownNow()

  • prestartAllCoreThreads()

    提前创建等于核心线程数的线程数量(线程池预热)

工作线程模式(异步模式、分工模式、享元模式)

饥饿现象

使用固定大小线程池,当线程数不足时导致的阻塞现象

解决方案:不同任务类型交给不同线程池进行处理,避免不同任务对线程的循环依赖

# 线程池线程数量选取

线程数量过多,可能导致系统资源竞争激烈、上下文切换频繁

线程数量过少,可能导致系统无法充分利用计算机资源

  • CPU密集型

    $\large{CPU核心数+1}$

  • IO密集型

    $\Large{\frac{CPU核心数\times{CPU期望利用率}\times{总时间\normalsize{(即CPU耗时+IO耗时)}}}{CPU耗时}}$

# 线程池持久化

将队列中的任务信息落库,并维护任务状态

# ForkJoinPool

适用于任务可被拆分的场景


# 并发集合类

# HashTable/Vector

put()/get()方法均为synchronized同步方法,并发度低

# Collections.Synchronized*

put()/get()方法内均对同一mutex对象加入synchronized同步锁,并发度低

# Blocking*

# LinkedBlockingQueue

# ArrayBlockingQueue

定长环形数组

# CopyOnWrite*

# CopyOnWriteArrayList

无锁读,有锁写

# Concurrent*

# ConcurrentHashMap

# 7︎⃣ “分段锁”

Segment数组每一元素(Segment分段,继承自ReentrantLock)对应一把锁,每一分段可视作小型HashMap,内部维护多个HashEntry数组

# put()
  1. 通过哈希函数计算出当前key的hash值

  2. 通过hash值计算出所对应的Segment数组下标

    //计算 Segment 下标
    (hash >>> segmentShift) & segmentMask
    
  3. 再通过hash值计算出所对应的SegmentHashEntry数组下标

    //计算 HashEntry 数组下标
    (tab.length - 1) & hash
    

    尽量避免当前hash值所计算出来的Segment数组下标与HashEntry数组下标趋于相同,导致分配到同一个Segment中的元素都被分配到同一条链表上,导致链表过长,并降低并发度

  4. 插入元素

ensureSegment()

scanAndLockForPut()

rehash()

# get()
# size()

基于“统计元素个数时Segment结构未发生变化”的假设进行三次尝试,若统计失败,对所有Segment进行强制加锁

# 8︎⃣ synchronized+CAS

弃用Segment分段锁,每个链表头节点(Bucket)对应一把(可优化的)同步锁,进一步提高并发度

  • 使用CAS优化加锁操作,提高吞吐量
  • 弱一致性(fail-safe)
# 构造方法

在构造器中进行懒惰初始化,只计算数组容量,在第一次使用时才真正创建table数组

  • initialCapacity
  • concurrencyLevel:initialCapacity应至少与concurrencyLevel相等
  • loadFactor:取第一个大于initialCapacity/loadFactor值的2幂值作为数组的初始容量,最终存入SIZECTL(默认值为16)
# put()

put(K key, V value)内部调用putVal(key, value, false),每次设置值时用新值替换旧值

final V putVal(K key, V value, boolean onlyIfAbsent) {
  // 若目标键值为空抛出NPE
  if (key == null || value == null) throw new NullPointerException();
  // hash函数:(h ^ (h >>> 16)) & HASH_BITS;  HASH_BITS = 0x7fffffff;
  // 与1.8HashMap中hash函数基本一致
  int hash = spread(key.hashCode());
  int binCount = 0;
  for(Node<K,V>[] tab = table;;) {
    // Table数组为空,进行初始化
   	if (tab == null || tab.length == 0) {
      // 使用CAS(尝试将SIZECTL设置为-1)创建Table数组,并将SIZECTL设置为下一次扩容的阈值大小
      initTable();
    }
    // Table数组不为空,但key所在位置链表头节点(桶)为空
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null/*key位置无头节点*/) {
      // 使用CAS创建链表头节点(no lock when adding to empty bin)
      if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
         break;
    }
    // Table数组及key所在位置链表头节点(桶)均不为空
    else if ((fh = f.hash) == MOVED/*key位置有头节点,但hash值为-1(正在扩容)*/) {
      tab = helpTransfer(tab, f); // 调用多个工作线程协同进行元素迁移
    } else {
      synchronized (f) {
    		if (tabAt(tab, i) == f) {
     		// 如果hash值大于等于0,说明是正常的链表结构
     		if (fh >= 0) {
      		binCount = 1;
      		// 从头结点开始遍历,每遍历一次,binCount计数加1
      		for (Node<K,V> e = f;; ++binCount) {
       			K ek;
       			// 成功根据hash/key/value值找到key,进行值(新值替换旧值)
       			if (e.hash == hash &&
        			((ek = e.key) == key ||
         			(ek != null && key.equals(ek)))) {
        				oldVal = e.val;
        				if (!onlyIfAbsent)
         					e.val = value;
        					break;
       				}
       				Node<K,V> pred = e;
       				// 若遍历到了尾结点(未找到key),则把新节点尾插进去
       				if ((e = e.next) == null) {
        				pred.next = new Node<K,V>(hash, key, value, null);
        				break;
       				}
      			}
     			}
     			// 判断是否为树节点
     			else if (f instanceof TreeBin) {
      			Node<K,V> p;
      			binCount = 2;
            // 操作红黑树
      			if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
       				oldVal = p.val;
       				if (!onlyIfAbsent)
        				p.val = value;
      			}
     			}
    		}
   		}
      
      // 若binCount达到treeify阈值(8),尝试将链表转为红黑树(检查table容量是否超过64)
      if (binCount != 0) {
    		if (binCount >= TREEIFY_THRESHOLD/*8*/)
     			treeifyBin(tab, i);
    		//把旧节点值返回
    		if (oldVal != null)
     			return oldVal;
    		break;
   		}
    }
  }
  
  // 通过CAS进行哈希表元素个数计数,当binCount超过SIZECTL时进行扩容,在真正进行节点迁移时加同步锁
  addCount(1L, binCount);
  return null;
}

链表/红黑树更新/插入值链表转化为红黑树、及扩容迁移节点时加同步锁,减小锁粒度

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  // 待补充
}

/**
 * Helps transfer if a resize is in progress.
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // 根据 length 得到一个标识符号
        int rs = resizeStamp(tab.length);
        // 如果 tab 与 nextTab 都没有被并发修改
        // 且 sizeCtl  < 0 (扩容中)
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // 如果 sizeCtl >>> 16 不等于 rs ( sc 前 16 位如果不等于标识符,说明标识符变化了)
            // 或者 sizeCtl == rs + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc == rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。此时 sc == rs + 1)
            // 或者 sizeCtl == rs + 65535 (如果达到最大帮助线程的数量,即 65535)
            // 或者转移下标正在调整 (扩容结束)
            // 结束循环,返回 table
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 如果以上都不是, 将 sizeCtl 加一 (表示增加了一个线程帮助其扩容)
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 进行转移
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
# get()

无加锁操作,性能较高,但不能保证读取到的是最新数据(弱一致性

public V get(Object key) {
  // ...
  // 1. h确保为正整数
  int h = spread(key.hashCode());
  // ...
  // e = tabAt(tab, (n - 1) & h)寻址
  // 若hash为负值表明数组正在扩容(-1,forwardingNode)或节点为红黑树(-2,Treebin),调用e.find(h, key)方法在newTable或红黑树中继续寻找
  // 若hash为正值,根据hash/key/value值遍历链表节点寻找目标数据
}

# ConcurrentLinkedQueue

# 不同场景下的最优并发集合

# JUC并发工具

底层基于AQS实现

# StampedLock

# Semaphore

# CountdownLatch

# CyclicBarrier

# CompletableFuture


# 非共享模型