java多线程

JVM的资源调度是以线程进行的,这可以更好的利用cpu的资源,所以需要我们对java多线程进行熟练掌握。

线程定义

线程是一个基本的CPU执行单元,也是程序执行流的最小单元。

在java中,线程既是一个java.lang.Thread类的一个实例,也指运行中的线程。

线程状态

在Java中,线程可为以下状态:

  • New 线程未调用start方法,对应Thread.State.NEW
  • Runnable 就绪,等待调度程序运行。当调用start方法后进入此状态。对应Thread.State.RUNNABLE
  • Running 线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。
  • Blocked 等待/阻塞/睡眠。线程此时仍然存活,但因缺少运行条件不能进入就绪状态。可因其他事件导致条件满足返回就绪状态。对应Thread.State.BLOCKED/Thread.State.WAITING/Thread.State.TIMED_WAITING
  • Dead 线程的run()方法完成,对应Thread.State.TERMINATED

线程状态转换

使用jstack命令可以打印Java线程栈信息,其与java状态对应如下表。

线程对应关系

线程分为用户线程以及守护线程。一般的,守护线程用于gc等后台非必要操作。在jvm启动后,以main()为入口开启线程作为主线程。当所有的非守护线程消亡后,jvm即可推出,同时关闭守护线程。

线程安全

线程安全指在并发的情况之下,该代码经过多线程使用,线程的调度顺序不影响任何结果。下面展示了一段非线程安全的案例,因编译器和处理器指令重排序导致得不到想要结果。
语句重排序
更详细的内容可参照Oracle的JSR133文档。

互斥机制

monitor

在操作系统进程我们使用信号量(semaphore)互斥量(mutex)来控制进入临界区的多个进程。在使用互斥量机制时非常容易出错,因为我们需要去亲自操作变量以及对进程进行阻塞和唤醒。而java使用了monitor机制来实现对临界区的访问管理,而无法进入monitor临界区的线程,它们则被阻塞,并且在必要的时候会被唤醒。并且对外屏蔽掉这些机制,并且在内部实现这些机制,使得使用monitor的人看到的是一个简洁易用的接口。

使用monitor机制的目的主要是为了互斥进入临界区;在java中,可以采用synchronized关键字来修饰实例方法、类方法以及代码块,而被synchronized关键字修饰的方法、代码块,就是monitor机制的临界区。synchronized如果修饰实例方法,则默认对象是this;若修饰类方法,则对象为this.class

java中的java.lang.Object类充当着维护互斥量以及定义wait/signal API来管理线程的阻塞和唤醒的角色。java.lang.Object类定义了 wait(),notify(),notifyAll()方法,它们原理如下图:

ObjectMonitor

当一个线程需要获取Object的锁时,会被放入EntrySet中进行等待。如果该线程通过竞争获取了该锁,则成为了该锁的The Owner。如果Owner发现因为某些原因使得程序无法进行下去(如消费者发现资源队列为空时),可以通过wait释放锁并进入WaitSet,这可以使得其他需要该锁的线程可以重新竞争该锁。当条件得到满足时该进程又可加入到锁的竞争中去。

wait/notify必须存在于synchronized块中。并且,这三个关键字针对的是同一个监视器(某对象的监视器)。这意味着wait之后,其他线程可以进入同步块执行。

使用synchronized/wait/notify实现的一个典型的生产者消费者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
public class ProducerConsumer {

static final int MIN_STORGE = 0;
static final int MAX_STORGE = 10;

/**
* 产品
* @field producerName 生产者名称
* @filed value 随机值
* @author lyyljs
*
*/
static class Product{
private String producerName;
private long value;

Product(String producerName, long value){
this.producerName = producerName;
this.value = value;
}

public String getProducerName() {
return producerName;
}

public void setProducerName(String producerName) {
this.producerName = producerName;
}

public long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}

@Override
public String toString() {
return "Product [生产者:" + producerName + ", value=" + value + "]";
}

}

