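I'm fairly new to concurrent code. I'm trying to convert some executor-based code to structured concurrency, but in doing so I've lost an important property that I must somehow keep.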

The code using structured concurrency (a preview feature in Java 21) looks like this:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
    Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

    scope.join().throwIfFailed(); // [1]

    var data1 = d1Subtask.get(); // [2]
    var data2 = d2Subtask.get();

    return new Response(data1, data2);
}

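At [1], whichever of the two subtasks fails first ends up having its exception thrown, and I don't want that. I need to run both tasks in parallel, but I need d1Subtask's outcome first, in case it fails. In other words:

  • If d1Subtask fails, I need to throw its exception (d2Subtask may still be running, may have succeeded, or may have failed; none of that matters, because d1Subtask's exception makes the second task irrelevant);
  • If d1Subtask succeeds and d2Subtask fails, I need d2Subtask's exception;
  • If both succeed, combine the results of both.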
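To make the three rules concrete, here is a minimal sketch of the priority order as a pure function. It only illustrates which failure should win, not the concurrency itself; the class name `OutcomePriority` and the method `failureToThrow` are made up for this illustration.

```java
import java.util.Optional;

final class OutcomePriority {
    /** Picks the failure to propagate: d1's if present, otherwise d2's, otherwise none. */
    static Optional<Throwable> failureToThrow(Throwable d1Failure, Throwable d2Failure) {
        if (d1Failure != null) {
            return Optional.of(d1Failure);     // a d1 failure always wins
        }
        return Optional.ofNullable(d2Failure); // a d2 failure only matters if d1 succeeded
    }

    public static void main(String[] args) {
        Throwable e1 = new Exception("Data1 Fail");
        Throwable e2 = new Exception("Data2 Fail");
        // Both fail: d1's failure is reported.
        System.out.println(failureToThrow(e1, e2).map(Throwable::getMessage).orElse("Success"));
        // Only d2 fails: d2's failure is reported.
        System.out.println(failureToThrow(null, e2).map(Throwable::getMessage).orElse("Success"));
        // Neither fails: nothing to throw.
        System.out.println(failureToThrow(null, null).map(Throwable::getMessage).orElse("Success"));
    }
}
```

The hard part of the question is getting this outcome while the two tasks complete in an unknown order.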

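If I change it to a plain scope.join();, then [2] can fail if d1Subtask did not complete successfully. There is d1Subtask.state(), but waiting for it to leave the State.UNAVAILABLE state seems to go against the idea of structured concurrency.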

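This could be done with Executors or with a plain StructuredTaskScope, but that means d2Subtask may run to completion even in cases where the scope could be shut down and the task aborted.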
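For comparison, a rough sketch of the executor-based variant could look like the following. It is not the original code from the question; `ExecutorOrdering` and `run` are hypothetical names, and the outcome is returned as a string only to keep the example short. Note that the second task is interrupted only because of the explicit `cancel(true)` call, which is exactly the kind of manual bookkeeping a scope is supposed to take over.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutorOrdering {
    /** Runs both tasks in parallel but consults the first task's outcome before the second's. */
    static String run(Callable<?> first, Callable<?> second) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<?> f1 = executor.submit(first);
            Future<?> f2 = executor.submit(second);
            Object a;
            try {
                a = f1.get();                   // wait for the first task only
            } catch (ExecutionException e) {
                f2.cancel(true);                // manual step: try to interrupt the second task
                return "failed: " + e.getCause().getMessage();
            }
            try {
                return "ok: " + a + ", " + f2.get();
            } catch (ExecutionException e) {
                return "failed: " + e.getCause().getMessage();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
            return "interrupted";
        } finally {
            executor.shutdownNow();             // never leave worker threads behind
        }
    }

    public static void main(String[] args) {
        Callable<?> slowFail1 = () -> { Thread.sleep(200); throw new Exception("Data1 Fail"); };
        Callable<?> fastFail2 = () -> { throw new Exception("Data2 Fail"); };
        System.out.println(run(() -> "a", () -> "b"));   // ok: a, b
        System.out.println(run(slowFail1, fastFail2));   // failed: Data1 Fail
        System.out.println(run(() -> "a", fastFail2));   // failed: Data2 Fail
    }
}
```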

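With that in mind, can the code above be modified to wait for d1Subtask's result in a clean, readable way? I imagined that something like scope.join(d1Subtask) or d1Subtask.join() would be the way to do it, or perhaps a different policy, if such an API exists.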


Edit: explained the required logic and each possible outcome more clearly.

Recommended answer

You can use StructuredTaskScope directly, without ShutdownOnFailure, to wait for all jobs to complete. Then you can check the results and failures in the intended order, e.g.

static Response simpleApproach() throws ExecutionException, InterruptedException {
    try(var scope = new StructuredTaskScope<>()) {
        Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
        Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

        scope.join();

        var data1 = get(d1Subtask);
        var data2 = get(d2Subtask);

        return new Response(data1, data2);
    }
}

static <T> T get(Subtask<T> task) throws ExecutionException {
    if(task.state() == State.FAILED)
        throw new ExecutionException(task.exception());
    return task.get();
}

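This is the simplest approach. It ensures that if both jobs fail, the "data1" exception is the one propagated to the caller. The only drawback is that if "data1" fails before "data2" completes, it will wait for "data2" instead of trying to interrupt it. However, this may be acceptable, as we usually don't try (too hard) to optimize the exceptional case.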

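But you can also implement your own policy. Below is an example of a policy with a "primary job". When another job fails, it waits for the primary job to complete, and if the primary job fails too, its exception takes precedence. But when the primary job fails, the scope shuts down immediately, trying to interrupt all other jobs rather than waiting for them to complete: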

static Response customPolicy() throws ExecutionException, InterruptedException {
    try(var scope = new ShutdownOnPrimaryFailure<>()) {
        Subtask<Data1> d1Subtask = scope.forkPrimary(() -> getData1(input));
        Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

        scope.join().throwIfFailed();

        var data1 = d1Subtask.get();
        var data2 = d2Subtask.get();

        return new Response(data1, data2);
    }
}
class ShutdownOnPrimaryFailure<T> extends StructuredTaskScope<T> {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private Subtask<?> primary;

    public <U extends T> Subtask<U> forkPrimary(Callable<? extends U> task) {
        ensureOwnerAndJoined();
        Subtask<U> forked = super.fork(task);
        primary = forked;
        return forked;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        super.handleComplete(subtask);
        if(subtask.state() == State.FAILED) {
            if(subtask == primary) {
                failure.set(subtask.exception());
                shutdown();
            }
            else failure.compareAndSet(null, subtask.exception());
        }
    }

    @Override
    public ShutdownOnPrimaryFailure<T> join() throws InterruptedException {
        super.join();
        primary = null;
        return this;
    }

    @Override
    public ShutdownOnPrimaryFailure<T> joinUntil(Instant deadline)
        throws InterruptedException, TimeoutException {

        super.joinUntil(deadline);
        primary = null;
        return this;
    }

    public void throwIfFailed() throws ExecutionException {
        ensureOwnerAndJoined();
        Throwable t = failure.get();
        if(t != null) throw new ExecutionException(t);
    }
}
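The precedence rule inside handleComplete can be looked at in isolation. The following is a small sketch of just that rule, without any task scope; `FailureSlot` and `record` are hypothetical names used only here. A primary failure overwrites whatever is stored via `set`, while any other failure fills the slot via `compareAndSet(null, ...)` only if it is still empty, so the primary failure wins regardless of arrival order.

```java
import java.util.concurrent.atomic.AtomicReference;

final class FailureSlot {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    /** Records a failure: a primary one overwrites, any other one only fills an empty slot. */
    void record(Throwable t, boolean primary) {
        if (primary) {
            failure.set(t);                  // unconditional: the primary failure always wins
        } else {
            failure.compareAndSet(null, t);  // kept only if nothing was recorded before
        }
    }

    Throwable get() {
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable p = new Exception("Data1 Fail");
        Throwable s = new Exception("Data2 Fail");

        FailureSlot secondaryFirst = new FailureSlot();
        secondaryFirst.record(s, false);
        secondaryFirst.record(p, true);
        System.out.println(secondaryFirst.get().getMessage()); // Data1 Fail

        FailureSlot primaryFirst = new FailureSlot();
        primaryFirst.record(p, true);
        primaryFirst.record(s, false);
        System.out.println(primaryFirst.get().getMessage());   // Data1 Fail
    }
}
```

If there are several non-primary jobs, the first one to fail keeps the slot until a primary failure replaces it.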

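For completeness, the code used to test all scenarios is provided at the end of this answer. It checks all combinations of success and failure.

With the approaches implemented here, it prints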

  *** Original
D1 ↓  D2 →  SUCCESS      D1 D2   FAIL_FAST    D1 D2   FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F   Data2 Fail    F  F   Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F   -             F  F   Data1 Fail    F  I
FAIL_SLOW:  Data1 Fail    F  F   Data2 Fail    I  F   -             I  F

  *** Simple
D1 ↓  D2 →  SUCCESS      D1 D2   FAIL_FAST    D1 D2   FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F   Data2 Fail    F  F   Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F   -             F  F   Data1 Fail    F  F
FAIL_SLOW:  Data1 Fail    F  F   Data1 Fail    F  F   -             F  F

  *** Custom Policy
D1 ↓  D2 →  SUCCESS      D1 D2   FAIL_FAST    D1 D2   FAIL_SLOW    D1 D2
SUCCESS:    Success       F  F   Data2 Fail    F  F   Data2 Fail    F  F
FAIL_FAST:  Data1 Fail    F  F   -             F  F   Data1 Fail    F  I
FAIL_SLOW:  Data1 Fail    F  F   Data1 Fail    F  F   -             F  F

The state columns are abbreviated: F = finished, I = interrupted, R = running.

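The problem is the middle scenario of the third row, where D1 fails slowly and D2 fails fast. There, ShutdownOnFailure aborts D1 (D1 state I, interrupted) and propagates D2's failure. The simple approach clearly fixes this, but it loses the fail-fast behavior when D1 fails fast (last scenario in the second row, where the D2 state is now F, finished). The custom policy solves the original problem while retaining fail-fast support.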

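// Java 21: compile and run with --enable-preview (StructuredTaskScope and ScopedValue are preview APIs)
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.StructuredTaskScope.Subtask.State;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class StructuredExample {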
    public static void main(String[] args) {
        record Approach(String name, Callable<?> method) {}
        List<Approach> approaches = List.of(
            new Approach("Original", StructuredExample::originalApproach),
            new Approach("Simple", StructuredExample::simpleApproach),
            new Approach("Custom Policy", StructuredExample::customPolicy));

        for(var approach: approaches) {
            System.out.println("  *** " + approach.name());
            System.out.printf("%-12s", "D1 \u2193  D2 \u2192");
            for(Mode d2Mode: Mode.values()) System.out.printf("%-12s D1 D2   ", d2Mode);
            System.out.println();
            for(Mode d1Mode: Mode.values()) {
                System.out.printf("%-12s", d1Mode + ":");
                for(Mode d2Mode: Mode.values()) {
                    String result = "-";
                    if(d2Mode == Mode.SUCCESS || d1Mode != d2Mode) try {
                        ScopedValue.where(data1Mode, d1Mode)
                            .where(data2Mode, d2Mode)
                            .call(() -> approach.method().call());
                        result = "Success";
                    }
                    catch(ExecutionException ex) { result = ex.getCause().getMessage(); }
                    catch(Exception ex) { result = ex.getMessage(); }
                    System.out.printf("%-12s%3s%3s   ", result, d1Running.name().charAt(0), d2Running.name().charAt(0));
                }
                System.out.println();
            }
            System.out.println();
        }
    }

    // mock for the getData1 and getData2 operations, producing success or failure and recording running state

    enum Mode { SUCCESS, FAIL_FAST, FAIL_SLOW }
    enum StateDebug { RUNNING, FINISHED, INTERRUPTED; }

    static final ScopedValue<Mode> data1Mode = ScopedValue.newInstance();
    static final ScopedValue<Mode> data2Mode = ScopedValue.newInstance();

    static volatile StateDebug d1Running, d2Running;

    static Data1 getData1(Object input) throws Exception {
        return getDataImpl("Data1", data1Mode, Data1::new, s -> d1Running = s);
    }

    static Data2 getData2(Object input) throws Exception {
        return getDataImpl("Data2", data2Mode, Data2::new, s -> d2Running = s);
    }

    static <T> T getDataImpl(String which, ScopedValue<Mode> mode, Supplier<T> s, Consumer<StateDebug> c) throws Exception {
        c.accept(StateDebug.RUNNING);
        boolean interrupted = false;
        try {
            Thread.sleep(500);
            switch(mode.get()) {
                case SUCCESS: return s.get();
                case FAIL_SLOW: Thread.sleep(500);
            }
            throw new Exception(which + " Fail");
        }
        catch(InterruptedException ex) {
            interrupted = true;
            c.accept(StateDebug.INTERRUPTED);
            throw ex;
        }
        finally {
            if(!interrupted) c.accept(StateDebug.FINISHED);
        }
    }

    // dummy data and types

    record Data1() {}
    record Data2() {}

    record Response(Data1 data1, Data2 data2)  {}

    static Object input;

    // the implementations

    static Response originalApproach() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
    
            scope.join().throwIfFailed(); // [1]
    
            var data1 = d1Subtask.get(); // [2]
            var data2 = d2Subtask.get();
    
            return new Response(data1, data2);
        }
    }

    static Response simpleApproach() throws ExecutionException, InterruptedException {
        try(var scope = new StructuredTaskScope<>()) {
            Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

            scope.join();

            var data1 = get(d1Subtask);
            var data2 = get(d2Subtask);

            return new Response(data1, data2);
        }
    }

    static <T> T get(Subtask<T> task) throws ExecutionException {
        if(task.state() == State.FAILED)
            throw new ExecutionException(task.exception());
        return task.get();
    }

    static Response customPolicy() throws ExecutionException, InterruptedException {
        try(var scope = new ShutdownOnPrimaryFailure<>()) {
            Subtask<Data1> d1Subtask = scope.forkPrimary(() -> getData1(input));
            Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));

            scope.join().throwIfFailed();

            var data1 = d1Subtask.get();
            var data2 = d2Subtask.get();

            return new Response(data1, data2);
        }
    }
}

class ShutdownOnPrimaryFailure<T> extends StructuredTaskScope<T> {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private Subtask<?> primary;

    public <U extends T> Subtask<U> forkPrimary(Callable<? extends U> task) {
        ensureOwnerAndJoined();
        Subtask<U> forked = super.fork(task);
        primary = forked;
        return forked;
    }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        super.handleComplete(subtask);
        if(subtask.state() == State.FAILED) {
            if(subtask == primary) {
                failure.set(subtask.exception());
                shutdown();
            }
            else failure.compareAndSet(null, subtask.exception());
        }
    }

    @Override
    public ShutdownOnPrimaryFailure<T> join() throws InterruptedException {
        super.join();
        primary = null;
        return this;
    }

    @Override
    public ShutdownOnPrimaryFailure<T> joinUntil(Instant deadline) throws InterruptedException, TimeoutException {
        super.joinUntil(deadline);
        primary = null;
        return this;
    }

    public void throwIfFailed() throws ExecutionException {
        ensureOwnerAndJoined();
        Throwable t = failure.get();
        if(t != null) throw new ExecutionException(t);
    }
}
