Most visited

Recently visited

Added in API level 24

CountedCompleter

public abstract class CountedCompleter
extends ForkJoinTask<T>

java.lang.Object
   ↳ java.util.concurrent.ForkJoinTask<T>
     ↳ java.util.concurrent.CountedCompleter<T>


一个ForkJoinTask ,触发时执行完成操作,并且没有剩余的待处理操作。 与其他形式的ForkJoinTasks相比,CountedCompleters在存在子任务停滞和阻塞方面通常更加健壮,但编程起来不太直观。 CountedCompleter的使用与其他基于完成的组件相似,只是可能需要多个待执行完成来触发完成操作onCompletion(CountedCompleter) ,而不仅仅是一个。 除非另有初始化, pending count开始于零,但也可以是(原子),使用方法改变setPendingCount(int)addToPendingCount(int) ,和compareAndSetPendingCount(int, int) 在调用tryComplete() ,如果挂起的操作计数为非零,则递减; 否则,完成动作被执行,并且如果这个完成者本身具有完成者,则该过程继续与其完成者。 与相关同步组件(如PhaserSemaphore ,这些方法仅影响内部计数; 他们不建立任何进一步的内部簿记。 具体而言,待定任务的身份不会被维护。 如下图所示,您可以创建子类,以便在需要时记录部分或全部未决任务或其结果。 如下所示,还提供了支持自定义完成遍历的实用程序方法。 但是,因为CountedCompleters只提供基本的同步机制,所以创建更多的抽象子类可能会很有用,这些子类可以维护适用于一组相关用法的链接,字段和其他支持方法。

具体的CountedCompleter类必须定义方法compute() ,应该在大多数情况下(如下图所示)在返回之前调用tryComplete()一次。 该类也可以可选地覆盖方法onCompletion(CountedCompleter)以在正常完成时执行动作,并且方法onExceptionalCompletion(Throwable, CountedCompleter)在任何异常时执行动作。

CountedCompleters通常不会产生结果,在这种情况下,它们通常被声明为CountedCompleter<Void> ,并且总是会返回null作为结果值。 在其他情况下,您应该重写方法getRawResult()以提供join(), invoke()和相关方法的结果。 一般而言,此方法应返回完成时保存结果的CountedCompleter对象的字段值(或一个或多个字段的函数)。 方法setRawResult(T)默认在CountedCompleters中不起作用。 可能但很少适用覆盖此方法以维护其他对象或保存结果数据的字段。

一个没有完成的getCompleter() (即getCompleter()返回null )可以用作一个常规的ForkJoinTask,具有这个附加功能。 然而,任何完成者又有另一个完成者只能作为其他计算的内部帮助者,所以它自己的任务状态(如isDone()等方法中所isDone() )是任意的; 这种状况只有在明确调用改变complete(T)cancel(boolean)completeExceptionally(Throwable)或方法的特殊结束后compute 在任何异常完成时,例外情况可能会传递给任务完成者(及其完成者等)(如果存在并且尚未完成)。 同样,取消一个内部CountedCompleter对该完成者只有一个本地影响,所以通常不会有用。

样本使用。

并行递归分解。 CountedCompleters可以安排在与RecursiveAction经常使用的树相似的树上,尽管设置它们的构造通常会有所不同。 这里,每个任务的完成者是计算树中的父代。 即使它们需要更多簿记,当将可能耗时的操作(不能进一步细分)应用到数组或集合的每个元素时,CountedCompleters可能会是更好的选择; 尤其是当操作需要比其他操作花费大量时间完成某些元素时,无论是因为内在变化(例如I / O)还是辅助效果(如垃圾收集)。 因为CountedCompleters提供了自己的延续,所以其他线程不需要阻止等待执行它们。

例如,下面是一个类的初始版本,它使用二分递归分解将工作分成单个部分(叶子任务)。 即使将工作分解为单个调用,基于树的技术通常也更适合直接分派叶任务,因为它们减少了线程间通信并提高了负载平衡。 在递归情况下,每对子任务中的第二onCompletion任务完成触发其父项完成(因为没有执行结果组合,方法onCompletion的默认无操作实现未被覆盖)。 静态实用程序方法设置基本任务并调用它(此处隐式使用commonPool() )。

 class MyOperation<E> { void apply(E e) { ... }  }

 class ForEach<E> extends CountedCompleter<Void> {

   public static <E> void forEach(E[] array, MyOperation<E> op) {
     new ForEach<E>(null, array, op, 0, array.length).invoke();
   }

   final E[] array; final MyOperation<E> op; final int lo, hi;
   ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
     super(p);
     this.array = array; this.op = op; this.lo = lo; this.hi = hi;
   }

   public void compute() { // version 1
     if (hi - lo >= 2) {
       int mid = (lo + hi) >>> 1;
       setPendingCount(2); // must set pending count before fork
       new ForEach(this, array, op, mid, hi).fork(); // right child
       new ForEach(this, array, op, lo, mid).fork(); // left child
     }
     else if (hi > lo)
       op.apply(array[lo]);
     tryComplete();
   }
 }
