多线程工具包

类型 并发特性 其他
CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行 不可复用
CyclicBarrier 般用于一组线程互相等待至某个状态,然后这一组线程再同时执行 可复用;方法更多
Semaphore 流控(资源并发控制)
Exchanger 用于线程间协作的工具类,用于两个线程间能够交换
Phaser 移相器 线程之前的动作协调,同步 很灵活,底层非AQS,内部实现比较复杂

Phaser

alt

Phaser的灵活性主要体现在在构造函数时不需要强制指定目前有多少参与协作的线程,可以在运行时动态改变。

register()//添加一个新的注册者
bulkRegister(int parties)//添加指定数量的多个注册者
arrive()// 到达栅栏点直接执行,无须等待其他的线程
arriveAndAwaitAdvance()//到达栅栏点,必须等待其他所有注册者到达
arriveAndDeregister()//到达栅栏点,注销自己无须等待其他的注册者到达
onAdvance(int phase, int registeredParties)//多个线程达到注册点之后,会调用该方法。

(1)替代CountDownLatch实现一次性的共享锁例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}

// allow threads to start and deregister self
phaser.arriveAndDeregister();
}

(2)模拟CyclicBarrier的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package concurrent.tools.phaser;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
* Created by Administrator on 2018/8/27.
*/
public class PhaserDemo5 {

public static void main(String[] args) throws InterruptedException {

Phaser phaser=new Phaser(){
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("=================step-"+phase+"==================="+registeredParties);
return super.onAdvance(phase, registeredParties);
}
};

Bus bus1=new Bus(phaser,"小张");
Bus bus2=new Bus(phaser,"小李");
Bus bus3=new Bus(phaser,"小王");

bus1.start();
bus2.start();
bus3.start();


System.out.println(phaser.getRegisteredParties());



}


static public class Bus extends Thread{

private Phaser phaser;
private Random random;

public Bus(Phaser phaser,String name){
this.phaser=phaser;
setName(name);
random=new Random();
phaser.register();
}


private void trip(int sleepRange,String cityName){
System.out.println(this.getName()+" 准备去"+cityName+"....");
int sleep=random.nextInt(sleepRange);
try {
TimeUnit.SECONDS.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getName()+" 达到"+cityName+"...... ");
if(this.getName().equals("小王1")){ // 测试掉队的情况
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndDeregister();
}else {
phaser.arriveAndAwaitAdvance();
}
}





@Override
public void run() {

try {
int s=random.nextInt(3);
TimeUnit.SECONDS.sleep(s);
System.out.println(this.getName()+" 准备好了,旅行路线=北京=>上海=>杭州 ");
phaser.arriveAndAwaitAdvance();// 等待所有的汽车准备好
} catch (InterruptedException e) {
e.printStackTrace();
}


trip(5,"北京");
trip(5,"上海");
trip(3,"杭州");

}
}

}