04JUC
JUC
ATOMIC
- 提供了一系列原子操作类,用于在多线程环境下实现高效、无锁的线程安全操作。这些类基于CAS(Compare and Swap)机制,避免了传统锁机制的性能开销,适用于低到中等竞争场景
核心原理
- CAS机制:通过硬件指令(如
cmpxchg
)实现无锁并发。基本流程为:- 读取内存中的当前值(current)。
- 计算新值(new)。
- 若内存值仍为current,则更新为new;否则重试或放弃。
- 实现依赖:通过
Unsafe
类调用本地方法(如compareAndSwapInt
),直接操作内存,保证原子性。
核心类及用途
基本类型原子类
AtomicInteger
/AtomicLong
/ **AtomicBoolean
**:- 原子更新整型、长整型、布尔值。
- 常用方法:
1
2
3
4get() / set() // 获取/设置值
getAndIncrement() // 原子自增(i++)
incrementAndGet() // 原子自增(++i)
compareAndSet(expected, new) // CAS操作
引用类型原子类
- **
AtomicReference<V>
**:- 原子更新对象引用。
- 示例:实现无锁栈。
- **
AtomicStampedReference<V>
**:- 解决ABA问题,通过版本号(stamp)标记引用变化。
- **
AtomicMarkableReference<V>
**:- 用布尔标记替代版本号,简化ABA问题处理。
数组原子类
AtomicIntegerArray
/AtomicLongArray
/ **AtomicReferenceArray<E>
**:- 原子更新数组中的元素。
- 示例:
array.getAndAdd(i, 5)
原子更新第i个元素。
字段更新器
AtomicIntegerFieldUpdater<T>
/AtomicLongFieldUpdater<T>
/ **AtomicReferenceFieldUpdater<T,V>
**:- 原子更新某个类的volatile字段。
- 示例:
1
2
3
4
5class Counter {
volatile int count;
}
AtomicIntegerFieldUpdater<Counter> updater =
AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");
高性能累加器
LongAdder
/DoubleAdder
:- 分散竞争热点,通过多个单元格(cells)减少并发冲突。
- 适用于高并发统计场景,性能优于
AtomicLong
。
Future
Future
是异步计算结果的容器接口
主要方法
- get() / get(long timeout, TimeUnit unit)
- 阻塞获取异步任务的结果,直到任务完成。
- 支持超时机制
- cancel(boolean mayInterruptIfRunning)
- 尝试取消任务
mayInterruptIfRunning
:为true
时,会尝试中断正在执行的任务;为false
时,允许正在运行的任务完成- 如果任务尚未开始,调用
cancel(true)
会直接取消任务。 - 如果任务已经在运行,是否能中断取决于任务是否响应中断
- isDone()
- 检查任务是否已经完成(正常完成、被取消或抛出异常)
- isCancelled()
- 检查任务是否被取消
RunnableFuture
- 典型实现:FutureTask
- 将
Callable
或Runnable
包装成一个可执行的任务,并通过线程执行。
SchedualedFuture
- 支持任务在指定延迟后执行(一次)。
- 支持周期性执行任务(固定速率或固定延迟)。
1
2
3
4
5
6
7
8
9
10
11ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 延迟执行一次
ScheduledFuture<?> future = scheduler.schedule(
() -> System.out.println("任务在 3 秒后执行"),
3, TimeUnit.SECONDS
);
// 周期性任务(固定速率)
ScheduledFuture<?> periodicFuture = scheduler.scheduleAtFixedRate(
() -> System.out.println("每 5 秒执行一次"),
0, 5, TimeUnit.SECONDS
);
CompletableFuture
- 是future在java8的增强实现、支持异步任务的链式调用、组合和回调
Future的缺点
- 回调地狱,必须在每一个 Future 完成后启动另一个 Future,这使得代码看起来像是在不断嵌套回调
- 阻塞操作,它会阻止当前线程的执行,直到异步操作完成
- 复杂的错误处理
- 无法表示任务间的复杂关联
- 无法使用 Future 来表示某个任务需要在另外两个任务都完成后才能开始,或者表示多个任务可以并行执行但是必须在一个共同的任务之前完成
核心原理
- 状态机和观察者模式
- 通过状态机管理异步任务的生命周期
- 每个 CompletableFuture 实例作为被观察者,维护一个
Completion
类型的链表(stack
),用于存储注册的观察者(回调函数)。当任务完成时,通过回调链触发后续操作,类似观察者模式
- 依赖链和流水线执行
- 通过链式调用(如
thenApply
、thenCompose
),任务被分解为多个阶段(Stage),每个阶段的结果作为下一阶段的输入。依赖链通过Completion
对象构建,确保任务按顺序或并行执行
- 通过链式调用(如
核心功能
- 任务创建
- 无返回值任务:
runAsync(Runnable)
,适用于日志记录等场景 - 有返回值任务:
supplyAsync(Supplier)
,返回CompletableFuture<T>
,支持结果传递 - 手动完成任务:
complete(T)
或completeExceptionally(Throwable)
,允许强制设置结果或异常
- 无返回值任务:
- 结果处理及转换
- 转换结果:
thenApply(Function)
,将结果映射为另一种类型 - 消费结果:
thenAccept(Consumer)
,处理结果但无返回值 - 组合任务:
- 串联:
thenCompose()
,将两个依赖任务串联(前一个结果作为后一个输入) - 并行合并:
thenCombine()
,合并两个独立任务的结果
- 串联:
- 回调
- 全部计算完成后:
CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
- 任一计算完成后:
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- 全部计算完成后:
- 转换结果:
- 完成回调
- 完成时处理:
CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
- 完成时交给其他线程处理:
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
- 统一处理:
handle(BiFunction)
,无论成功或异常均可处理
- 完成时处理:
- 异常处理
- 捕获异常:
exceptionally(Function)
,返回备选值
- 捕获异常:
- 超时及竞态控制
- 超时机制:
orTimeout(long, TimeUnit)
,超时后抛出TimeoutException
- 竞态选择:
anyOf(CompletableFuture...)
,返回最快完成的任务结果
- 超时机制:
- 默认使用
ForkJoinPool.commonPool()
执行异步任务,可自定义线程池
ForkJoinFuture
ForkJoinTask
是Future
的子类,专为ForkJoinPool
设计,用于实现分治(Divide-and-Conquer)任务ForkJoinFuture
是ForkJoinTask
的子类,但通常直接使用ForkJoinTask
的invoke()
或fork()
/join()
方法
LOCK
API
1
2
3
4
5void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
void lockInterruptibly() // 和 lock()方法相似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException异常
boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成功返回true
boolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法
void unlock() // 释放锁
锁的分类
可重入锁和非可重入锁
- 所谓重入锁,顾名思义。就是支持重新进入的锁,也就是说这个锁支持一个线程对资源重复加锁。
synchronized关键字就是使用的重入锁。比如说,你在一个synchronized实例方法里面调用另一个本实例的synchronized实例方法,它可以重新进入这个锁,不会出现任何异常。
如果我们自己在继承AQS实现同步器的时候,没有考虑到占有锁的线程再次获取锁的场景,可能就会导致线程阻塞,那这个就是一个“非可重入锁”。ReentrantLock
的中文意思就是可重入锁
公平锁和非公平锁
- 这里的“公平”,其实通俗意义来说就是“先来后到”,也就是FIFO。如果对一个锁来说,先对锁获取请求的线程一定会先被满足,后对锁获取请求的线程后被满足,那这个锁就是公平的。反之,那就是不公平的。
一般情况下,非公平锁能提升一定的效率。但是非公平锁可能会发生线程饥饿(有一些线程长时间得不到锁)的情况。所以要根据实际的需求来选择非公平锁和公平锁。
ReentrantLock支持非公平锁和公平锁两种。
读写锁和排他锁
- 我们前面讲到的synchronized用的锁和ReentrantLock,其实都是“排它锁”。也就是说,这些锁在同一时刻只允许一个线程进行访问。
而读写锁可以在同一时刻允许多个读线程访问。Java提供了ReentrantReadWriteLock类作为读写锁的默认实现,内部维护了两个锁:一个读锁,一个写锁。通过分离读锁和写锁,使得在“读多写少”的环境下,大大地提高了性能。注意,即使用读写锁,在写线程访问时,所有的读线程和其它写线程均被阻塞。
可见,只是synchronized是远远不能满足多样化的业务对锁的要求的
关键接口
Condition (等待队列)
- 本质为不同的等待队列,await后,放入不同的等待队列,使用signalAll()唤醒所有的等待队列
- 通过可重入锁来获取不同的等待队列,将等待线程放入不同的等待队列中去等待
Objec监视器和Condition对比
- 每个对象都可以用继承自
Object
的wait/notify方法来实现等待/通知机制。而Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。 对比项 Object监视器 Condition 前置条件 获取对象的锁 调用Lock.lock获取锁,调用Lock.newCondition获取Condition对象 调用方式 直接调用,比如object.notify() 直接调用,比如condition.await() 等待队列的个数 一个 多个 当前线程释放锁进入等待状态 支持 支持 当前线程释放锁进入等待状态,在等待状态中不中断 不支持 支持 当前线程释放锁并进入超时等待状态 支持 支持 当前线程释放锁并进入等待状态直到将来的某个时间 不支持 支持 唤醒等待队列中的一个线程 支持 支持 唤醒等待队列中的全部线程 支持 支持 - Condtion中的方法
方法名称 描述 await() 当前线程进入等待状态直到被通知(signal)或者中断;当前线程进入运行状态并从await()方法返回的场景包括:(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;(2)其他线程调用interrupt方法中断当前线程; awaitUninterruptibly() 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程 awaitNanos(long) 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了 awaitUntil(Date) 当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回true,否则返回false signal() 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁 signalAll() 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁
ReadWriteLock(读写锁)
- ReadWriteLock支持锁降级,可以加上写锁后再加读锁。但不允许锁升级,即读锁后不能加写锁
- ReetrantReadWriteLock读写锁的效率明显高于synchronized关键字
- 可重入实现:ReentranLockReadWriteLock(可重入读写锁)
- 读写锁。读锁是共享锁、写锁是独占锁
ReentrantLock(可重入锁)
- ReentrantLock是一个非抽象类,它是Lock接口的JDK默认实现
- ReentrantLock 使用起来更加灵活,可操作性也更大,但一定要在finally 中释放锁,目的是保证在获取锁之后,最终能够被释放。同时不要将获取锁的过程写在try 里面。要不然可能会发生异常导致锁释放失败
- 可以指定为公平锁 ,new ReentrantLock(true),默认为非公平的
- 非公平锁:如果当前没有线程占有锁,当前线程直接通过cas指令占有锁
- 公平锁:公平锁获取锁的方式为如果当前线程是等待队列的第一个或者等待队列为空或为当前占有线程,则通过cas指令设置state为1或增加。公平锁加锁的线程全部是按照先来后到的顺序,依次进入等待队列中排队的,不会盲目的胡乱抢占加锁。
- CAS原理实现
- ReentrantLock 是基于 Lock 实现的可重入锁,所有的 Lock 都是基于 AQS 实现的,AQS 和 Condition 各自维护不同的对象,在使用 Lock 和 Condition 时,其实就是两个队列的互相移动。它所提供的共享锁、互斥锁都是基于对 state 的操作。而它的可重入是因为实现了同步器 Sync,在 Sync 的两个实现类中,包括了公平锁和非公平锁,ReentrantLock的锁是”独占“的,也就是说,它的锁都是”排他锁“,不能共享。
- 当通过lockInterruptibly()方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。
ReentrantLock与synchronized的区别
- ReentrantLock等待时可中断,当持有锁的线程长时间不释放锁的时候,等待中的线程可以选择放弃等待,转而处理其他的任务。
- ReentranLock需要显式的加锁和解锁
- ReentranLock中有多个condition队列,而synchronized只有一个等待队列
- 公平锁:synchronized和ReentrantLock默认都是非公平锁,但是ReentrantLock可以通过构造函数传参改变。只不过使用公平锁的话会导致性能急剧下降。
- 另一种说法
- 锁的实现
- synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。
- 等待可中断
- 当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。ReentrantLock 可中断,而 synchronized 不行
- 公平锁
- 公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。
- 锁绑定多个条件
- 一个 ReentrantLock 可以同时绑定多个 Condition 对象。
- 锁的实现
ReentrantReadWriteLock
- 它是ReadWriteLock接口的JDK默认实现。它与ReentrantLock的功能类似,同样是可重入的,支持非公平锁和公平锁。不同的是,它还支持”读写锁“。
- 在“写”操作的时候,其它线程不能写也不能读。我们称这种现象为“写饥饿”, 具体情况一种是虽然很早申请写锁,但要等到所有读线程都执行后才能获取到写锁
StampedLock(读写增强锁)
StampedLock
和ReadWriteLock
相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。- 怎么避免的写饥饿问题(乐观读锁)
- 在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。这种模式也就是典型的无锁编程思想,和CAS自旋的思想一样
- 乐观读锁的意思就是先假定在这个锁获取期间,共享变量不会被改变,既然假定不会被改变,那就不需要上锁。在获取乐观读锁之后进行了一些操作,然后又调用了validate方法,这个方法就是用来验证tryOptimisticRead之后,是否有写操作执行过,如果有,则获取一个悲观读锁,这里的悲观读锁和ReentrantReadWriteLock中的读锁类似,也是个共享锁。
- StampedLock还把读锁分为了“乐观读锁”和“悲观读锁”两种。
- 不可重入锁,不支持wait/notify机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public double distanceFromOrigin() {
// 获得一个乐观读锁
long stamp = stampedLock.tryOptimisticRead();
// 注意下面两行代码不是原子操作
// 假设x,y = (100,200)
double currentX = x;
// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
double currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
// 检查乐观读锁后是否有其他写锁发生
if (!stampedLock.validate(stamp)) {
// 获取一个悲观读锁
stamp = stampedLock.readLock();
try {
currentX = x;
currentY = y;
} finally {
// 释放悲观读锁
stampedLock.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
CLH LOCK
- CLH 是一种基于单向链表的高性能、公平的自旋锁。
- 核心思想是让每个线程在本地自旋前驱节点的状态,从而减少全局同步的开销,提升性能
- AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class CLHLock implements Lock {
private final ThreadLocal<CLHLock.Node> node;
// 尾节点(通过原子引用保证线程安全) 所有线程共享的
private final AtomicReference<CLHLock.Node> tail = new AtomicReference<>(new CLHLock.Node());
private static class Node {
private volatile boolean locked;
}
public CLHLock() {
// withInitial方法延迟初始化,初始值仅在每个线程首次调用 get() 方法时生成,避免不必要的资源开销
this.node = ThreadLocal.withInitial(CLHLock.Node::new);
}
@Override
public void lock() {
// Thread local 拿到节点
final Node node = this.node.get();
node.locked = true;
// 将尾节点设置为当前节点,并拿到以前的tail 节点
Node pred_node = this.tail.getAndSet(node);
// 自旋,等待加锁
while (pred_node.locked) {
}
}
@Override
public void unlock() {
// Thread local 拿到节点
final Node node = this.node.get();
node.locked = false;
// 重置当前线程的节点,复用节点以减少内存分配开销。
this.node.set(new Node());
}
} 优点 缺点 无全局竞争,减少缓存同步开销 自旋可能浪费 CPU 资源 公平锁(FIFO 顺序) 不可重入 适用于高并发短任务场景 节点频繁创建可能影响性能
MCS LOCK
- 它也是一种基于链表的可扩展、高性能、公平的自旋锁,
- 与 CLH LOCK不同。它是真的有下一个节点 next,添加这个真实节点后,它就可以只在本地变量上自旋,而 CLH LOCK是前驱节点的属性上自旋
- 该实现适合需要高吞吐量的同步场景,但需要注意自旋锁的特性:在锁持有时间较长的场景可能不如阻塞锁高效。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51public class MCSLock implements Lock {
// 队列尾节点(使用原子引用保证线程安全)
private AtomicReference<MCSLock.Node> tail = new AtomicReference<>(null);
private ThreadLocal<MCSLock.Node> node;
private static class Node {
private volatile boolean locked = false;
// 下一个等待节点(原子引用保证可见性)
private volatile Node next = null;
}
public MCSLock() {
// 每个线程持有一个独立的节点(避免竞争)
node = ThreadLocal.withInitial(Node::new);
}
// 加锁操作
public void lock() {
MCSNode node = myNode.get();
node.locked = true; // 当前线程准备获取锁
// 将当前节点添加到队列尾部,并获取前驱节点
MCSNode pred = tail.getAndSet(node);
if (pred != null) {
pred.next.set(node); // 前驱节点指向当前节点
// 自旋等待前驱节点释放锁
while (node.locked) {
Thread.yield(); // 避免过度消耗CPU
}
}
}
/ 解锁操作
public void unlock() {
MCSNode node = myNode.get();
// 检查是否有后继节点
if (node.next.get() == null) {
// 尝试将尾节点置空(没有其他等待者)
if (tail.compareAndSet(node, null)) {
return;
}
// 等待后继节点链接建立
while (node.next.get() == null) {
Thread.yield();
}
}
// 唤醒后继节点
node.next.get().locked = false;
node.next.set(null); // 清理引用,便于GC
}
}
Ticket Lock
- Ticket Lock 是一种基于票据的公平自旋锁机制,灵感来源于银行排队叫号系统。它通过分配递增的票据号来保证线程获取锁的顺序严格按照请求的先后顺序(FIFO),从而避免饥饿现象,适用于需要严格公平性的场景。
- 核心原理
- 票据分配:每个线程获取锁时,会获得一个全局递增的票据号(类似排队号)。
- 服务号检查:锁内部维护一个当前服务号,线程需等待直到自己的票据号与服务号匹配才能获取锁。
- 公平性保障:严格按照票据号顺序分配锁,先到先得
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class TicketLock implements Lock {
//记录当前允许获取锁的票据号。
private AtomicInteger serviceCount = new AtomicInteger(0);
//记录已分配的票据号。
private AtomicInteger ticketCount = new AtomicInteger(0);
private final ThreadLocal<Integer> owner = new ThreadLocal<>();
@Override
public void lock() {
int myTicket = ticketCount.getAndIncrement(); // 获取当前票据号
while (serviceCount.get() != myTicket) { // 自旋等待
// 可加入 Thread.yield() 或短时睡眠减少 CPU 占用
}
}
@Override
// 释放锁
public void unlock() {
serviceCount.incrementAndGet(); // 递增服务号,通知下一个线程
}
}
LockSupport(较为底层)
- park和wait的区别
park
不需要获取某个对象的锁- 因为中断的时候
park
不会抛出InterruptedException
异常,所以需要在park
之后自行判断中断状态,然后做额外的处理1
2
3
4
5
6
7
8
9
10public static void park(Object blocker); // 暂停当前线程,blocker的作用是在dump线程的时候看到阻塞对象的信息
public static void parkNanos(Object blocker, long nanos); // 暂停当前线程,不过有超时时间的限制
public static void parkUntil(Object blocker, long deadline); // 暂停当前线程,直到某个时间
public static void park(); // 无期限暂停当前线程
public static void parkNanos(long nanos); // 暂停当前线程,不过有超时时间的限制
public static void parkUntil(long deadline); // 暂停当前线程,直到某个时间
public static void unpark(Thread thread); // 恢复指定线程,如果调用时当前线程的还未进入park,则下一次park该线程不会受阻塞,所以顺序不对时,不会造成死锁
public static Object getBlocker(Thread t);
//调用方式
LockSupport.park();
AQS(AbstractQueuedSynchronizer)
- 上图中有颜色的为Method,无颜色的为Attribution。
- 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。
通过 队列管理线程的阻塞与唤醒,并通过 状态变量(state)控制资源访问
AQLS
- AQS里面的“资源”是用一个
int
类型的数据来表示的,有时候我们的业务需求资源的数量超出了int
的范围,所以在JDK 1.6 中,多了一个AQLS(AbstractQueuedLongSynchronizer)。它的代码跟AQS几乎一样,只是把资源的类型变成了long
类型。
- AQS里面的“资源”是用一个
AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
核心组件
同步状态
- AQS内部使用了一个volatile int的变量state来作为资源的标识。
- 通过 CAS(Compare-And-Swap)操作保证线程安全。
- 对于状态的操作提供了三个原子性的方法
- getState():获取锁的标志state值
- setState(): 设置
- compareAndSetState(int,int)
等待队列
CLH 变体
它内部使用了一个先进先出(FIFO)的双端队列,并使用了两个指针head和tail用于标识队列的头部和尾部
队列中每个节点(
Node
)封装一个线程及其等待状态(如是否被取消)。
FIFO队列中的Node
- node的数据结构为双向链表,队列中的元素为Node,Node为保存线程引用和线程状态的容器。每个线程对同步器的访问都可以看作是队列中的一个节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40Node {
// 标记一个结点(对应的线程)在共享模式下等待
static final Node SHARED = new Node();
// 标记一个结点(对应的线程)在独占模式下等待
static final Node EXCLUSIVE = null;
// waitStatus的值,表示该结点(对应的线程)已被取消
static final int CANCELLED = 1;
// waitStatus的值,表示线程已经准备好了,就等资源释放了
static final int SIGNAL = -1;
// waitStatus的值,表示该结点(对应的线程)在等待某一条件,节点线程等待唤醒
static final int CONDITION = -2;
/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
static final int PROPAGATE = -3;
// 等待状态,取值范围,-3,-2,-1,0,1
volatile int waitStatus;
volatile Node prev; // 前驱结点
volatile Node next; // 后继结点
volatile Thread thread; // 结点对应的线程
Node nextWaiter; // 等待队列里下一个等待条件的结点
// 判断共享模式的方法
final boolean isShared() {
return nextWaiter == SHARED;
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 其它方法忽略,可以参考具体的源码
}
private Node addWaiter(Node mode) {
// 使用了Node的这个构造函数
Node node = new Node(Thread.currentThread(), mode);
// 其它代码省略
}- waitStatus表示节点的状态
- CANCELLED,值为1,表示当前结点已取消调度,当timeout或者被中断(响应中断),会触发变更此状态,进入该状态后的节点不会再变化。
- SIGNAL,值为-1,表示当前节点的后继节点在等待唤醒,当后继节点入队时,会将前继节点的状态更新为(-1)SIGNAL
- CONDITION,值为-2,线程等待在条件变量队列中。当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE,值为-3,共享模式,表示当前场景下后续的acquireShared(获得共享)能够得以执行。共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
- 值为0,表示当前节点在sync队列中,等待着获取锁。新节点入队时的默认状态
- 可以简单的通过waitStatus释放小于等于0,来判断是否是CANCELLED状态
- prev:前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
- next : 后继调用
- nextWaiter:存储condition队列中的后继节点。
- 通过Node我们可以实现两个队列,一是通过prev和next实现CLH队列(线程同步队列,双向队列),二是nextWaiter实现Condition条件上的等待线程队列(单向队列),这个Condition主要用在ReentrantLock类中。
- waitStatus表示节点的状态
模板方法模式
- AQS的设计是基于模板方法模式,子类需实现
tryAcquire
、tryRelease
等方法定义资源获取/释放逻辑。AQS 负责队列管理和线程阻塞/唤醒。 - 关键方法
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。int为获取锁的次数
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。int是释放封锁的次数
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。int为获取锁的次数
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。int是释放封锁的次数
- 注意:
- 这些方法虽然都是
protected
方法,但是它们并没有在AQS具体实现,而是直接抛出异常(这里不使用抽象方法的目的是:避免强迫子类中把所有的抽象方法都实现一遍,减少无用功,这样子类只需要实现自己关心的抽象方法即可,比如 Semaphore 只需要实现 tryAcquire 方法而不用实现其余不需要用到的模版方法)
- 这些方法虽然都是
获取资源
- 入口是acquire(int arg)方法。arg是要获取的资源的个数,在独占模式下始终为1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 首先用tryAcquire(arg)尝试去获取资源
//如果获取资源失败,就通过addWaiter(Node.EXCLUSIVE)方法把这个线程插入到等待队列中。其中传入的参数代表要插入的Node是独占式的
private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// 将Node插入队列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS尝试,如果成功就返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,再自旋CAS插入
enq(node);
return node;
}
// 自旋CAS插入等待队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 现在通过addWaiter方法,已经把一个Node放到等待队列尾部了。而处于等待队列的结点是从头结点一个一个去获取资源的
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果node的前驱结点p是head,表示node是第二个结点,就可以尝试去获取资源了
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该结点。
// 所以head所指的结点,就是当前获取到资源的那个结点或null。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
} - 获取资源的方法除了acquire外,还有以下三个:
- acquireInterruptibly:申请可中断的资源(独占模式)
- acquireShared:申请共享模式的资源
- acquireSharedInterruptibly:申请可中断的资源(共享模式)
释放资源
1 |
|
AQS的两种模式
- 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如ReentrantLock。
- 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如Semaphore/CountDownLatch。
- 子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如
ReadWriteLock
独占模式
独占模式:同一时刻只允许一个线程获取同步状态
获取同步状态:
多线程并发获取(修改)同步状态, 修改同步状态成功的线程标记为拥有同步状态
获取同步状态失败的线程,加入到同步队列的队尾;加入到队列中后,如果当前节点的前驱节点为头节点再次尝试获取同步状态
如果头节点的下一个节点尝试获取同步状态失败后,会进入等待状态;其他节点则继续自旋。
释放同步状态
当线程执行完相应逻辑后,需要释放同步状态,使后继节点有机会同步状态(让出资源,让排队的线程使用)。这时就需要调用release(int arg)方法。调用该方法后,会唤醒后继节点。释放同步状态,唤醒后继节点
后继节点获取同步状态成功,头节点出队。需要注意的事,出队操作是间接的,有节点获取到同步状态时,会将当前节点设置为head,而原本的head设置为null。
其他竞争状态
当同步队列中头节点唤醒后继节点时,此时可能有其他线程尝试获取同步状态。
假设获取成功,将会被设置为头节点。
头节点后续节点获取同步状态失败。
共享模式
共享模式和独占模式最主要的区别是是否支持同一时刻有多个线程同时获取同步状态
共享模式下的共享队列
获取同步状态
首先至少要调用一次tryAcquireShared(arg)方法,如果返回值大于等于0表示获取成功。
当获取锁失败时,则创建一个共享类型的节点并进入一个同步队列,然后进入队列中进入自旋状态(阻塞,唤醒两种状态来回切换,直到获取到同步状态为止)
当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,否则继续挂起等待。
当一个同享节点获取到同步状态,并唤醒后面等待的共享状态的结果如下图所示:
最后,获取到同步状态的线程执行完毕,同步队列中只有一个独占节点:
释放同步状态
- 释放同步状态后,同步队列的变化过程和共享节点获取到同步状态后的变化过程一致
AQS实现
类 | 作用 |
---|---|
Semaphore | 限制线程的数量 |
Exchanger | 两个线程交换数据 |
CountDownLatch | 线程等待直到计数器减为0时开始工作 |
CyclicBarrier | 作用跟CountDownLatch类似,但是可以重复使用 |
Phaser | 增强的CyclicBarrier |
CountDownLatch
- 使一个线程等待其他线程各自执行完毕后再执行。
- 通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
- 主要方法
1
2
3
4
5
6//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//将count值减1
public void countDown() { }; - demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public static void main(String[] args) throws InterruptedException {
// 定义工作线程数量
int threadCount = 5;
// 创建 CountDownLatch,初始计数为线程数量
CountDownLatch latch = new CountDownLatch(threadCount);
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
// 提交任务到线程池
for (int i = 0; i < threadCount; i++) {
final int taskId = i + 1;
executor.submit(() -> {
try {
// 模拟任务执行时间(0~2秒随机)
Thread.sleep((long) (Math.random() * 2000));
System.out.println("任务-" + taskId + " 执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 计数器减1(无论任务是否异常,必须执行)
latch.countDown();
}
});
}
System.out.println("主线程等待所有任务完成...");
// 主线程阻塞,直到计数器归零
latch.await();
System.out.println("所有任务已完成,主线程继续执行");
// 关闭线程池
executor.shutdown();
}
CyclicBarrier(循环屏障)
- 让所有线程都等待完成后才会继续下一步行动。在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒
- CountDownLatch和CyclicBarrier区别:
1.countDownLatch是一个计数器,线程完成一个记录一个,计数器递减,只能只用一次
2.CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递增,提供reset功能,可以多次使用 - 如果参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以用
isBroken()
方法检测Barrier是否被破坏。- 如果有线程已经处于等待状态,调用reset方法会导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会导致始终无法等待。
- 如果在等待的过程中,线程被中断,会抛出InterruptedException异常,并且这个异常会传播到其他所有的线程。
- 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出BrokenBarrierException,屏障被损坏。
- 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出BrokenBarrierException异常。
- demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67// 阶段任务数据模板(模拟数据处理)
private static class PhaseTask implements Runnable {
private final int threadId;
private final CyclicBarrier barrier;
private final int phases;
public PhaseTask(int threadId, CyclicBarrier barrier, int phases) {
this.threadId = threadId;
this.barrier = barrier;
this.phases = phases;
}
@Override
public void run() {
try {
// 分阶段处理任务(共 phases 个阶段)
for (int phase = 1; phase <= phases; phase++) {
// 模拟阶段处理耗时
Thread.sleep((long) (Math.random() * 1000));
System.out.printf("线程-%d 完成阶段%d 的数据处理\n", threadId, phase);
// 等待其他线程到达屏障点
barrier.await();
// 所有线程到达屏障后继续执行(此处可添加阶段完成后的逻辑)
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 定义线程数和阶段数
final int threadCount = 3;
final int phases = 2;
// 创建 CyclicBarrier,指定线程数和屏障点回调(汇总动作)
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("\n----- 所有线程到达屏障,触发汇总操作 -----\n");
});
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
// 提交任务
for (int i = 0; i < threadCount; i++) {
executor.submit(new PhaseTask(i + 1, barrier, phases));
}
// 关闭线程池(非阻塞)
executor.shutdown();
}
/**
线程-1 完成阶段1 的数据处理
线程-3 完成阶段1 的数据处理
线程-2 完成阶段1 的数据处理
----- 所有线程到达屏障,触发汇总操作 -----
线程-2 完成阶段2 的数据处理
线程-1 完成阶段2 的数据处理
线程-3 完成阶段2 的数据处理
----- 所有线程到达屏障,触发汇总操作 -----
**/
phaser(阶段控制)
- 用来解决控制多个线程分阶段共同完成任务的情景问题
- 关键方法
1
2
3onAdvance() //每个阶段完成后回调的方法
arriveAndAwaitAdvance() //阶段性完成等待
arriveAndDeregister() // 完成任务并注销 - DEMO
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70public class PhaserDemo {
static class PreTaskThread implements Runnable {
private String task;
private Phaser phaser;
public PreTaskThread(String task, Phaser phaser) {
this.task = task;
this.phaser = phaser;
}
@Override
public void run() {
for (int i = 1; i < 4; i++) {
try {
// 第二次关卡起不加载NPC,跳过
if (i >= 2 && "加载新手教程".equals(task)) {
continue;
}
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】",
i, phaser.getRegisteredParties(), task));
// 从第二个关卡起,不加载NPC
if (i == 1 && "加载新手教程".equals(task)) {
System.out.println("下次关卡移除加载【新手教程】模块");
phaser.arriveAndDeregister(); // 移除一个模块
} else {
phaser.arriveAndAwaitAdvance();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(4) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("第%d次关卡准备完成", phase + 1));
return phase == 3 || registeredParties == 0;
}
};
new Thread(new PreTaskThread("加载地图数据", phaser)).start();
new Thread(new PreTaskThread("加载人物模型", phaser)).start();
new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
new Thread(new PreTaskThread("加载新手教程", phaser)).start();
}
}
//输出:
//关卡1,需要加载4个模块,当前模块【加载背景音乐】
//关卡1,需要加载4个模块,当前模块【加载新手教程】
//下次关卡移除加载【新手教程】模块
//关卡1,需要加载3个模块,当前模块【加载地图数据】
//关卡1,需要加载3个模块,当前模块【加载人物模型】
//第1次关卡准备完成
//关卡2,需要加载3个模块,当前模块【加载地图数据】
//关卡2,需要加载3个模块,当前模块【加载背景音乐】
//关卡2,需要加载3个模块,当前模块【加载人物模型】
//第2次关卡准备完成
//关卡3,需要加载3个模块,当前模块【加载人物模型】
//关卡3,需要加载3个模块,当前模块【加载地图数据】
//关卡3,需要加载3个模块,当前模块【加载背景音乐】
//第3次关卡准备完成
//这里要注意关卡1的输出,在“加载新手教程”线程中调用了arriveAndDeregister()减少一个party之后,后面的线程使用getRegisteredParties()得到的是已经被修改后的parties了。但是当前这个阶段(phase),仍然是需要4个parties都arrive才触发屏障的。从下一个阶段开始,才需要3个parties都arrive就触发屏障。
semaphore(限流)
- 用于线程的流量控制,Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。
- 默认使用的是非公平锁
1
2public void acquire(); //获取许可,也可以获取指定数量的许可
public void release();//释放许可,也可以释放指定数量的许可
exchanger
- 用于两个线程之间交换信息,可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时(如果只有一个格子被填充,回阻塞直到另一个格子填充),该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class ExchangerDemo {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
System.out.println("这是线程A,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程A的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
Thread.sleep(1000);
new Thread(() -> {
try {
System.out.println("这是线程B,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程B的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
//输出:
//这个时候线程A是阻塞的,在等待线程B的数据
//这是线程B,得到了另一个线程的数据:这是来自线程A的数据
//这是线程A,得到了另一个线程的数据:这是来自线程B的数据
ForkJoin框架
- Fork/Join框架是一个实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。
- 分解汇总的任务,用很少的线程可以执行很多的任务(子任务)TPE做不到先执行子任务,CPU密集型
- 工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。每个线程都维护了一个双端队列,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。
- 初始时可以设置最大工作线程数、工作线程工厂、拒绝任务的Handler和同步模式
工作窃取算法
- 工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。
- 当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。
Fork/Join的具体实现
ForkJoinTask
- ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。