This design can be improved by noticing that in the recursive case, the task has nothing to do after forking its right task, so can directly invoke its left task before returning. (This is an analog of tail recursion removal.) Also, because the task returns upon executing its left task (rather than falling through to invoke tryComplete) the pending count is set to one:
 class ForEach<E> ... {
   ...
   public void compute() { // version 2
     if (hi - lo >= 2) {
       int mid = (lo + hi) >>> 1;
       setPendingCount(1); // only one pending
       new ForEach(this, array, op, mid, hi).fork(); // right child
       new ForEach(this, array, op, lo, mid).compute(); // direct invoke
     }
     else {
       if (hi > lo)
         op.apply(array[lo]);
       tryComplete();
     }
   }
 }
As a further optimization, notice that the left task need not even exist. Instead of creating a new one, we can iterate using the original task, and add a pending count for each fork. Additionally, because no task in this tree implements an onCompletion(CountedCompleter) method, tryComplete() can be replaced with propagateCompletion().
 class ForEach<E> ... {
   ...
   public void compute() { // version 3
     int l = lo, h = hi;
     while (h - l >= 2) {
       int mid = (l + h) >>> 1;
       addToPendingCount(1);
       new ForEach(this, array, op, mid, h).fork(); // right child
       h = mid;
     }
     if (h > l)
       op.apply(array[l]);
     propagateCompletion();
   }
 }
Additional optimizations of such classes might entail precomputing pending counts so that they can be established in constructors, specializing classes for leaf steps, subdividing by say, four, instead of two per iteration, and using an adaptive threshold instead of always subdividing down to single elements.

搜索。 CountedCompleters树可以在数据结构的不同部分中搜索值或属性,并在找到一个结果后立即报告结果AtomicReference 其他人可以查询结果以避免不必要的工作。 (您还可以执行其他任务,但通常让他们注意到结果已设置,并且如果跳过了进一步的处理,通常会更简单,更高效。)使用完全分区再次说明数组(在实践中,叶子任务几乎总是会处理多个元素):

 class Searcher<E> extends CountedCompleter<E> {
   final E[] array; final AtomicReference<E> result; final int lo, hi;
   Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
     super(p);
     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
   }
   public E getRawResult() { return result.get(); }
   public void compute() { // similar to ForEach version 3
     int l = lo, h = hi;
     while (result.get() == null && h >= l) {
       if (h - l >= 2) {
         int mid = (l + h) >>> 1;
         addToPendingCount(1);
         new Searcher(this, array, result, mid, h).fork();
         h = mid;
       }
       else {
         E x = array[l];
         if (matches(x) && result.compareAndSet(null, x))
           quietlyCompleteRoot(); // root task is now joinable
         break;
       }
     }
     tryComplete(); // normally complete whether or not found
   }
   boolean matches(E e) { ... } // return true if found

   public static <E> E search(E[] array) {
       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
   }
 }
In this example, as well as others in which tasks have no other effects except to compareAndSet a common result, the trailing unconditional invocation of tryComplete could be made conditional ( if (result.get() == null) tryComplete();) because no further bookkeeping is required to manage completions once the root task completes.