/**
* 消费者
* @field name 消费者名称
* @filed queue 商品库,临界资源
* @author lyyljs
*
*/
static class Consumer implements Runnable{

private String name;
private List<Product> queue;

Consumer(String name, List<Product> queue){
this.name = name;
this.queue = queue;
}

void consume(String name){
synchronized(queue){//对queue加锁,防止其他线程操作
if (queue.size() <= MIN_STORGE){
try {
System.out.println(name + "尝试消费,但缺货");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}

return;
}

Product p = queue.remove(0);
System.out.println(name + "消费了一个商品" + p);
queue.notifyAll();
}
}

public void run() {
while (true){
consume(name);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

/**
* 生产者
* @field name 生产者名称
* @field queue 商品库,临界资源
* @author lyyljs
*
*/
static class Producer implements Runnable{

private String name;
private List<Product> queue;

Producer(String name, List<Product> queue){
this.name = name;
this.queue = queue;
}

void produce(String name){
synchronized (queue){//对queue加锁,防止其他线程操作
if (queue.size() >= MAX_STORGE){
try {
System.out.println(name + "尝试生产,但满仓");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}

return;
}

queue.add(new Product(name, ThreadLocalRandom.current().nextLong(100)));
System.out.println(name + "生产了一个商品");
queue.notifyAll();
}
}

public void run() {
while (true){
produce(name);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args){
List<Product> list = new ArrayList<Product>();
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 3; i++){
pool.submit(new Consumer("consumer_" + i, list));
pool.submit(new Producer("producer_" + i, list));
}

}
}

其部分执行结果:

1
2
3
4
5
6
7
8
consumer_0尝试消费,但缺货
producer_0生产了一个商品
producer_2生产了一个商品
producer_1生产了一个商品
consumer_2消费了一个商品Product [生产者:producer_0, value=96]
consumer_1消费了一个商品Product [生产者:producer_2, value=68]
consumer_2消费了一个商品Product [生产者:producer_1, value=74]
consumer_1尝试消费,但缺货

线程内存

thread_JMM

  • 主内存(Main Memory):所有变量的存储位置。直接对应于物理硬件的内存。
  • 工作内存(Working Memory):每条线程还有自己的工作内存,用于保存被该线程使用到的变量的主内存副本拷贝。为了获取更好的运行速度,虚拟机可能会让工作内存优先存储于寄存器和高速缓存中。
volatile

多线程在对同一个变量进行操作的时候很有可能产生不可预知的结果(一个线程修改了这个值,但是之后在另一个线程看到的是修改之前的值)。此时volatile可以解决可见性问题。volatile保证了对一个volatile变量的写操作先行发生于后面对这个变量的读操作。本质上,volatile既是不去取工作内存的缓存值而是直接去取主内存的值。在线程安全的情况下会牺牲性能。详细参见volatile关键字解惑

ThreadLocal

ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量,这意味着线程修改该副本不会影响到其他副本。常见的用法有:存储单个线程上下文信息。比如存储id等,示例如dubbo调用链中生成id追踪调用链;使变量线程安全。变量既然成为了每个线程内部的局部变量,自然就不会存在并发问题了,示例如数据库session连接管理。

ThreadLocal里类型的变量,其实是放入了当前Thread里。每个Thread都有一个threadLocals。threadLocals是一个ThreadLocalMap。TreadLocalMap是一个Map,它以ThreadLocal本身为Key(这里实际是ThreadLocal的弱引用WeakReference<ThreadLocal<?>>),value为Object。当设置一个ThreadLocal变量时,这个map里就多了一对ThreadLocal -> Object的映射。

ThreadLocal

使用ThreadLocal时需要注意可能存在的内存泄露问题,见深入分析 ThreadLocal 内存泄漏问题

常用类与接口

Thread

  • Runnable

最常见的创建一个线程的方法即实现该接口,通过run()启动线程。

1
2
3
4
5
6
7
8
9
10
class Producer implements Runnable{
public void run() {
// do something;
}
}

public static void main(String[] args){
Producer thread = new Producer();
thread.run();//启动线程
}
  • Thread

Thread类实现了Runnable接口,并提供了丰富的方法。

  • start() 自定义线程启动,同runnable的run()
  • yield() 线程让步,先检测当前是否有相同优先级的线程处于同可运行状态,如有,则把CPU的占有权交给次线程,否则继续运行原来的线程。如持有锁则不会释放锁。
  • sleep(TimeMillis) 使当前线程休眠指定时间(ms)。同yield如持有锁则不会释放锁。
  • join(TimeMillis) JDK文档里描述为Waits for this thread to die.用于等待指定线程完成后再继续执行。从源码里可以看出实际上是执行了在不断检测线程存活并wait()。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
    throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
    while (isAlive()) {//判断线程是否存活
    wait(0);
    }
    } else {
    while (isAlive()) {
    long delay = millis - now;
    if (delay <= 0) {
    break;
    }
    wait(delay);
    now = System.currentTimeMillis() - base;
    }
    }
    }

wait()和sleep():wait()是Object的方法,且会释放锁让其他线程可以竞争锁;sleep()是Thread的方法,其只将线程暂停,让其他线程得到执行的机会,如果持有锁则并不会释放锁。

