我在一个定期重复的Spring Boot Web服务中安排了一个任务(@EnableSceduling).当该任务触发时,它调用注册对象的Runnable/Run方法.在Run方法中,我需要做工作,在工作完成之前不退出Run方法.问题是我让其他线程做这个Run线程工作所需的其他工作.因此,在Run线程中,我有如下内容:

@Component
public class DoWork implements Runnable {
    
    @override
    public void run() {
    
    // Setup clients.

    // Call services.
    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback()); 
    
    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback()); 

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());
        
    while(callbacksWorkCompletedFlag == false) {
        
            Thread.sleep (1000);
        }

        // Do computation with callback responses.

        // After computation is completed, exit run method.
    }
}

public class MyResponseCallback implements Consumer<String> {
    
    @override
    public void accept (final Sting response) {
    
        // Do work with response.
    }
}

public class MyErrorCallback implements Consumer<Throwable> {
    
    @override
    public void accept (final Throwable error) {
    
        // Log error.
    }
}

在Java/Spring Boot中有没有更好的方法来做到这一点?

推荐答案

下面是一个使用CompletableFuture的示例.它使用Mono.subscribe的第三个参数来让将来知道它何时完成.

@Override
public void run() {
    Mono<String> response1 = client1.post();
    CompletableFuture<?> future1 = new CompletableFuture<>();
    response1.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future1.complete(null));

    Mono<String> response2 = client2.post();
    CompletableFuture<?> future2 = new CompletableFuture<>();
    response2.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future2.complete(null));

    Mono<String> responseX = clientX.post();
    CompletableFuture<?> futureX = new CompletableFuture<>();
    responseX.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> futureX.complete(null));
    
    CompletableFuture.allOf(future1, future2, futureX).join();
}

下面是一个CountDownLatch个例子:

@Override
public void run() {
    CountDownLatch latch = new CountDownLatch(3);

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    
    try {
        latch.await();
    } catch (InterruptedException ex) {}
}

另有CompletableFuture个例子:

@Override
public void run() {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    Supplier<Runnable> onDone = () -> {
        CompletableFuture<?> future = new CompletableFuture<>();
        futures.add(future);
        return () -> future.complete(null);
    };

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
}

所有的回调都是必需的吗?

@Override
public void run() {
    // Make requests
    Mono<String> responseMono1 = client1.post();
    Mono<String> responseMono2 = client2.post();
    Mono<String> responseMonoX = clientX.post();
    try {
        // Wait for requests to complete
        String response1 = responseMono1.block();
        String response2 = responseMono2.block();
        String responseX = responseMonoX.block();

        ...
    }
    catch (RuntimeException e) {
        ...
    }
}

Java相关问答推荐

基于仅存在于父级中的字段查询子文档?

Java中不同包中的类之间的配置共享

Quarkus keycloat配置不工作.quarkus. keycloak. policy—enforcer. enable = true在. yaml表示中不工作

Java Swing:初始化身份验证类后未检测到ATM_Interface键事件

计算两个浮点数之间的距离是否对称?

Com.example.service.QuestionService中的构造函数的参数0需要找不到的类型为';com.example.Dao.QuestionDao;的Bean

如何在Java记录中设置BigDecimal类型属性的精度?

如何将Pane的图像快照保存为BMP?

在Frege中,我如何将一个字符串安全地转换为一个可能的Int?

try 使用类来包含JSON响应

如何使用Criteria Builder处理一对多关系中的空值?

如何调整JButton的大小以适应图标?

模拟JUnit未检测到返回字符串的方法的任何声纳覆盖

如何通过用户ID向用户发送私信

如何用Micrometer&;斯普肯

放置在变量中的Java成员引用不相等

如何在java中从以百分比表示的经过时间和结束日期中找到开始日期

带有提取器的JavaFXObservableList会根据侦听器的存在而改变行为

可以';不要在Intellij IDEA中使用最新的Java版本(JDK 21)

在数组列表中找到对象后,未从数组中删除对象