记录子任务。 将多个子任务的结果组合在一起的CountedCompleter任务通常需要通过方法onCompletion(CountedCompleter)访问这些结果。 如下面的类所示(执行映射和缩减的简化形式,映射和缩减都是类型E ),在分而治之设计中做到这一点的一种方式是让每个子任务记录其兄弟,以便它可以可通过方法onCompletion访问。 该技术适用于左右结果组合顺序无关紧要的减少; 有序的减少需要明确的左/右指定。 上述例子中看到的其他流线型的变体也可能适用。

 class MyMapper<E> { E apply(E v) {  ...  } }
 class MyReducer<E> { E apply(E x, E y) {  ...  } }
 class MapReducer<E> extends CountedCompleter<E> {
   final E[] array; final MyMapper<E> mapper;
   final MyReducer<E> reducer; final int lo, hi;
   MapReducer<E> sibling;
   E result;
   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
              MyReducer<E> reducer, int lo, int hi) {
     super(p);
     this.array = array; this.mapper = mapper;
     this.reducer = reducer; this.lo = lo; this.hi = hi;
   }
   public void compute() {
     if (hi - lo >= 2) {
       int mid = (lo + hi) >>> 1;
       MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
       MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
       left.sibling = right;
       right.sibling = left;
       setPendingCount(1); // only right is pending
       right.fork();
       left.compute();     // directly execute left
     }
     else {
       if (hi > lo)
           result = mapper.apply(array[lo]);
       tryComplete();
     }
   }
   public void onCompletion(CountedCompleter<?> caller) {
     if (caller != this) {
       MapReducer<E> child = (MapReducer<E>)caller;
       MapReducer<E> sib = child.sibling;
       if (sib == null || sib.result == null)
         result = child.result;
       else
         result = reducer.apply(child.result, sib.result);
     }
   }
   public E getRawResult() { return result; }

   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
     return new MapReducer<E>(null, array, mapper, reducer,
                              0, array.length).invoke();
   }
 }
Here, method onCompletion takes a form common to many completion designs that combine results. This callback-style method is triggered once per task, in either of the two different contexts in which the pending count is, or becomes, zero: (1) by a task itself, if its pending count is zero upon invocation of tryComplete, or (2) by any of its subtasks when they complete and decrement the pending count to zero. The caller argument distinguishes cases. Most often, when the caller is this, no action is necessary. Otherwise the caller argument can be used (usually via a cast) to supply a value (and/or links to other values) to be combined. Assuming proper use of pending counts, the actions inside onCompletion occur (once) upon completion of a task and its subtasks. No additional synchronization is required within this method to ensure thread safety of accesses to fields of this task or other completed tasks.

完成遍历 如果使用onCompletion处理完成不适用或不方便,则可以使用方法firstComplete()nextComplete()创建自定义遍历。 例如,要定义仅以第三个ForEach示例的形式分割右侧任务的MapReducer,完成必须合作减少未占用的子任务链接,这可以按如下方式完成:

 class MapReducer<E> extends CountedCompleter<E> { // version 2
   final E[] array; final MyMapper<E> mapper;
   final MyReducer<E> reducer; final int lo, hi;
   MapReducer<E> forks, next; // record subtask forks in list
   E result;
   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
     super(p);
     this.array = array; this.mapper = mapper;
     this.reducer = reducer; this.lo = lo; this.hi = hi;
     this.next = next;
   }
   public void compute() {
     int l = lo, h = hi;
     while (h - l >= 2) {
       int mid = (l + h) >>> 1;
       addToPendingCount(1);
       (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
       h = mid;
     }
     if (h > l)
       result = mapper.apply(array[l]);
     // process completions by reducing along and advancing subtask links
     for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
       for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next)
         t.result = reducer.apply(t.result, s.result);
     }
   }
   public E getRawResult() { return result; }

   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
     return new MapReducer<E>(null, array, mapper, reducer,
                              0, array.length, null).invoke();
   }
 }

触发。 一些CountedCompleters本身从不分叉,而是在其他设计中用作管道的一部分; 包括那些完成一个或多个异步任务的人触发另一个异步任务。 例如:

 class HeaderBuilder extends CountedCompleter<...> { ... }
 class BodyBuilder extends CountedCompleter<...> { ... }
 class PacketSender extends CountedCompleter<...> {
   PacketSender(...) { super(null, 1); ... } // trigger on second completion
   public void compute() { } // never called
   public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
 }
 // sample use:
 PacketSender p = new PacketSender();
 new HeaderBuilder(p, ...).fork();
 new BodyBuilder(p, ...).fork();

Summary

Protected constructors

CountedCompleter(CountedCompleter<?> completer, int initialPendingCount)

用给定的完成者和初始等待计数创建一个新的CountedCompleter。

CountedCompleter(CountedCompleter<?> completer)

用给定完成者和初始未决计数为零创建一个新的CountedCompleter。

CountedCompleter()

创建一个没有完成者并且初始未决计数为零的新的CountedCompleter。

Public methods

final void addToPendingCount(int delta)

将给定值添加(原子)到未决计数。

final boolean compareAndSetPendingCount(int expected, int count)

只有在当前持有给定期望值的情况下,才会将待处理计数设置(原子)为给定计数。

void complete(T rawResult)