  • Callable,Future,FutureTask

不同于Runnable没有返回值,Callable可以拿到返回结果。使用Future/FutureTask获取Callable的返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
	static class Task implements Callable<Integer>{

public Integer call() throws Exception {
System.out.println("child thread sleeping");

TimeUnit.SECONDS.sleep(1L);//同Thread.sleep(1000)

System.out.println("child thread executing");

int sum = 0;

for (int i = 0; i <= 1000; i++) {
sum += i;
}

return sum;
}

}

public static void main(String[] args){
Task task = new Task();

//使用Future获取返回结果
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(task);
executor.shutdown();

//使用FutureTask获取返回结果,因为FutureTask继承RunnableFuture接口,所以可以直接new Thread(FutureTask),这里实际使用了new Thread(Runnable)
// FutureTask<Integer> result = new FutureTask<Integer>(task);
// Thread thread = new Thread(result);
// thread.start();

System.out.println("master thread executing");

if (!result.isDone()){
System.out.println("child thread has not done");
}

if (!result.isCancelled()){
System.out.println("child thread has not cancelled");
}

try {
Integer sum = result.get();
System.out.println("result:" + sum);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

执行结果:

1
2
3
4
5
6
master thread executing
child thread has not done
child thread has not cancelled
child thread sleeping
child thread executing
result:500500
  • CountDownLatch

类似于join;不同于join,调用join方法需要等待thread执行完毕才能继续向下执行,而CountDownLatch只需要检查计数器的值为零就可以继续向下执行,相比之下,CountDownLatch更加灵活一些,可以实现一些更加复杂的业务场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final CountDownLatch cd = new CountDownLatch(SIZE);
for (int i = 0; i < SIZE; i++){
new Thread(() -> {
try {
//do something
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
cd.countDown();//减去计数
}
}).start();
}
try {
cd.await();//阻塞主线程,等待所有子线程执行完毕
} catch (InterruptedException e) {
//deal Exception
}
//子线程任务都完成后继续主线程逻辑
  • CyclicBarrier

可以让一组线程等待至某个状态之后再全部同时执行。如await有指定时间且到达时间有线程未就绪则其它就绪线程抛出BrokenBarrierException继续执行后面任务。

1
2
3
4
5
6
7
8
9
10
11
12
final CyclicBarrier cb = new CyclicBarrier(SIZE);
for (int i = 0; i < SIZE; i++){
new Thread(() -> {
try {
//do something
cb.await();//阻塞线程直到其他所有线程都到达该点然后继续执行下面的代码
//do something
} catch (InterruptedException | BrokenBarrierException e1) {
e1.printStackTrace();
}
}).start();
}
  • Phaser

Phaser是JDK 7新增的一个同步辅助类,在功能上跟CyclicBarrier和CountDownLatch差不多,但支持更丰富的用法:使用过程中可以随时注册和注销参与者;不同于CyclicBarrier,分离出”到达”和”等待”机制;支持结束,默认情况下,当没有参与者的时候Phaser就结束了;支持层级Phaser结构;提供针对内部状态的监控方法。

1
2
3
4
5
6
7
8
9
10
11
//重要方法
public int register(); //注册到Phaser
public int bulkRegister(int parties); //register的批量注册方法
public int arrive(); //记录到达状态
public int arriveAndDeregister(); //抵达并注销
public int arriveAndAwaitAdvance(); //抵达并等待其它线程达到
protected boolean onAdvance(int phase, int registeredParties); //覆写该方法来定义抵达动作并控制终止条件
public int awaitAdvance(int phase); //当phase大于给定数时返回
public int getRegisteredParties(); //获取parties数
public int getArrivedParties(); //已经到达当前phase的parties数
public int getUnarrivedParties(); //剩余的未到达parties数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//分层示例
void build(Task[] tasks, int lo, int hi, Phaser ph) {
if (hi - lo > TASKS_PER_PHASER) {//如果大于每阶段任务数则分层
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(tasks, i, j, new Phaser(ph));//进行分层
}
} else {
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(ph);//假定在new Task(ph)中执行了ph.register()
}
}

build(new Task[n], 0, n, new Phaser());
  • Semaphore

Java中信号量的实现类。同样提供公平锁与非公平两种机制。内部调度同样基于AQS

1
2
3
4
5
6
7
8
9
10
11
12
//几个重要的方法
//阻塞的方法
public void acquire(); //获取一个许可,若未获得,则会一直等待,直到获得许可。
public void acquire(int permits); //获取permits个许可
public void release(); //释放一个许可
public void release(int permits); //释放permits个许可

//非阻塞方法
public boolean tryAcquire(); //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit); //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits); //尝试获取permits个许可
public boolean tryAcquire(int permits, long timeout, TimeUnit unit); //尝试获取permits个许可
  • TimeUnit

时间单元类。支持有:DAYS,HOURS,MINUTS,SECONDS,MILLISECONDS,MICROSECONDS,NANOSECONDS。除了提供时间转换方法以外,该类还提供了快捷的指定时间的wait/sleep/join方法。

1
2
3
public void timedWait(Object obj, long timeout);
public void timedJoin(Thread thread, long timeout);
public void sleep(long timeout);

atomic

原子类,使用原子类可以保证原子操作。

  • AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
    这四种基本类型用来处理布尔,整数,长整数,对象四种数据。
  • LongAdder,LongAccumulator,DoubleAdder,DoubleAccumulator
    这四种类是jdk1.8新增,他们都继承了Striped64Striped64是在java8中添加用来支持累加器的并发组件,其设计类似于分段锁算法,在竞争激烈的时候尽量分散竞争。LongAdder基本可以替换掉AtomicLong,在高并发情况下性能更高。LongAccumulator对LongAdder进行了扩展;LongAdder只支持加减,而LongAccumulator则提供了自定义函数操作。
  • AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
    这三类提供了对数组的原子操作。
  • AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
    这三类基于反射,提供对对象域的原子操作。
  • AtomicMarkableReference,AtomicStampedReference
    这两类解决了ABA问题。本质上是采用了版本号对修改进行了记录。AtomicStampedReference是使用pair的int stamp作为计数器使用,它记录了修改了几次,AtomicMarkableReference的pair使用的是boolean mark,它则是记录是否被修改。

锁分类
  • 独享锁/共享锁 独享锁是指该锁一次只能被一个线程所持有(可看作信号量为1),共享锁是指该锁可被多个线程所持有(可看作信号量大于1)。
  • 互斥锁/读写锁 互斥锁是独享锁的具体实现,读写锁是共享锁的具体实现。
  • 公平锁/非公平锁 公平锁是指多个线程按照申请锁的顺序来获取锁。而非公平锁则是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,可能会造成优先级反转或者饥饿现象。
  • 可重入锁 可重入锁又名递归锁,是指同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。
  • 乐观锁/悲观锁 乐观锁与悲观锁并不是具体的锁,而是看待并发同步的角度。乐观锁认为不存在很多的并发更新操作,不加锁是安全的。在innodb的读操作就采取了乐观锁(MVCC,有效范围为REPEATABLE READ和READ COMMITTED)。在Java中,常常采取CAS实现,例如Atomic类的更新。悲观锁则认为存在很多并发更新操作,一定需要加锁来确保安全。
  • 分段锁 分段锁同样不是具体的锁,而是一种锁的设计。它是通过更细的粒度来降低每一粒度的并发,从而减少冲突。在ConcurrentHashMapStriped64就体现了这种设计思想。
  • 自旋锁 自旋锁是指尝试获取锁的线程不会阻塞,而是采用循环的方式尝试获取锁。好处是减少上下文切换,缺点是一直占用CPU资源。低并发下使用。自旋锁属于乐观锁
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class SpinLock {//一个简单的自旋锁
    private AtomicReference<Thread> cas = new AtomicReference<Thread>();
    public void lock() {
    Thread current = Thread.currentThread();
    while (!cas.compareAndSet(null, current));//循环获取锁
    }
    public void unlock() {
    Thread current = Thread.currentThread();
    cas.compareAndSet(current, null);
    }
    }
  • 偏向锁/轻量级锁/重量级锁 这是针对Synchronized做的优化。偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。当锁是偏向锁的时候,被另一个线程所访问时,会将锁升级为轻量级锁。此时未获得锁的线程会通过自旋来尝试获取锁。在自旋超过一定次数任未获得锁时,则升级为重量级锁偏向锁轻量级锁都属于乐观锁
locks
  • synchronized

synchronized属于互斥锁,同一时间仅允许一个线程访问临界资源。在公平锁/非公平锁上属于非公平锁,是由竞争来获取锁所有权。同时synchronized也是可重入锁,即一个线程调用synchronized方法的同时在其方法体内部调用该对象另一个synchronized方法不会出现死锁,即以下代码是合法的。

1
2
3
4
5
6
7
8
synchronized void functionA() throws Exception{
//do something
functionB();//重入该对象
}

synchronized void functionB() throws Exception{
//do something
}

JUCL_UML

  • AbstractOwnableSynchronizer(AOS);AbstractQueuedLongSynchronizer(AQLS)/AbstractQueuedSynchronizer(AQS)

AOS为创建锁和相关同步器的所有权提供了一个标准,它本身不管理和提供任何信息,需要由子类来实现。它的两个子类即使AQSAQLSAQSAQLS通过CLH队列与共享资源state来管理线程关于锁的使用和同步。需要注意的是它们并不参与具体如何获取和释放锁,这需要由子类来实现。AQLS相较于AQS的区别在于内部acquire和release的arg参数是long而不是int类型。

1
2
3
4
5
//需要子类实现其获取和释放锁的方法,AQLS其入参类型为long
protected boolean tryAcquire(int arg);//尝试获取锁,用于独享锁
protected boolean tryRelease(int arg);//尝试释放锁,用于独享锁
protected int tryAcquireShared(int arg);//尝试获取锁,用于共享锁
protected boolean tryReleaseShared(int arg);//尝试释放锁,用于共享锁

AQS

  • Condition

在JUC(java.util.concurrent)下实现类似wait/notify的功能。Condition将Object监视器方法(wait、notify和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了synchronized方法和语句的使用,Condition替代了 Object监视器方法的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//JDK示例代码
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();//其实现在AQS里ConditionObject
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();//将该线程挂起并移到等待队列中
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();//通知等待队列
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
  • Lock

抽象接口,提供类似synchronized的功能,在JUC.locks下主要实现为ReentrantLock。其与synchronized区别主要如下:


  1. 使用上lock必须使用finally释放锁,否则会容易造成死锁;lock更灵活,可以自由定义多把锁的枷锁解锁顺序,synchronized要按照先加的后解顺序。
  2. 在加锁方案上提供多种加锁方案,lock 阻塞式, trylock 无阻塞式, lockInterruptily 可打断式, 还有trylock的带超时时间版本。
  3. 锁类型上两者皆是可重入锁,但lock提供公平锁的方案。
  4. 性能lock相比synchronized更高。

  • LockSupport

工具类,操作对象是线程,基于Unsafe类实现。基本操作park和unpark。park会把使得线程挂起,直到出现以下几种情况中的一种:其他线程调用unpark方法操作该线程;该线程被中断;park方法立刻返回。

1
2
3
4
5
//重要的方法
public static void park();
public static void parkNanos(long nanos);
public static void parkUntil(long deadline);
public static void unpark(Thread thread);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//使用示例:一个先进先出的互斥队列
public class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);

// 非队首或不能获取锁的时候挂起
while (waiters.peek() != current ||
!locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) // 在等待的时候忽略中断
wasInterrupted = true;
}

waiters.remove();
if (wasInterrupted) // 重新设定中断状态并退出
current.interrupt();
}

