public class Phaser
extends Object
java.lang.Object | |
↳ | java.util.concurrent.Phaser |
可重复使用的同步屏障,功能类似于 CyclicBarrier
和 CountDownLatch
但支持更灵活的使用。
注册。 与其他障碍的情况不同, 登记在移相器上进行同步的参与方的数量可能会随时间而变化。 任务可以在任何时间(使用的方法来注册register()
, bulkRegister(int)
,或构造建立各方的初始数的形式),和(使用任何抵达时任选注销arriveAndDeregister()
)。 与大多数基本同步结构一样,注册和注销只影响内部计数; 他们不建立任何进一步的内部簿记,因此任务不能查询他们是否已注册。 (但是,你可以通过继承这个类来引入这样的簿记。)
同步。 像CyclicBarrier
一样, Phaser
可能会一再等待。 方法arriveAndAwaitAdvance()
作用类似于CyclicBarrier.await
。 每代相位器都有相关的相位编号。 阶段编号从零开始,并在所有各方到达相位器时前进,在达到Integer.MAX_VALUE
后回零。 阶段号的使用能够在到达相位器时以及在等待其他时通过可能由任何注册方引用的两种方法来独立控制动作:
arrive()
and arriveAndDeregister()
record arrival. These methods do not block, but return an associated arrival phase number; that is, the phase number of the phaser to which the arrival applied. When the final party for a given phase arrives, an optional action is performed and the phase advances. These actions are performed by the party triggering a phase advance, and are arranged by overriding method onAdvance(int, int)
, which also controls termination. Overriding this method is similar to, but more flexible than, providing a barrier action to a CyclicBarrier
. awaitAdvance(int)
requires an argument indicating an arrival phase number, and returns when the phaser advances to (or is already at) a different phase. Unlike similar constructions using CyclicBarrier
, method awaitAdvance
continues to wait even if the waiting thread is interrupted. Interruptible and timeout versions are also available, but exceptions encountered while tasks wait interruptibly or with timeout do not change the state of the phaser. If necessary, you can perform any associated recovery within handlers of those exceptions, often after invoking forceTermination
. Phasers may also be used by tasks executing in a ForkJoinPool
. Progress is ensured if the pool's parallelismLevel can accommodate the maximum number of simultaneously blocked parties. 终止。 移相器可以进入终止状态,可以使用方法isTerminated()
进行检查。 终止时,所有同步方法立即返回而不等待提前,如负返回值所示。 同样,终止时注册的企图无效。 当调用onAdvance
返回true
时终止触发。 如果注销导致注册方的数量变为零,则默认实现返回true
。 如下图所示,当相位器控制具有固定迭代次数的动作时,在当前相位数达到阈值时重写此方法以导致终止常常是方便的。 方法forceTermination()
也可以突然释放等待线程并允许它们终止。
分层。 Phasers可以分层 (即,构建在树结构中)以减少争用。 具有大量当事方的阶段公司可能会遭遇严重的同步争用成本,反而可能会设立一些分阶段的小组以共享一个共同的母公司。 这可能会大大增加吞吐量,即使它会导致更高的每操作开销。
在分层阶段的树中,自动管理子阶段的注册和注销。 每当一个儿童相位器的注册方数量变为非零时(如Phaser(Phaser, int)
构造函数, register()
或bulkRegister(int)
),该孩子相位器就会向其父母注册。 每当注册方的数量因调用arriveAndDeregister()
而变为零时,儿童移相器就会从其父母注销。
监测。 虽然同步方法只能由注册方调用,但任何调用者都可以监控移相器的当前状态。 在任何时候,总共有getRegisteredParties()
方,其中getArrivedParties()
已到达目前阶段( getPhase()
)。 剩下的( getUnarrivedParties()
)各方到达时,阶段进展。 这些方法返回的值可能反映瞬态状态,所以通常不用于同步控制。 方法toString()
以便于非正式监控的形式返回这些状态查询的快照。
样品用量:
可以使用Phaser
而不是CountDownLatch
来控制服务于可变数量的各方的一次性动作。 典型的习惯用法是将该方法设置为第一个寄存器,然后启动操作,然后取消注册,如下所示:
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
导致一组线程重复执行给定迭代次数的一种方法是覆盖 onAdvance
:
void startTasks(List<Runnable> tasks, final int iterations) {
final Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}.start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
}
If the main task must later await termination, it may re-register and then execute a similar loop:
// ...
phaser.register();
while (!phaser.isTerminated())
phaser.arriveAndAwaitAdvance();
可以使用相关的结构来等待上下文中的特定阶段编号,您可以确定该阶段不会环绕Integer.MAX_VALUE
。 例如:
void awaitPhase(Phaser phaser, int phase) {
int p = phaser.register(); // assumes caller not already registered
while (p < phase) {
if (phaser.isTerminated())
// ... deal with unexpected termination
else
p = phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
}
要使用相位器树创建一组n
任务,您可以使用以下形式的代码,假定一个具有构造函数的Task类接受在构建时注册的Phaser
。 在调用build(new Task[n], 0, n, new Phaser())
,可以开始这些任务,例如通过提交给池:
void build(Task[] tasks, int lo, int hi, Phaser ph) {
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(tasks, i, j, new Phaser(ph));
}
} else {
for (int i = lo; i < hi; ++i)
tasks[i] = new Task(ph);
// assumes new Task(ph) performs ph.register()
}
}
The best value of
TASKS_PER_PHASER
depends mainly on expected synchronization rates. A value as low as four may be appropriate for extremely small per-phase task bodies (thus high rates), or up to hundreds for extremely large ones.
实施注意事项 :该实施将参与方的最大数目限制为65535.试图注册额外参与方的结果为IllegalStateException
。 但是,您可以并且应该创建分层的阶段器来容纳任意数量的参与者。
Public constructors |
|
---|---|
Phaser() 创建一个没有初始注册方的新移相器,无父母,初始阶段编号为0。 |
|
Phaser(int parties) 创建一个新的相位器,给定数量的未注册未注册方,无父母,初始阶段编号为0。 |
|
Phaser(Phaser parent) 相当于 |
|
Phaser(Phaser parent, int parties) 创建一个新的移相器,与给定的父母和注册未获得派对的人数。 |
Public methods |
|
---|---|
int |
arrive() 到达这个相位器,而不用等待其他人到达。 |
int |
arriveAndAwaitAdvance() 到达这个移相器并等待其他人。 |
int |
arriveAndDeregister() 到达这个相位器并从中取消注册,而无需等待其他人到达。 |
int |
awaitAdvance(int phase) 等待相位器的相位从给定的相位值前进,如果当前相位不等于给定的相位值或该相位器终止,则立即返回。 |
int |
awaitAdvanceInterruptibly(int phase) 等待相位器的相位从给定的相位值前进,如果在等待时中断,则抛出 |
int |
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 等待相位器的相位从给定的相位值或给定的超时时间 |
int |
bulkRegister(int parties) 将给定数量的新未授权方添加到该移相器。 |
void |
forceTermination() 强制该相位器输入终止状态。 |
int |
getArrivedParties() 返回已到达此移相器当前阶段的注册方的数量。 |
Phaser |
getParent() 返回此移相器的父级,如果没有则返回 |
final int |
getPhase() 返回当前阶段号。 |
int |
getRegisteredParties() 返回在此相位器上注册的参与方数量。 |
Phaser |
getRoot() 返回该相位器的根祖先,如果它没有父对象,则与此相位器相同。 |
int |
getUnarrivedParties() 返回尚未到达该移相器当前阶段的注册方的数量。 |
boolean |
isTerminated() 如果此移相器已终止,则返回 |
int |
register() 为这个移相器添加一个新的未获派的派对。 |
String |
toString() 返回标识此相位器的字符串以及其状态。 |
Protected methods |
|
---|---|
boolean |
onAdvance(int phase, int registeredParties) 在即将发生相位超前时执行动作并控制终止的可覆盖方法。 |
Inherited methods |
|
---|---|
From class java.lang.Object
|
Phaser (int parties)
创建一个新的相位器,给定数量的未注册未注册方,无父母,初始阶段编号为0。
Parameters | |
---|---|
parties |
int : the number of parties required to advance to the next phase |
Throws | |
---|---|
IllegalArgumentException |
if parties less than zero or greater than the maximum number of parties supported |
Phaser (Phaser parent)
相当于 Phaser(parent, 0)
。
Parameters | |
---|---|
parent |
Phaser : the parent phaser |
Phaser (Phaser parent, int parties)
创建一个新的移相器,与给定的父母和注册未获得派对的人数。 当给定的父母非空并且参与者的给定数量大于零时,该孩子相位器向其父母注册。
Parameters | |
---|---|
parent |
Phaser : the parent phaser |
parties |
int : the number of parties required to advance to the next phase |
Throws | |
---|---|
IllegalArgumentException |
if parties less than zero or greater than the maximum number of parties supported |
int arrive ()
到达这个相位器,而不用等待其他人到达。
这是未注册方调用此方法的使用错误。 然而,这个错误可能导致IllegalStateException
仅在该移相器上的一些后续操作时(如果有的话)。
Returns | |
---|---|
int |
the arrival phase number, or a negative value if terminated |
Throws | |
---|---|
IllegalStateException |
if not terminated and the number of unarrived parties would become negative |
int arriveAndAwaitAdvance ()
到达这个移相器并等待其他人。 等同于awaitAdvance(arrive())
。 如果您需要等待中断或超时,可以使用awaitAdvance
方法的其他形式之一进行类似结构安排。 如果您需要在抵达时注销,请使用awaitAdvance(arriveAndDeregister())
。
这是未注册方调用此方法的使用错误。 但是,只有在该移相器上进行了一些后续操作时,该错误才可能导致IllegalStateException
,如果有的话。
Returns | |
---|---|
int |
the arrival phase number, or the (negative) current phase if terminated |
Throws | |
---|---|
IllegalStateException |
if not terminated and the number of unarrived parties would become negative |
int arriveAndDeregister ()
到达这个相位器并从中取消注册,而无需等待其他人到达。 取消注册减少了在未来阶段推进所需的人数。 如果该移相器有父母,并且注销导致该移相器零次方,则该移相器也从其父母撤销注册。
这是未注册方调用此方法的使用错误。 然而,这个错误可能导致IllegalStateException
仅在该移相器上的一些后续操作(如果有的话)之后。
Returns | |
---|---|
int |
the arrival phase number, or a negative value if terminated |
Throws | |
---|---|
IllegalStateException |
if not terminated and the number of registered or unarrived parties would become negative |
int awaitAdvance (int phase)
等待相位器的相位从给定的相位值前进,如果当前相位不等于给定的相位值或该相位器终止,则立即返回。
Parameters | |
---|---|
phase |
int : an arrival phase number, or negative value if terminated; this argument is normally the value returned by a previous call to arrive or arriveAndDeregister . |
Returns | |
---|---|
int |
the next arrival phase number, or the argument if it is negative, or the (negative) current phase if terminated |
int awaitAdvanceInterruptibly (int phase)
等待该相位器的相位从给定的相位值前进,如果在等待时中断,则抛出 InterruptedException
,或者如果当前相位不等于给定的相位值或该相位器终止,则立即返回。
Parameters | |
---|---|
phase |
int : an arrival phase number, or negative value if terminated; this argument is normally the value returned by a previous call to arrive or arriveAndDeregister . |
Returns | |
---|---|
int |
the next arrival phase number, or the argument if it is negative, or the (negative) current phase if terminated |
Throws | |
---|---|
InterruptedException |
if thread interrupted while waiting |
int awaitAdvanceInterruptibly (int phase, long timeout, TimeUnit unit)
等待相位器的相位从给定的相位值或给定的超时时间 InterruptedException
过去,如果在等待时中断,则抛出 InterruptedException
,或者如果当前相位不等于给定相位值或该相位器终止,则立即返回。
Parameters | |
---|---|
phase |
int : an arrival phase number, or negative value if terminated; this argument is normally the value returned by a previous call to arrive or arriveAndDeregister . |
timeout |
long : how long to wait before giving up, in units of unit |
unit |
TimeUnit : a TimeUnit determining how to interpret the timeout parameter |
Returns | |
---|---|
int |
the next arrival phase number, or the argument if it is negative, or the (negative) current phase if terminated |
Throws | |
---|---|
InterruptedException |
if thread interrupted while waiting |
TimeoutException |
if timed out while waiting |
int bulkRegister (int parties)
将给定数量的新未授权方添加到该移相器。 如果正在进行的onAdvance(int, int)
调用正在进行中,则此方法可能会在返回之前等待其完成。 如果该移相器有一位父母,并且给定的参与者数量大于零,并且该移相器之前没有注册方,则该孩子移相器也向其父母注册。 如果该相位器终止,则尝试注册不起作用,并返回负值。
Parameters | |
---|---|
parties |
int : the number of additional parties required to advance to the next phase |
Returns | |
---|---|
int |
the arrival phase number to which this registration applied. If this value is negative, then this phaser has terminated, in which case registration has no effect. |
Throws | |
---|---|
IllegalStateException |
if attempting to register more than the maximum supported number of parties |
IllegalArgumentException |
if parties < 0 |
void forceTermination ()
强制该相位器输入终止状态。 注册方的数量不受影响。 如果该相位器是一组分相器的成员,则该组中的所有相位器都将被终止。 如果该移相器已经终止,则此方法不起作用。 此方法可能对于在一个或多个任务遇到意外异常之后协调恢复很有用。
int getArrivedParties ()
返回已到达此移相器当前阶段的注册方的数量。 如果该移相器已终止,则返回的值是无意义的和任意的。
Returns | |
---|---|
int |
the number of arrived parties |
Phaser getParent ()
返回该相位器的父代,或者如果没有,则返回 null
。
Returns | |
---|---|
Phaser |
the parent of this phaser, or null if none |
int getPhase ()
返回当前阶段号。 最大相数为Integer.MAX_VALUE
,之后重新开始为零。 终止时,阶段号为负数,在这种情况下,终止前的主要阶段可通过getPhase() + Integer.MIN_VALUE
获得。
Returns | |
---|---|
int |
the phase number, or a negative value if terminated |
int getRegisteredParties ()
返回在此相位器上注册的参与方数量。
Returns | |
---|---|
int |
the number of parties |
Phaser getRoot ()
返回该相位器的根祖先,如果它没有父对象,则与此相位器相同。
Returns | |
---|---|
Phaser |
the root ancestor of this phaser |
int getUnarrivedParties ()
返回尚未到达该移相器当前阶段的注册方的数量。 如果该移相器已终止,则返回的值是无意义的和任意的。
Returns | |
---|---|
int |
the number of unarrived parties |
boolean isTerminated ()
如果此移相器已终止,则返回 true
。
Returns | |
---|---|
boolean |
true if this phaser has been terminated |
int register ()
为这个移相器添加一个新的未获派的派对。 如果正在进行的onAdvance(int, int)
调用正在进行中,则此方法可能会在返回之前等待其完成。 如果该移相器有父母,并且该移相器之前没有注册方,则该孩子移相器也向其父母注册。 如果该相位器终止,则尝试注册不起作用,并返回负值。
Returns | |
---|---|
int |
the arrival phase number to which this registration applied. If this value is negative, then this phaser has terminated, in which case registration has no effect. |
Throws | |
---|---|
IllegalStateException |
if attempting to register more than the maximum supported number of parties |
String toString ()
返回标识此相位器的字符串以及其状态。 括号中的状态包括字符串"phase = "
后面跟着阶段号"parties = "
后面跟着注册方的数量, "arrived = "
后面跟着到达方的数量。
Returns | |
---|---|
String |
a string identifying this phaser, as well as its state |
boolean onAdvance (int phase, int registeredParties)
在即将发生相位超前时执行动作并控制终止的可覆盖方法。 该方法在队伍到达时推动该移相器(当所有其他等待方都处于休眠状态时)调用。 如果此方法返回true
,则该移相器将在前进时设置为最终终止状态,并且后续对isTerminated()
调用将返回true。 任何(未经检查的)由此方法的调用引发的异常或错误都会传播给试图推进该相位器的一方,在这种情况下不会发生进展。
这种方法的论据提供了当前转换所采用的相位器的状态。 从onAdvance
内调用到达,注册和等待方法对这个相位器的onAdvance
没有指定,不应该依赖。
如果这个移相器是一组阶段相位器的成员,那么 onAdvance
仅在每次前进时调用其根移相器。
支持最常见的用例,此方法的默认实现返回true
时注册方的数量已变为零作为党调用的结果arriveAndDeregister
。 您可以禁用此行为,从而可以在将来注册时继续执行此操作,方法是重写此方法始终返回false
:
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int parties) { return false; }
}
Parameters | |
---|---|
phase |
int : the current phase number on entry to this method, before this phaser is advanced |
registeredParties |
int : the current number of registered parties |
Returns | |
---|---|
boolean |
true if this phaser should terminate |