1.1 前言
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。在容器中,有些也可以作为同步工具类,其它类型的同步工具类还包括闭锁(Latch)、信号量(Semaphore)以及栅栏(Barrier)。阻塞队列(eg: BlockQueue)是一种独特的类:它们不仅能作为保存对象的容器,还能协调生产者和消费者之间的控制流,因为它提供的 take
和 put
等方法将会阻塞,直到队列达到期望的状态。所有的同步工具类都包含一些特定的属性:它们封装了一些状态,这些状态将决定同步工具类的线程是继续执行还是等待,此外还提供了一些方法对其状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。
1.2 闭锁
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有线程通过。当闭锁到达结束状态后,将不会再次改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其它活动都完成后才继续执行。比如:
1.2.1 CountDownLatch
CountDownLatch
是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待事件的数量。countDown()
方法递减计数器,表示有一个事件已经发生了,而 await()
方法等待计数器达到 0 ,这表示所有需要等待的事件都已经发生。如果计数器的值非 0 ,那么 await()
方法会一直阻塞到计数器的值为 0 ,或者等待线程中断,或者等待超时。CountDownLatch
被用来同步一个或多个任务,强制它们等待由其它任务执行的一组操作完成。你可以向 CountDownLatch
对象设置一个初始计数值,任何在这个对象上调用 await()
的方法都将阻塞,直到这个计数值到达 0。其它任务在结束工作时,可以在该对象上调用 countDown()
方法来减小这个计数值。CountDownLatch
被设计为只触发一次,计数值不能重置。如果你需要重置计数值的版本,请看下文的 CyclicBarrier
。把大象放入冰箱的例子:
1 | /** |
以上代码输出结果:
1 | ------- 把大象放入冰箱 -------- |
1.2.2 FutureTask
FutureTask
也可以用作闭锁。它实现了 Future
的语义,表示一种抽象可生成结果的计算。 FutureTask
表示的计算是通过 Callable
来实现的,相当于一种可生成结果的 Runnable
,并且可以处于这三种状态:等待运行(Waiting to run)
、正在运行(Running)
和运行完成(Completed)
。其中执行完成
表示计算的所有可能结束方式,包括正常结束、由于取消结束和由于异常结束等。当 FutureTask
进入完成状态后,它就会永远停在这个状态上。get()
方法的行为取决于任务的状态。如果此时任务已经完成,那么 get()
方法会立即返回结果,否则将会阻塞直到任务进入到完成状态,然后返回结果或者抛出异常。FutureTask
将计算结果从执行计算的线程传递到获取这个结果的线程,而 FutureTask
的规范确保了这种传递过程能实现结果的安全发布。FutureTask
在 Executor
框架中表示异步任务,除此之外还可以用来表示一些耗时比较长的计算,这些计算可以在使用计算结果之前启动。以下示例使用其执行一个异步任务:
1 | /** |
以上代码输出结果为:
1 | --------- 进入主线程执行任务 |
1.4 信号量
计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行指定操作的数量。计数信号量还可以用来实现某种资源池或者对容器施加边界。Semaphore
中管理着一组虚拟的许可(permit
),许可的初始数量可以通过构造函数来指定,在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire()
将阻塞直到有许可或者直到终端或者直到超时。release()
方法将返回一个许可给信号量。Semaphore
可以用于实现资源池,例如数据库连接池。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。如果将 Semaphore
的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用 acquire()
方法获取一个许可,在将资源返回给池之后调用 release()
方法释放许可,那么 acquire()
方法将一直阻塞直到资源池不为空。以下示例将使用 Semaphore
将 HashSet
容器变成有界的阻塞容器,信号量的计数值会初始化为容器容量的最大值。add
操作在向底层容器添加一个元素之前,首先要获取一个许可。如果 add
操作没有添加任何元素,那么会立刻释放许可。同样 remove
操作会释放一个许可,使更多的元素能够添加到容器中。底层的 Set
实现并不知道关于边界的任何信息。
1 | /** |
1.5 栅栏
我们已经看到通过闭锁
来启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于:所有线程都必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其它线程。栅栏用于实现一些协议,例如几个家庭决定在某个地方集合:“所有人 6:00 在 KFC 碰头,到了以后要等其它人,之后再讨论下一步要做的事情”。CyclicBarrier
适用于这样的情况:你希望创建一组任务,他们并行执行工作,然后再运行下一个步骤之前等待,知道所有任务都完成(有点儿像线程的 join
方法)。它使得所有的并行任务都将处于栅栏处列队,因此可以一致的向前移动。这和上文的 CountDownLatch
非常像,只是 CountDownLatch
只是触发一次的事件,而 CyclicBarrier
可以重复使用。CyclicBarrier
可以使一定数量的参与方反复地在栅栏位置汇聚,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程达到栅栏位置时将调用 await()
方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有栅栏都到达栅栏了位置,那么栅栏将打开,此时所有的线程都被释放,而栅栏将被重置以便下次使用。如果对 await()
方法调用超时,或者线程被中断,那么栅栏就认为是被打破了,所有阻塞 await()
的调用都将终止并抛出 BrokenBarrierException
。如果成功通过栅栏,那么 await()
将为每一个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier
还可以使你将一个栅栏操作传递给构造函数,这个一个 Runnable
,当成功通过栅栏时会(在一个子任务线程中)执行它,但是它在阻塞线程被释放前是不能执行的。使用示例:
1 | /** |
以上代码运行结果:
1 | 线程 pool-1-thread-3 即将到达集合地点1,当前已有 0 个已经到达,正在等候 |