public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
  • ReadWriteLock

读写锁接口,实现是ReentrantReadWriteLock。和Lock接口无关。读写锁在实现上保持了两个锁:读锁与写锁。其中读锁是共享锁,可被多个线程同时读取;写锁是独享锁,仅能被单个线程占有。

  • ReentrantLock

可重入锁,是Lock的实现类。控制方法采用AQS;其内部有NonfairSync非公平和FairSync公平两种实现,默认非公平。使用示例见Condition下JDK示例。

  • ReentrantReadWriteLock

可重入读写锁,是ReadWriteLock的实现类。内部持有一个ReadLock与一个WriteLock。同样使用AQS控制调度,也有NonfairSync非公平和FairSync公平两种实现,默认非公平。在自定义的Sync抽象类中(继承AQS),state高位的16位是共享锁的状态,低位的16位是独占锁的状态,以此来同时管理两种锁并实现读写互斥。

  • StampedLock

可简单理解为读写锁的一个改进版本。读写锁读写互斥;而StampedLock则提供了乐观的读策略,在读的时候发现有写操作,再去读多一次,这使得读操作不会阻塞写操作。需要注意的是,StampedLock是不可重入的,这使得如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁。

线程池

ThreadPool_UML

  • Executor/ExecutorService/ScheduledExecutorService

