同步工具类

1.1 前言

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。在容器中,有些也可以作为同步工具类,其它类型的同步工具类还包括闭锁(Latch)、信号量(Semaphore)以及栅栏(Barrier)。阻塞队列(eg: BlockQueue)是一种独特的类:它们不仅能作为保存对象的容器,还能协调生产者和消费者之间的控制流,因为它提供的 takeput 等方法将会阻塞,直到队列达到期望的状态。所有的同步工具类都包含一些特定的属性:它们封装了一些状态,这些状态将决定同步工具类的线程是继续执行还是等待,此外还提供了一些方法对其状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。

1.2 闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有线程通过。当闭锁到达结束状态后,将不会再次改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其它活动都完成后才继续执行。比如:

  • 确保某个计算机在其需要的所有资源初始化后才能继续执行。
  • 确保某个服务在其依赖的所有服务都已经启动后才启动。
  • 等待直到某个操作的所有参与者都就绪后再继续执行。
1.2.1 CountDownLatch

CountDownLatch 是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待事件的数量。countDown() 方法递减计数器,表示有一个事件已经发生了,而 await() 方法等待计数器达到 0 ,这表示所有需要等待的事件都已经发生。如果计数器的值非 0 ,那么 await() 方法会一直阻塞到计数器的值为 0 ,或者等待线程中断,或者等待超时。
CountDownLatch 被用来同步一个或多个任务,强制它们等待由其它任务执行的一组操作完成。你可以向 CountDownLatch 对象设置一个初始计数值,任何在这个对象上调用 await() 的方法都将阻塞,直到这个计数值到达 0。其它任务在结束工作时,可以在该对象上调用 countDown() 方法来减小这个计数值。CountDownLatch 被设计为只触发一次,计数值不能重置。如果你需要重置计数值的版本,请看下文的 CyclicBarrier。把大象放入冰箱的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* @author mghio
* @date: 2019-11-03
* @version: 1.0
* @description: 同步工具类 —— CountDownLatch
* @since JDK 1.8
*/
public class CountDownLatchDemo {

private static CountDownLatch countDownLatch1 = new CountDownLatch(1);

private static CountDownLatch countDownLatch2 = new CountDownLatch(1);

public static void main(String[] args) {
final Thread thread1 = new Thread(() -> {
System.out.println("step 1:打开冰箱门...");
// 对 countDownLatch1 倒计时 -1
countDownLatch1.countDown();
});

final Thread thread2 = new Thread(() -> {
try {
// 等待 countDownLatch1 倒计时,计时为 0 则往下运行
countDownLatch1.await();
System.out.println("step 2:把大象放入冰箱...");
// 对 countDownLatch2 倒计时 -1
countDownLatch2.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});

final Thread thread3 = new Thread(() -> {
try {
// 对 countDownLatch2 倒计时,计时为 0 则往下进行
countDownLatch2.await();
System.out.println("step 3:关上冰箱门...");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

System.out.println("------- 把大象放入冰箱 --------");
thread3.start();
thread1.start();
thread2.start();
}

}

以上代码输出结果:

1
2
3
4
------- 把大象放入冰箱 --------
step 1:打开冰箱门...
step 2:把大象放入冰箱...
step 3:关上冰箱门...
1.2.2 FutureTask

FutureTask 也可以用作闭锁。它实现了 Future 的语义,表示一种抽象可生成结果的计算。 FutureTask 表示的计算是通过 Callable 来实现的,相当于一种可生成结果的 Runnable ,并且可以处于这三种状态:等待运行(Waiting to run)正在运行(Running)运行完成(Completed)。其中执行完成表示计算的所有可能结束方式,包括正常结束、由于取消结束和由于异常结束等。当 FutureTask 进入完成状态后,它就会永远停在这个状态上。get() 方法的行为取决于任务的状态。如果此时任务已经完成,那么 get() 方法会立即返回结果,否则将会阻塞直到任务进入到完成状态,然后返回结果或者抛出异常。FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程,而 FutureTask 的规范确保了这种传递过程能实现结果的安全发布。
FutureTaskExecutor 框架中表示异步任务,除此之外还可以用来表示一些耗时比较长的计算,这些计算可以在使用计算结果之前启动。以下示例使用其执行一个异步任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @author mghio
* @date: 2019-11-03
* @version: 1.0
* @description: 同步工具类 —— FutureTask
* @since JDK 1.8
*/
public class FutureTaskDemo {

public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("--------- 进入主线程执行任务");
ExecutorService threadPool = Executors.newCachedThreadPool();
System.out.println("--------- 提交异步任务");
FutureTask<String> future = new FutureTask<>(() -> "成功获取 future 异步任务结果");
threadPool.execute(future);
System.out.println("--------- 提交异步任务之后,立马返回到主线程继续往下执行");
Thread.sleep(1000);
System.out.println("--------- 此时需要获取上面异步任务的执行结果");
boolean flag = true;
while (flag) {
if (future.isDone() && !future.isCancelled()) {
String futureResult = future.get();
System.out.println("--------- 异步任务返回的结果是:" + futureResult);
flag = false;
}
}
if (!threadPool.isShutdown()) {
threadPool.shutdown();
}
}

}

以上代码输出结果为:

1
2
3
4
5
--------- 进入主线程执行任务
--------- 提交异步任务
--------- 提交异步任务之后,立马返回到主线程继续往下执行
--------- 此时需要获取上面异步任务的执行结果
--------- 异步任务返回的结果是:成功获取 future 异步任务结果

1.4 信号量

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行指定操作的数量。计数信号量还可以用来实现某种资源池或者对容器施加边界。Semaphore 中管理着一组虚拟的许可(permit),许可的初始数量可以通过构造函数来指定,在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire() 将阻塞直到有许可或者直到终端或者直到超时。release() 方法将返回一个许可给信号量。Semaphore 可以用于实现资源池,例如数据库连接池。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。如果将 Semaphore 的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用 acquire() 方法获取一个许可,在将资源返回给池之后调用 release() 方法释放许可,那么 acquire() 方法将一直阻塞直到资源池不为空。以下示例将使用 SemaphoreHashSet 容器变成有界的阻塞容器,信号量的计数值会初始化为容器容量的最大值。add 操作在向底层容器添加一个元素之前,首先要获取一个许可。如果 add 操作没有添加任何元素,那么会立刻释放许可。同样 remove 操作会释放一个许可,使更多的元素能够添加到容器中。底层的 Set 实现并不知道关于边界的任何信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* @author maguihai
* @date: 2019-11-03
* @version: 1.0
* @description: 同步工具类 —— Semaphore
* @since JDK 1.8
*/
public class BoundedHashSet<T> {

private final Set<T> set;

private final Semaphore sem;

public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<>());
this.sem = new Semaphore(bound);
}

public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {
sem.release();
}
}
}

