You can use StructuredTaskScope directly, without ShutdownOnFailure, to wait for all jobs to complete. You can then check the results and failures in the intended order, for example:
static Response simpleApproach() throws ExecutionException, InterruptedException {
    try(var scope = new StructuredTaskScope<>()) {
        Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
        Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
        scope.join(); // waits for both subtasks, whether they succeed or fail
        // check data1 first, so its failure takes precedence over data2's
        var data1 = get(d1Subtask);
        var data2 = get(d2Subtask);
        return new Response(data1, data2);
    }
}

// translates a failed subtask into an ExecutionException, otherwise returns its result
static <T> T get(Subtask<T> task) throws ExecutionException {
    if(task.state() == State.FAILED)
        throw new ExecutionException(task.exception());
    return task.get();
}
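This is the simplest approach. It ensures that if both jobs fail, the "data1" exception is propagated to the caller. The only drawback is that if "data1" fails before "data2" has completed, it will still wait for "data2" without trying to interrupt it. However, this may be acceptable, as we usually don't try (too hard) to optimize the exceptional case.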
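The ordering rule behind the simple approach does not depend on the preview API. As a rough sketch only, the same "wait for everything, then inspect in a fixed order" idea can be reproduced with a plain ExecutorService, where invokeAll stands in for scope.join(). The class OrderedFailureDemo and its method firstFailureInOrder are hypothetical names made up for this illustration; they are not part of the code in this answer.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration: uses ExecutorService.invokeAll, not StructuredTaskScope.
public class OrderedFailureDemo {

    /** Runs all tasks, waits for all of them, then reports the first failure in list order. */
    public static String firstFailureInOrder(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // invokeAll returns only after every task has completed
            List<Future<String>> done = pool.invokeAll(tasks);
            for (Future<String> f : done) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    // the earliest task in the list that failed decides the outcome
                    return e.getCause().getMessage();
                }
            }
            return "Success";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Callable<String> slowFail = () -> { Thread.sleep(300); throw new Exception("Data1 Fail"); };
        Callable<String> fastFail = () -> { throw new Exception("Data2 Fail"); };
        // Data2 fails first in time, but Data1 comes first in the list
        System.out.println(firstFailureInOrder(List.of(slowFail, fastFail))); // Data1 Fail
    }
}
```

As with the simple approach, the price is that the fast failure is not acted upon until the slow task has finished.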
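But you can also implement your own policy. Here is an example of a policy with a "primary job". When another job fails, it waits for the primary job to complete, and if the primary job failed too, its exception is preferred. But when the primary job fails, the scope shuts down immediately and tries to interrupt all other jobs instead of waiting for them to complete: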
static Response customPolicy() throws ExecutionException, InterruptedException {
    try(var scope = new ShutdownOnPrimaryFailure<>()) {
        Subtask<Data1> d1Subtask = scope.forkPrimary(() -> getData1(input));
        Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
        scope.join().throwIfFailed();
        var data1 = d1Subtask.get();
        var data2 = d2Subtask.get();
        return new Response(data1, data2);
    }
}

class ShutdownOnPrimaryFailure<T> extends StructuredTaskScope<T> {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private Subtask<?> primary;

    public <U extends T> Subtask<U> forkPrimary(Callable<? extends U> task) {
        ensureOwnerAndJoined();
        Subtask<U> forked = super.fork(task);
        primary = forked;
        return forked;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        super.handleComplete(subtask);
        if(subtask.state() == State.FAILED) {
            if(subtask == primary) {
                // the primary failure always wins and shuts the scope down
                failure.set(subtask.exception());
                shutdown();
            }
            // other failures are recorded only if nothing was recorded before
            else failure.compareAndSet(null, subtask.exception());
        }
    }

    @Override
    public ShutdownOnPrimaryFailure<T> join() throws InterruptedException {
        super.join();
        primary = null;
        return this;
    }

    @Override
    public ShutdownOnPrimaryFailure<T> joinUntil(Instant deadline)
            throws InterruptedException, TimeoutException {
        super.joinUntil(deadline);
        primary = null;
        return this;
    }

    public void throwIfFailed() throws ExecutionException {
        ensureOwnerAndJoined();
        Throwable t = failure.get();
        if(t != null) throw new ExecutionException(t);
    }
}
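The precedence rule inside handleComplete can be looked at in isolation. The following is a minimal sketch, assuming nothing beyond AtomicReference: a primary failure is stored with set and therefore overwrites anything recorded earlier, while any other failure is stored with compareAndSet(null, ...) and is kept only if the slot is still empty. The class PrimaryFailureRecorder and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper mirroring how ShutdownOnPrimaryFailure records failures.
public class PrimaryFailureRecorder {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    /** A primary failure unconditionally replaces whatever was recorded before. */
    public void recordPrimary(Throwable t) {
        failure.set(t);
    }

    /** A secondary failure is kept only if no failure has been recorded yet. */
    public void recordSecondary(Throwable t) {
        failure.compareAndSet(null, t);
    }

    public Throwable recorded() {
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable primary = new Exception("Data1 Fail");
        Throwable secondary = new Exception("Data2 Fail");

        // secondary fails first, primary fails later: the primary failure wins
        PrimaryFailureRecorder a = new PrimaryFailureRecorder();
        a.recordSecondary(secondary);
        a.recordPrimary(primary);
        System.out.println(a.recorded().getMessage()); // Data1 Fail

        // primary fails first, secondary fails later: the primary failure is kept
        PrimaryFailureRecorder b = new PrimaryFailureRecorder();
        b.recordPrimary(primary);
        b.recordSecondary(secondary);
        System.out.println(b.recorded().getMessage()); // Data1 Fail

        // only the secondary fails: its failure is reported
        PrimaryFailureRecorder c = new PrimaryFailureRecorder();
        c.recordSecondary(secondary);
        System.out.println(c.recorded().getMessage()); // Data2 Fail
    }
}
```

Whatever the arrival order, the reported failure is the primary one whenever the primary job failed, which is what makes the propagated exception deterministic.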
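For completeness, I provide the code for testing all scenarios at the end of this answer. It checks all combinations of success and failure.
With the implemented approaches, it prints: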
 *** Original
D1 ↓ D2 →   SUCCESS      D1 D2 FAIL_FAST    D1 D2 FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F Data2 Fail    F  F Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F -             F  F Data1 Fail    F  I
FAIL_SLOW:  Data1 Fail    F  F Data2 Fail    I  F -             I  F

 *** Simple
D1 ↓ D2 →   SUCCESS      D1 D2 FAIL_FAST    D1 D2 FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F Data2 Fail    F  F Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F -             F  F Data1 Fail    F  F
FAIL_SLOW:  Data1 Fail    F  F Data1 Fail    F  F -             F  F

 *** Custom Policy
D1 ↓ D2 →   SUCCESS      D1 D2 FAIL_FAST    D1 D2 FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F Data2 Fail    F  F Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F -             F  F Data1 Fail    F  I
FAIL_SLOW:  Data1 Fail    F  F Data1 Fail    F  F -             F  F
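Status abbreviations: F = finished, I = interrupted, R = running. Cells showing "-" are combinations the test skips (both jobs failing at the same speed); the status letters next to them are left over from the previously run scenario.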
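The problem was the scenario in the middle of the third row: D1 fails slowly and D2 fails fast. In that case, ShutdownOnFailure aborts D1 (D1 status I, interrupted) and propagates D2's failure. The simple approach clearly fixes this, but it loses the ability to fail fast when D1 fails fast (the last scenario in the second row, where D2's status is now F, finished). The custom policy solves the original problem while retaining fail-fast support.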
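// Uses the Java 21 preview API; compile and run with --enable-preview
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructuredTaskScope.Subtask.State;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class StructuredExample {
    public static void main(String[] args) {
        record Approach(String name, Callable<?> method) {}

        List<Approach> approaches = List.of(
            new Approach("Original", StructuredExample::originalApproach),
            new Approach("Simple", StructuredExample::simpleApproach),
            new Approach("Custom Policy", StructuredExample::customPolicy));

        for(var approach: approaches) {
            System.out.println(" *** " + approach.name());
            System.out.printf("%-12s", "D1 \u2193 D2 \u2192");
            for(Mode d2Mode: Mode.values()) System.out.printf("%-12s D1 D2 ", d2Mode);
            System.out.println();
            for(Mode d1Mode: Mode.values()) {
                System.out.printf("%-12s", d1Mode + ":");
                for(Mode d2Mode: Mode.values()) {
                    String result = "-";
                    // skip the combinations where both jobs fail at the same speed
                    if(d2Mode == Mode.SUCCESS || d1Mode != d2Mode) try {
                        ScopedValue.where(data1Mode, d1Mode)
                            .where(data2Mode, d2Mode)
                            .call(() -> approach.method().call());
                        result = "Success";
                    }
                    catch(ExecutionException ex) { result = ex.getCause().getMessage(); }
                    catch(Exception ex) { result = ex.getMessage(); }
                    System.out.printf("%-12s%3s%3s ", result, d1Running.name().charAt(0), d2Running.name().charAt(0));
                }
                System.out.println();
            }
            System.out.println();
        }
    }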

    // mock for the getData1 and getData2 operations, producing success or failure and recording running state
    enum Mode { SUCCESS, FAIL_FAST, FAIL_SLOW }
    enum StateDebug { RUNNING, FINISHED, INTERRUPTED; }

    static final ScopedValue<Mode> data1Mode = ScopedValue.newInstance();
    static final ScopedValue<Mode> data2Mode = ScopedValue.newInstance();

    static volatile StateDebug d1Running, d2Running;

    static Data1 getData1(Object input) throws Exception {
        return getDataImpl("Data1", data1Mode, Data1::new, s -> d1Running = s);
    }

    static Data2 getData2(Object input) throws Exception {
        return getDataImpl("Data2", data2Mode, Data2::new, s -> d2Running = s);
    }

    static <T> T getDataImpl(String which, ScopedValue<Mode> mode, Supplier<T> s, Consumer<StateDebug> c) throws Exception {
        c.accept(StateDebug.RUNNING);
        boolean interrupted = false;
        try {
            Thread.sleep(500);
            switch(mode.get()) {
                case SUCCESS: return s.get();
                case FAIL_SLOW: Thread.sleep(500);
            }
            throw new Exception(which + " Fail");
        }
        catch(InterruptedException ex) {
            interrupted = true;
            c.accept(StateDebug.INTERRUPTED);
            throw ex;
        }
        finally {
            if(!interrupted) c.accept(StateDebug.FINISHED);
        }
    }

    // dummy data and types
    record Data1() {}
    record Data2() {}
    record Response(Data1 data1, Data2 data2) {}
    static Object input;

    // the implementations
    static Response originalApproach() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
            scope.join().throwIfFailed(); // [1]
            var data1 = d1Subtask.get(); // [2]
            var data2 = d2Subtask.get();
            return new Response(data1, data2);
        }
    }

    static Response simpleApproach() throws ExecutionException, InterruptedException {
        try(var scope = new StructuredTaskScope<>()) {
            Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
            scope.join();
            var data1 = get(d1Subtask);
            var data2 = get(d2Subtask);
            return new Response(data1, data2);
        }
    }

    static <T> T get(Subtask<T> task) throws ExecutionException {
        if(task.state() == State.FAILED)
            throw new ExecutionException(task.exception());
        return task.get();
    }

    static Response customPolicy() throws ExecutionException, InterruptedException {
        try(var scope = new ShutdownOnPrimaryFailure<>()) {
            Subtask<Data1> d1Subtask = scope.forkPrimary(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
            scope.join().throwIfFailed();
            var data1 = d1Subtask.get();
            var data2 = d2Subtask.get();
            return new Response(data1, data2);
        }
    }
}

class ShutdownOnPrimaryFailure<T> extends StructuredTaskScope<T> {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private Subtask<?> primary;

    public <U extends T> Subtask<U> forkPrimary(Callable<? extends U> task) {
        ensureOwnerAndJoined();
        Subtask<U> forked = super.fork(task);
        primary = forked;
        return forked;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        super.handleComplete(subtask);
        if(subtask.state() == State.FAILED) {
            if(subtask == primary) {
                failure.set(subtask.exception());
                shutdown();
            }
            else failure.compareAndSet(null, subtask.exception());
        }
    }

    @Override
    public ShutdownOnPrimaryFailure<T> join() throws InterruptedException {
        super.join();
        primary = null;
        return this;
    }

    @Override
    public ShutdownOnPrimaryFailure<T> joinUntil(Instant deadline) throws InterruptedException, TimeoutException {
        super.joinUntil(deadline);
        primary = null;
        return this;
    }

    public void throwIfFailed() throws ExecutionException {
        ensureOwnerAndJoined();
        Throwable t = failure.get();
        if(t != null) throw new ExecutionException(t);
    }
}