JUC中三个接口;Executor提供了一个运行新任务的简单接口;ExecutorService扩展了Executor接口,添加了一些用来管理执行器生命周期和任务生命周期的方法;ScheduledExecutorService则在ExecutorService基础上增添了一些计划任务方法。

  • ThreadPoolExecutor

ThreadPoolExecutor是线程池的核心类。它的构造函数如下:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);

  • corePoolSize 线程池核心线程数
  • maximumPoolSize 线程池最大数量
  • keepAliveTime 超出corePoolSize数量的线程的保留时间。
  • unit keepAliveTime单位。见JUC里TimeUnit。
  • workQueue 阻塞队列,存放来不及执行的线程。BlockingQueue接口在JUC里有以下实现类:
1
2
3
4
5
6
7
8
9
10
11
public ArrayBlockingQueue(int capacity);//默认非公平的array实现,可以通过另一个构造方法使用公平调度

public DelayQueue();//队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

public LinkedBlockingQueue();//链表实现,其大小默认Integer.MAX_VALUE,可手动通过另一构造方法指定大小,否则当有大量请求任务时,容易造成OOM。

public SynchronousQueue();//同步队列,这是一个不存储元素的特殊队列,它负责把生产者线程处理的数据直接传递给消费者线程,默认非公平;公平时构造TransferQueue实现公平交易,否则构造TransferStack实现非公平交易。实际上,这个队列接收到任务的时候,会直接提交给线程处理,而不保留它。