无论等待计数如何,调用 onCompletion(CountedCompleter) ,将此任务标记为完成,并在此任务的完成者(如果存在)上进一步触发 tryComplete()

abstract void compute()

此任务执行的主要计算。

final int decrementPendingCountUnlessZero()

如果挂起的计数不为零,(自动)将其递减。

final CountedCompleter<?> firstComplete()

如果此任务的挂起计数为零,则返回此任务; 否则递减其未决计数并返回null

final CountedCompleter<?> getCompleter()

返回在此任务的构造函数中建立的完成者,如果没有,则返回 null

final int getPendingCount()

返回当前未决计数。

T getRawResult()

返回计算结果。

final CountedCompleter<?> getRoot()

返回当前计算的根; 即,如果这项任务没有完成者,那么它是完成者的根。

final void helpComplete(int maxTasks)

如果此任务尚未完成,则尝试最多处理完成路径上此任务的其他未处理任务的数量(如果存在任何已知任务)。

final CountedCompleter<?> nextComplete()

如果此任务没有完成者,则调用 quietlyComplete()并返回 null

void onCompletion(CountedCompleter<?> caller)

当调用方法 tryComplete()并且待处理计数为零时,或调用无条件方法 complete(T)时,执行操作。

boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller)

当方法执行一个动作 completeExceptionally(Throwable)调用或方法 compute()抛出一个异常,而这个任务尚未否则正常完成。

final void propagateCompletion()

相当于tryComplete()但不会沿着完成路径调用onCompletion(CountedCompleter) :如果挂起计数不为零,则递减计数; 否则,同样尝试完成此任务的完成者(如果存在),否则将此任务标记为完成。

final void quietlyCompleteRoot()

相当于 getRoot().quietlyComplete()

final void setPendingCount(int count)

将待处理计数设置为给定值。

final void tryComplete()

如果挂起计数不为零,则减少计数; 否则调用onCompletion(CountedCompleter) ,然后类似地尝试完成此任务的完成者(如果存在),否则将此任务标记为完成。

Protected methods

final boolean exec()

为CountedCompleters实现执行约定。

void setRawResult(T t)

结果支持的CountedCompleters可以选择用于帮助维护结果数据的方法。

Inherited methods

From class java.util.concurrent.ForkJoinTask
From class java.lang.Object
From interface java.util.concurrent.Future

Protected constructors

CountedCompleter

Added in API level 24
CountedCompleter (CountedCompleter<?> completer, 
                int initialPendingCount)

用给定的完成者和初始等待计数创建一个新的CountedCompleter。

Parameters
completer CountedCompleter: this task's completer, or null if none
initialPendingCount int: the initial pending count

CountedCompleter

Added in API level 24
CountedCompleter (CountedCompleter<?> completer)

用给定完成者和初始未决计数为零创建一个新的CountedCompleter。

Parameters
completer CountedCompleter: this task's completer, or null if none

CountedCompleter

Added in API level 24
CountedCompleter ()

创建一个没有完成者并且初始未决计数为零的新的CountedCompleter。

Public methods

addToPendingCount

Added in API level 24
void addToPendingCount (int delta)

将给定值添加(原子)到未决计数。

Parameters
delta int: the value to add

compareAndSetPendingCount

Added in API level 24
boolean compareAndSetPendingCount (int expected, 
                int count)

只有在当前持有给定期望值的情况下,才会将待处理计数设置(原子)为给定计数。

Parameters
expected int: the expected value
count int: the new value
Returns
boolean true if successful

complete

Added in API level 24
void complete (T rawResult)

无论待处理计数如何,调用onCompletion(CountedCompleter) ,都将此任务标记为完成,并在此任务的完成者(如果存在)上进一步触发tryComplete() 在调用onCompletion(CountedCompleter)或将此任务标记为完成之前,将给定的rawResult用作参数setRawResult(T) ; 它的值只对覆盖setRawResult类才有意义。 此方法不会修改挂起的计数。

当获得任何一个(相对于所有)几个子任务结果时强制完成该方法可能很有用。 然而,在普通(和推荐)的情况下, setRawResult没有被覆盖,这个效果可以更简单地使用quietlyCompleteRoot()获得。

Parameters
rawResult T: the raw result

compute

Added in API level 24
void compute ()

此任务执行的主要计算。

decrementPendingCountUnlessZero

Added in API level 24
int decrementPendingCountUnlessZero ()

如果挂起的计数不为零,(自动)将其递减。

Returns
int the initial (undecremented) pending count holding on entry to this method

firstComplete