public boolean remove(T o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
sem.release();
}
return wasRemoved;
}

}

1.5 栅栏

我们已经看到通过闭锁来启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于:所有线程都必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其它线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:“所有人 6:00 在 KFC 碰头,到了以后要等其它人,之后再讨论下一步要做的事情”。CyclicBarrier 适用于这样的情况:你希望创建一组任务,他们并行执行工作,然后再运行下一个步骤之前等待,知道所有任务都完成(有点儿像线程的 join 方法)。它使得所有的并行任务都将处于栅栏处列队,因此可以一致的向前移动。这和上文的 CountDownLatch 非常像,只是 CountDownLatch 只是触发一次的事件,而 CyclicBarrier 可以重复使用。
CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇聚,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程达到栅栏位置时将调用 await() 方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有栅栏都到达栅栏了位置,那么栅栏将打开,此时所有的线程都被释放,而栅栏将被重置以便下次使用。如果对 await() 方法调用超时,或者线程被中断,那么栅栏就认为是被打破了,所有阻塞 await() 的调用都将终止并抛出 BrokenBarrierException。如果成功通过栅栏,那么 await() 将为每一个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier 还可以使你将一个栅栏操作传递给构造函数,这个一个 Runnable ,当成功通过栅栏时会(在一个子任务线程中)执行它,但是它在阻塞线程被释放前是不能执行的。使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* @author mghio
* @date: 2019-11-03
* @version: 1.0
* @description: 同步工具类 —— CyclicBarrier
* @since JDK 1.8
*/
public class CyclicBarrieDemo {

public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
// 创建 CyclicBarrier 对象并设置 3 个公共屏障点
final CyclicBarrier cb = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
Runnable runnable = () -> {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程 " + Thread.currentThread().getName() + " 即将到达集合地点1,当前已有 " + cb.getNumberWaiting() + " 个已经到达,正在等候");
// 到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行
cb.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程 " + Thread.currentThread().getName() + " 即将到达集合地点2,当前已有 " + cb.getNumberWaiting() + " 个已经到达,正在等候");
cb.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程 " + Thread.currentThread().getName() + " 即将到达集合地点3,当前已有 " + cb.getNumberWaiting() + " 个已经到达,正在等候");
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
};
service.execute(runnable);
}
service.shutdown();
}

}

以上代码运行结果:

1
2
3
4
5
6
7
8
9
线程 pool-1-thread-3 即将到达集合地点1,当前已有 0 个已经到达,正在等候
线程 pool-1-thread-1 即将到达集合地点1,当前已有 1 个已经到达,正在等候
线程 pool-1-thread-2 即将到达集合地点1,当前已有 2 个已经到达,正在等候
线程 pool-1-thread-3 即将到达集合地点2,当前已有 0 个已经到达,正在等候
线程 pool-1-thread-2 即将到达集合地点2,当前已有 1 个已经到达,正在等候
线程 pool-1-thread-1 即将到达集合地点2,当前已有 2 个已经到达,正在等候
线程 pool-1-thread-3 即将到达集合地点3,当前已有 0 个已经到达,正在等候
线程 pool-1-thread-2 即将到达集合地点3,当前已有 1 个已经到达,正在等候
线程 pool-1-thread-1 即将到达集合地点3,当前已有 2 个已经到达,正在等候
-------------本文结束感谢您的阅读-------------
mghio wechat
微信公众号「mghio」
请我吃🍗