public LinkedTransferQueue();//SynchronousQueue在交易时只有一个空位,而LinkedTransferQueue则是由多个空位(链表)来存储待交易的节点进行匹配

public PriorityBlockingQueue();//带优先级的阻塞队列,其默认大小为11;它在PriorityQueue的基础上增加了阻塞,队列不支持插入null元素,同时不支持插入非comparable的对象。
  • threadFactory 用来创建新线程。默认使用Executors.defaultThreadFactory()来创建线程。
  • handler 线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。如果需要自定义策略,可实现RejectedExecutionHandler接口。
1
2
3
4
public CallerRunsPolicy();//用调用者所在的线程来执行任务
public AbortPolicy();//直接抛出异常,默认策略
public DiscardPolicy();//什么都不做,意即丢弃任务
public DiscardOldestPolicy();//丢弃阻塞队列中靠最前的任务,并执行当前任务

当向线程池添加任务时:

  1. 如果线程池线程数量未到corePoolSize,即使有空闲线程,也会立即新建线程来执行任务
  2. 如果线程数量已达corePoolSize,则将任务放进阻塞队列workQueue
  3. 如果workQueue已满,线程池没有空闲线程且线程数量未到maximumPoolSize,则新建线程执行任务
  4. 如果workQueue已满,总线程数又达到了maximumPoolSize,则根据饱和策略handler来处理该任务

  • ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor扩展了ThreadPoolExecutor,在其基础上新增了计划任务,实现了ScheduledExecutorService接口。这里计划任务可分为两类:指定延时后执行任务;周期性重复执行任务。
