public class Phaser extends Object
CyclicBarrier和CountDownLatch,但支持更灵活的使用。
注册。 不像其他障碍的情况下,各方的号码进行注册 ,以在相位同步可以随时间变化。 任务可以在任何时间(使用的方法来注册register() , bulkRegister(int) ,或构造建立各方的初始数的形式),和(使用任何抵达时任选注销arriveAndDeregister() )。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,所以任务无法查询是否注册。 (但是,您可以通过对此类进行子类化来介绍此类簿记。)
同步 像CyclicBarrier , Phaser可能会重复等待。 方法arriveAndAwaitAdvance()有效果类似于CyclicBarrier.await 。 每一代移相器都具有相关的相位数。 相位数从零开始,当所有各方到达移相器时前进,在达到Integer.MAX_VALUE之后Integer.MAX_VALUE零。 通过使用阶段数字,可以通过两种可由任何注册方调用的方法,在到达移相器和等待其他人时独立控制动作:
arrive()和arriveAndDeregister()记录到达。 这些方法不会阻塞,而是返回相关的到达阶段数 ; 即到达应用的移相器的相位数。 当给定阶段的最后一方到达时,执行可选操作,并且阶段前进。 这些动作由触发相位提前的方执行,并且通过覆盖方法onAdvance(int, int)进行排列,该方法也控制终止。 覆盖该方法是相似,但比更灵活,提供屏障作用到CyclicBarrier 。 awaitAdvance(int)需要一个指示到达阶段数的参数,并且当相位器前进到(或已经处于)不同阶段时返回。 与使用CyclicBarrier类似结构不同,方法awaitAdvance继续等待即使等待的线程被中断。 中断和超时版本也是可用的,但任务等待中断或超时时遇到的异常不会改变移相器的状态。 如有必要,您可以在调用forceTermination之后,在处理程序中执行任何关联的恢复。 在ForkJoinPool中执行的任务也可以使用抖动器 ,当其他人被阻塞等待相位提前时,这将确保足够的并行执行任务。 终止。 移相器可能进入终止状态,可以使用方法isTerminated()进行检查。 一旦终止,所有同步方法立即返回而不等待提前,如负值返回值所示。 同样,终止时注册的尝试也没有效果。 当调用onAdvance返回true 。 如果注销已导致注册方的数量变为零,默认实现将返回true 。 如下所示,当相位器以固定次数的迭代控制动作时,当当前相位数达到阈值时,通常方便的是重写该方法以导致终止。 方法forceTermination()也可用于突然释放等待线程并允许它们终止。
分层 移相器可以分层 (即,以树结构构造)以减少争用。 具有大量聚会的激动剂,否则将会遇到重度的同步竞争成本,可能会被设置为使得一些子相位器共享一个共同的父节点。 这可能会大大增加吞吐量,即使它产生更大的每操作开销。
在一个分层相位的树中,自动管理儿童相机与其父母的注册和注销。 每当儿童移动设备的注册方数量变得非零(如Phaser(Phaser,int)构造函数中所确定的那样, register()或bulkRegister(int) ),子移动设备就向其父母注册。 只要注册方的数量变为零的调用的结果arriveAndDeregister() ,孩子相位器从其父注销。
监控。 虽然同步方法只能由注册方调用,但是可以由任何呼叫者监视移相器的当前状态。 在任何时刻,共有getRegisteredParties()个派对,其中getArrivedParties()已到达现阶段( getPhase() )。 剩下的( getUnarrivedParties() )派对到达时,阶段进行。 这些方法返回的值可以反映瞬态状态,因此对于同步控制通常不是有用的。 方法toString()以便于非正式监视的形式返回这些状态查询的快照。
示例用法:
可以使用A Phaser而不是CountDownLatch来控制为可变数目的方提供服务的一次性动作。 典型的成语是用于首先注册的方法设置,然后启动动作,然后注销,如:
void runTasks(List<Runnable> tasks) { final Phaser phaser = new Phaser(1); // "1" to register self // create and start threads for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { phaser.arriveAndAwaitAdvance(); // await all creation task.run(); } }.start(); } // allow threads to start and deregister self phaser.arriveAndDeregister(); }
导致一组线程重复执行给定次数迭代的一种方法是覆盖onAdvance :
void startTasks(List<Runnable> tasks, final int iterations) { final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }
如果主要任务必须等待终止,则可能会重新注册,然后执行类似的循环:
// ... phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance();
可以使用相关的结构来等待上下文中的特定阶段数字,在这些上下文中,您确定阶段永远不会绕过Integer.MAX_VALUE 。 例如:
void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); }
要创建一组n采用移相器的一棵树的任务,你可以使用以下形式的代码,假设一个任务类构造函数接受Phaser ,它与在建注册。 在调用build(new Task[n], 0, n, new Phaser())之后,可以启动这些任务,例如通过提交到池:
void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } }
TASKS_PER_PHASER的最佳价值主要取决于预期的同步率。
低至4的值可能适用于非常小的每阶段任务机构(因此高速率),或者对于极大的任务机构可能达到数百。
实施说明 :此实施限制了最大数量的65535.尝试注册附加方会导致IllegalStateException 。 但是,您可以并且应该创建分层相位器来适应任意大量的参与者。
| Constructor and Description |
|---|
Phaser()
创建一个新的移相器,没有最初的注册方,没有父级和初始阶段数0。
|
Phaser(int parties)
创建一个新的移相器与给定数量的注册无障碍方,没有父母和初始阶段0。
|
Phaser(Phaser parent)
相当于
Phaser(parent, 0) 。
|
Phaser(Phaser parent, int parties)
与给定的父母和注册的无礼方的数量创建一个新的移相器。
|
| Modifier and Type | Method and Description |
|---|---|
int |
arrive()
抵达这个移相器,而不用等待别人到达。
|
int |
arriveAndAwaitAdvance()
到达这个移相器,等待其他人。
|
int |
arriveAndDeregister()
到达这个移相器并从其中注销,而无需等待别人到达。
|
int |
awaitAdvance(int phase)
等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。
|
int |
awaitAdvanceInterruptibly(int phase)
等待该移相器的阶段从给定的相位值推进,如果在等待时
InterruptedException则抛出
InterruptedException ,或者如果当前相位不等于给定的相位值或者该相位器被终止,则立即返回。
|
int |
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
等待该移相器的阶段从给定的相位值或给定的超时时间
InterruptedException到等待时抛出
InterruptedException ,如果当前相位不等于给定的相位值,则立即返回,或者该相位器被终止。
|
int |
bulkRegister(int parties)
增加给定数量的新的有争议的派对到这个移相器。
|
void |
forceTermination()
强制此移相器进入终止状态。
|
int |
getArrivedParties()
返回在此移相器的当前阶段到达的已注册方的数量。
|
Phaser |
getParent()
返回此移相器的父级,如果没有,则返回
null 。
|
int |
getPhase()
返回当前相位数。
|
int |
getRegisteredParties()
返回在此移动设备上注册的各方数量。
|
Phaser |
getRoot()
返回此移相器的根祖先,如果它没有父代,则与该移相器相同。
|
int |
getUnarrivedParties()
返回尚未到达此移相器当前阶段的已注册方的数量。
|
boolean |
isTerminated()
返回
true如果移相器已被终止。
|
protected boolean |
onAdvance(int phase, int registeredParties)
在即将进行的相位提前执行动作的可覆盖方法,并控制终止。
|
int |
register()
添加一个新的unririved party到这个移相器。
|
String |
toString()
返回一个标识此移相器的字符串及其状态。
|
public Phaser()
public Phaser(int parties)
parties - 需要进入下一阶段的各方数量
IllegalArgumentException - 如果当事方小于零或大于所支持的最大数目
public Phaser(Phaser parent)
Phaser(parent, 0) 。
parent - 父移相器
public Phaser(Phaser parent, int parties)
parent - 父移相器
parties - 进入下一阶段所需的当事人数
IllegalArgumentException - 如果缔约方小于零或大于所支持的最大数目
public int register()
onAdvance(int, int)正在进行中,则此方法可能会在返回之前等待完成。
如果此移动设备有父母,而此移动设备以前没有注册方,则该儿童移动设备也向其父母注册。
如果此移相器被终止,则尝试注册不起作用,并返回负值。
IllegalStateException - 如果尝试注册超过最多支持的派对数量
public int bulkRegister(int parties)
onAdvance(int, int)正在进行中,则此方法可能会在返回之前等待完成。
如果此移动设备有父母,而且指定的当事人数量大于零,并且此移动设备以前没有注册方,则该子版移动设备也向其父母注册。
如果此移相器被终止,则尝试注册不起作用,并返回负值。
parties - 需要进入下一阶段的附加方的数量
IllegalStateException - 如果尝试注册超过最多支持的派对数量
IllegalArgumentException - 如果
parties < 0
public int arrive()
这是非注册方调用此方法的使用错误。 但是,可能会导致这个错误IllegalStateException只有在这个相位一些后续操作,如果有的话。
IllegalStateException - 如果没有终止,而有争议的党的人数将变为负数
public int arriveAndDeregister()
这是非注册方调用此方法的使用错误。 但是,可能会导致这个错误IllegalStateException只有在这个相位一些后续操作,如果有的话。
IllegalStateException - 如果没有终止,登记的或没有任何一方的人数将变为负数
public int arriveAndAwaitAdvance()
awaitAdvance(arrive()) 。
如果您需要等待中断或超时,您可以使用awaitAdvance方法的其他形式之一使用类似的结构进行awaitAdvance 。
如果相反,您需要在抵达时awaitAdvance(arriveAndDeregister()) ,请使用awaitAdvance(arriveAndDeregister()) 。
这是非注册方调用此方法的使用错误。 但是,这个错误可能会导致IllegalStateException只有在这个移相器的一些后续操作,如果有的话。
IllegalStateException - 如果没有被终止,而有争议的党的人数将变为负数
public int awaitAdvance(int phase)
phase - 抵达阶段数,如果终止则为负值;
此参数通常是前一次调用arrive或arriveAndDeregister返回的值。
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException
InterruptedException则抛出
InterruptedException ,或者如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。
phase - 到达阶段号码,如果终止则为负值;
此参数通常是前一次调用arrive或arriveAndDeregister返回的值。
InterruptedException - 如果线程在等待时中断
public int awaitAdvanceInterruptibly(int phase,
long timeout,
TimeUnit unit)
throws InterruptedException,
TimeoutException
InterruptedException到等待时抛出
InterruptedException ,如果当前相位不等于给定的相位值,则立即返回,或者该相位器被终止。
phase - 到达阶段数,如果终止则为负值;
此参数通常是前一次调用arrive或arriveAndDeregister返回的值。
timeout - 放弃之前等待多久,以
unit为单位
unit - a
TimeUnit确定如何解释
timeout参数
InterruptedException - 如果线程在等待时中断
TimeoutException - 如果在等待时超时
public void forceTermination()
public final int getPhase()
Integer.MAX_VALUE ,之后重新开始为零。
一旦终止,相数为负,在这种情况下,终止前的主要阶段可以通过getPhase() + Integer.MIN_VALUE获得。
public int getRegisteredParties()
public int getArrivedParties()
public int getUnarrivedParties()
public Phaser getParent()
null 。
null
public Phaser getRoot()
public boolean isTerminated()
true如果移相器已被终止。
true如果此移相器已被终止
protected boolean onAdvance(int phase,
int registeredParties)
true ,则该移相器将在提前设置为最终终止状态,后续调用isTerminated()将返回true。
任何(未选中)通过调用此方法引发的异常或错误将传播给尝试推进此移相器的方,在这种情况下不会发生提前。
该方法的参数提供了当前转换占优势的状态。 在onAdvance内对这个移相器引用到达,注册和等待方法的onAdvance是未指定的,不应该依赖。
如果这个移相器是分层的相位器的成员,那么onAdvance仅在每个onAdvance的根onAdvance器上被调用。
支持最常见的用例,此方法的默认实现返回true时注册方的数量已变为零作为党调用的结果arriveAndDeregister 。 您可以禁用此行为,从而通过覆盖此方法始终返回false :
Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } }
phase - 在此移相器提前之前输入此方法的当前阶段编号
registeredParties - 当前注册方的数量
true如果这个移相器应该终止
Submit a bug or feature
For further API reference and developer documentation, see Java SE Documentation. That documentation contains more detailed, developer-targeted descriptions, with conceptual overviews, definitions of terms, workarounds, and working code examples.
Copyright © 1993, 2014, Oracle and/or its affiliates. All rights reserved.