Added in API level 24
CountedCompleter<?> firstComplete ()

如果此任务的挂起计数为零,则返回此任务; 否则递减其未决计数并返回null 此方法旨在与完成遍历循环中的nextComplete()一起使用。

Returns
CountedCompleter<?> this task, if pending count was zero, else null

getCompleter

Added in API level 24
CountedCompleter<?> getCompleter ()

返回在此任务的构造函数中建立的完成者,如果没有,则返回 null

Returns
CountedCompleter<?> the completer

getPendingCount

Added in API level 24
int getPendingCount ()

返回当前未决计数。

Returns
int the current pending count

getRawResult

Added in API level 24
T getRawResult ()

返回计算结果。 默认情况下,返回null ,适用于Void操作,但在其他情况下应该重写,几乎总是返回一个字段或保存结果的字段的函数。

Returns
T the result of the computation

getRoot

Added in API level 24
CountedCompleter<?> getRoot ()

返回当前计算的根; 即,如果这项任务没有完成者,那么它是完成者的根。

Returns
CountedCompleter<?> the root of the current computation

helpComplete

Added in API level 24
void helpComplete (int maxTasks)

如果此任务尚未完成,则尝试最多处理完成路径上此任务的其他未处理任务的数量(如果存在任何已知任务)。

Parameters
maxTasks int: the maximum number of tasks to process. If less than or equal to zero, then no tasks are processed.

nextComplete

Added in API level 24
CountedCompleter<?> nextComplete ()

如果此任务没有完成者,则调用quietlyComplete()并返回null 或者,如果完成者的未决计数不为零,则递减未决计数并返回null 否则,返回完成者。 此方法可用作同构任务层次结构的完成遍历循环的一部分:

 for (CountedCompleter<?> c = firstComplete();
      c != null;
      c = c.nextComplete()) {
   // ... process c ...
 }

Returns
CountedCompleter<?> the completer, or null if none

onCompletion

Added in API level 24
void onCompletion (CountedCompleter<?> caller)

当调用方法tryComplete()并且待处理计数为零时,或调用无条件方法complete(T)时,执行操作。 默认情况下,这个方法什么都不做。 您可以通过检查给定调用者参数的身份来区分个案。 如果不等于this ,那么它通常是一个子任务,可能包含要结合的结果(和/或指向其他结果的链接)。

Parameters
caller CountedCompleter: the task invoking this method (which may be this task itself)

onExceptionalCompletion

Added in API level 24
boolean onExceptionalCompletion (Throwable ex, 
                CountedCompleter<?> caller)

当方法执行一个动作completeExceptionally(Throwable)调用或方法compute()抛出一个异常,而这个任务尚未否则正常完成。 在进入这个方法时,这个任务isCompletedAbnormally() 此方法的返回值控制进一步传播:如果true和此任务的完成程序尚未完成,则该完成程序也将异常完成,但与此完成程序的例外情况相同。 此方法的默认实现除了返回true之外什么也不true

Parameters
ex Throwable: the exception
caller CountedCompleter: the task invoking this method (which may be this task itself)
Returns
boolean true if this exception should be propagated to this task's completer, if one exists

propagateCompletion

Added in API level 24
void propagateCompletion ()

等同于tryComplete()但不会沿着完成路径调用onCompletion(CountedCompleter) :如果挂起计数不为零,则递减计数; 否则,同样尝试完成此任务的完成者(如果存在),否则将此任务标记为完成。 这种方法在onCompletion不应该或不需要被计算中的每个完成者调用的情况下可能是有用的。

quietlyCompleteRoot

Added in API level 24
void quietlyCompleteRoot ()

相当于 getRoot().quietlyComplete()

setPendingCount

Added in API level 24
void setPendingCount (int count)

将待处理计数设置为给定值。

Parameters
count int: the count

tryComplete

Added in API level 24
void tryComplete ()

如果挂起计数不为零,则减少计数; 否则调用onCompletion(CountedCompleter) ,然后类似地尝试完成此任务的完成者(如果存在),否则将此任务标记为完成。

Protected methods

exec

Added in API level 24
boolean exec ()

为CountedCompleters实现执行约定。

Returns
boolean true if this task is known to have completed normally

setRawResult

Added in API level 24
void setRawResult (T t)

结果支持的CountedCompleters可以选择用于帮助维护结果数据的方法。 默认情况下,什么都不做。 不推荐覆盖。 但是,如果重写此方法以更新现有对象或字段,则通常必须将其定义为线程安全的。

Parameters
t T: the value

Hooray!