它的构造方法如下:

1
2
3
4
5
6
7
8
9
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

可以看到它是一个核心线程数为corePoolSize,最大线程数为Integer.MAX_VALUE,同时空闲线程不会因闲置时间消亡的线程池,corePoolSize是必要参数,threadFactory和handler默认情况下为Executors.defaultThreadFactory()和AbortPolicy()。特别的,这里的阻塞队列为DelayedWorkQueue,它是一个内部静态类,内部存储对象为RunnableScheduledFuture,初始数组大小为16,它保证了添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//一些重要的方法

//指定时延后调度执行任务
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit);
//指定时延后调度执行任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit);
//指定时延后开始执行任务,以后每隔period的时长再次执行该任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//指定时延后开始执行任务,以后任务执行完成后等待delay时长,再次执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
  • Fork/Join框架与ForkJoinPool

Fork/Join核心思想是分治,它将大任务划分为小任务并行计算,最后再将小任务结果汇总得到最终结果。fork分解任务,join汇集结果。

Fork/Join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行。那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行。这种方式减少了线程的等待时间,提高了性能。

ForkJoin主要提供了两个主要的执行任务的接口。RecurisiveActionRecurisiveTask。其中RecurisiveTask代表有返回值的任务,而RecurisiveAction代表没有返回值的任务。 它们的父类都是ForkJoinTaskforkjoin的实现在ForkJoinTask类中。fork方法用以一部方式启动任务的执行,join方法则等待任务完成并返回指向结果。

ForkJoinPool是ExecutorService的实现类。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

1
2
3
4
public ForkJoinPool(int parallelism,//线程数,默认为可用的CPU数量
ForkJoinWorkerThreadFactory factory,//默认为defaultForkJoinWorkerThreadFactory
UncaughtExceptionHandler handler,//异常处理,默认null
boolean asyncMode);//是否同步,默认false
1
2
3
4
5
6
7
8
//几个重要方法
public <T> T invoke(ForkJoinTask<T> task);//等待获取结果,内部调用ForkJoinTask.invoke
public void execute(ForkJoinTask<?> task);//异步执行任务,内部调用ForkJoinTask.fork
public void execute(Runnable task);//异步执行任务将Runnable封装为ForkJoinTask再调用
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);//提交ForkJoinTask执行并返回结果
public <T> ForkJoinTask<T> submit(Callable<T> task);
public <T> ForkJoinTask<T> submit(Runnable task, T result);
public ForkJoinTask<?> submit(Runnable task);

ForkJoinPool特别之处在于实现了工作窃取,即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行。

ForkJoinPool

考虑对1000万个数据进行排序,使用归并将其划分为两个500万的排序任务和一个针对这两组500万数据的合并任务;以此思想不断递归划分,当子任务规模足够小的时候(低于某个阈值,比如10),停止继续划分,直接使用其他排序方式(如插入排序)进行排序。

此时它的树形任务图有21层,任务数约有10w个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

这里不能使用ThreadPoolExecutor,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

  • Executors

Executors提供了一些常用的线程池,开箱即用。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool核心线程等于最大线程数,线程不会因闲置而销毁。因使用队列为默认LinkedBlockingQueue,其最大数量为Integer.MAX_VALUE,当有大量请求任务时可能会在队列里堆积导致OOM。

1
2
3
4
5
6
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

WorkStealingPool提供了同步的ForkJoinPool线程池。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

SingleThreadExecutorFixedThreadPool,只是核心线程数为1。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

CachedThreadPool没有核心线程,最大线程数量无限制,创建任务如有空闲线程则提交空闲线程执行,没有则立即新建线程,当有大量请求任务时会创建大量的线程,导致OOM。

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

ScheduledThreadPool提供了指定核心数量的计划任务线程池ScheduledThreadPoolExecutor

1
2
3
4
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

SingleThreadScheduledExecutor单线程的ScheduledThreadPool

Collections

JUC_Collections_UML

  • ConcurrentHashMap

ConcurrentHashMapHashMap的并发实现,初见JDK1.7,彼时底层使用分段的方式来降低高并发导致的热点争用问题;在1.8时,弃用了分段锁,采用CAS来优化并发。详细讲解见Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
ConcurrentHashMap_17.png
ConcurrentHashMap_18.png

  • ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全FIFO队列。在入队时,首先定位尾节点,然后使用CAS将入队节点设置为尾节点的后继。出队时首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

  • ConcurrentLinkedDeque

ConcurrentLinkedDequeConcurrentLinkedQueue双向链表版本,同时支持FIFO和FILO两种操作方式。头插入时首先寻找合法(存活)头节点,然后将新节点尾指针指向原头节点,并通过CAS设置原头节点前指针。取节点也类似,找到合法(存活)节点后unlink。

  • ConcurrentSkipListMap

跳跃表是一种通过“空间来换取时间”的一个算法,通过在每个节点中增加了向前的指针,从而提升查找的效率。ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够O(log(n))时间内完成查找、插入、删除操作。

1
2
3
4
5
6
7
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;

...
}

在其中,Node是传统的链表节点,跳跃表中在之上建立索引,Index则是跳跃表的基本组成单元。

1
2
3
4
5
6
7
8
9
* Head nodes          Index nodes       
* +-+ right +-+ +-+
* |1|----------->| |------>| |---------------->| |->null
* +-+ +-+ +-+ +-+
* v down | | |
* Nodes next v v v
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+
* | |->|A|->|B|->|C|->|E|->|F|->|G|->|H|->|J|->|K|->null
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+

一个带一级索引的跳跃表如上。
在添加元素时,首先新增一个结点到最底层的链表上;然后生成一个随机数,并根据该随机数得到一个level,如果概率算得的level在当前跳表level范围内,则构建一个从1level的纵列index结点引用;否则新增添加一层,完成head结点的指针转移,并构建好纵向的index结点引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
* Head nodes          Index nodes
* +-+ right +-+ +-+
* |2|---------------->| |--------------------->| |->null
* +-+ +-+ +-+
* | down | |
* v v v
* +-+ +-+ +-+ +-+ +-+ +-+
* |1|----------->| |->| |------>| |----------->| |------>| |->null
* +-+ +-+ +-+ +-+ +-+ +-+
* v | | | | |
* Nodes next v v v v v
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+
* | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+

上图中,插入元素D时,得到一个大于当前跳跃表的level,于是便新增一层,并建立纵向的index结点引用。然后新增元素I,此时得到等于当前跳表的level,于是直接添加并每层都添加好向下的index结点引用。

1
2
3
*        +------+       +------+      +------+
* ... | b |------>| n |----->| f | ...
* +------+ +------+ +------+

删除节点时,如上图的n节点,首先使用CAS将n的value换成null;

1
2
3
*        +------+       +------+      +------+       +------+
* ... | b |------>| n |----->|marker|------>| f | ...
* +------+ +------+ +------+ +------+

然后在n的后面增加一个空的结点(marker)以避免一些在基于CAS链表的删除错误;

1
2
3
*        +------+                                    +------+
* ... | b |----------------------------------->| f | ...
* +------+ +------+

最后将b指向f,断开n,让n被GC回收掉。

  • ConcurrentSkipListSet

ConcurrentSkipListSet本质是ConcurrentSkipListMap<E,Object>

  • CopyOnWriteArrayList

CopyOnWriteArrayList使用了写时复制,当有新元素添加到CopyOnWriteArrayList时,先从原有的数组中拷贝一份出来,然后在新的数组做写操作,写完之后,再将原来的数组引用指向到新数组。因此做到了线程安全。

  • CopyOnWriteArraySet

CopyOnWriteArraySet本质是CopyOnWriteArrayList,在添加元素时调用了CopyOnWriteArrayListaddIfAbsent(e)方法。