我有一段代码,我可以使用Flux在循环中执行一段逻辑,中间有一些延迟.像这样的东西,

Flux.defer(() -> service.doSomething())
            .repeatWhen(v -> Flux.interval(Duration.ofSeconds(10)))
            .map(data -> mapper(data)) //map data
            .takeUntil(v -> shouldContinue(v)) //checks if the loop can be terminated
            .onErrorStop();

现在,我希望有递增的延迟.这意味着,在前5分钟内,每次执行之间的延迟可能为10秒.然后,在接下来的10分钟内,延迟可以是30秒.此后,每次执行之间的延迟可以是一分钟.

我如何使用Flux实现这一点?

先谢谢你.

推荐答案

You could try Flux.generate to create a custom sequence that emits items with the desired delays.
See also "Difference Between Flux.create and Flux.generate".

Flux<Object> flux = Flux.generate(
    () -> 0L,
    (state, sink) -> {
        sink.next(state);
        return state + 1;
    }
)
.delayUntil(state -> {
    long delay = 10; // default delay
    if (state < 30) { // first 5 minutes with 10 seconds delay
        delay = 10;
    } else if (state < 50) { // next 10 minutes with 30 seconds delay
        delay = 30;
    } else { // thereafter, 1 minute delay
        delay = 60;
    }
    return Mono.delay(Duration.ofSeconds(delay));
})
.flatMap(state -> service.doSomething())
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();

That would initialize the state to 0 and increments it with each iteration.
Depending on the state's value, which represents the number of elapsed periods, it applies different delays.

      ┌────────────┐         ┌─────────────────┐          ┌──────────────┐
      │   Flux     │         │ delayUntil with │          │  doSomething │
      │ generate   ├───────► │  custom delays  ├────────► │   Service    │
      └────────────┘         └─────────────────┘          └──────────────┘
            │                                                   │
            │                                                   │
            ▼                                                   ▼
      ┌────────────┐        ┌────────────────┐           ┌───────────────┐
      │   flatMap  │        │  takeUntil     │           │   onErrorStop │
      │  function  ├───────►│ shouldContinue ├─────────► │    method     │
      └────────────┘        └────────────────┘           └───────────────┘

请注意,在实际应用程序中,您可能需要考虑service.doSomething()的执行时间.如果需要很长时间,您可能需要相应地调整延迟.


如果doSomething()的执行时间大于延迟时间,会发生什么情况?这可能会并行触发doSomething()次.

当执行时间doSomething()大于延迟时,后续执行将不会等待当前执行完成,可能会导致并发执行.

To prevent this, you could use a combination of .concatMap() which makes sure doSomething() is executed sequentially, along with a stateful map to calculate the delay based on when the last execution was started or completed.
See also "What's the difference between flatMap, flatMapSequential and concatMap in Project Reactor?"

另外,如果我想以0秒的延迟开始,然后慢慢增加延迟,我如何实现这一点?

然后你可以实现你的功能,延迟为0秒,然后慢慢增加延迟:

AtomicLong lastExecutionStart = new AtomicLong(System.currentTimeMillis());

Flux<Object> flux = Flux.generate(
    () -> 0L,
    (state, sink) -> {
        sink.next(state);
        return state + 1;
    }
)
.concatMap(state -> {
    long delay;
    long currentTime = System.currentTimeMillis();
    long elapsedTime = currentTime - lastExecutionStart.get();
    long executionTime = 0; // replace with actual execution time if known

    // Calculate the delay based on elapsed time since the start of the last execution
    if (elapsedTime <= 5 * 60 * 1000) { // first 5 minutes
        delay = Math.max(0, (10 - executionTime) * 1000); // 10 seconds delay, adjust for execution time
    } else if (elapsedTime <= 15 * 60 * 1000) { // next 10 minutes
        delay = Math.max(0, (30 - executionTime) * 1000); // 30 seconds delay, adjust for execution time
    } else { // thereafter
        delay = Math.max(0, (60 - executionTime) * 1000); // 60 seconds delay, adjust for execution time
    }

    return Mono.delay(Duration.ofMillis(delay))
        .then(Mono.fromCallable(() -> {
            lastExecutionStart.set(System.currentTimeMillis());
            return "doSomething";
        }))
        .flatMap(o -> service.doSomething());
})
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();

concatMap可确保按顺序进行doSomething()个呼叫.lastExecutionStart变量跟踪最后一次执行的开始时间.该变量用于计算下一次执行的延迟.

The Mono.delay() is used with then() to wait for the calculated delay before starting the next execution.
After the delay, doSomething() is called and the lastExecutionStart timestamp is updated.

这种模式确保:

  • doSomething()不会并发执行.
  • 在下一个doSomething()次执行之前的延迟考虑了实际执行时间.
  • 初始延迟可以是0,然后根据您提供的逻辑递增.
        ┌──────────────────────────────────────────────┐
        │                    Flux                      │
        └──────────────────────────────────────────────┘
                          │
            ┌─────────────┴─────────────┐
            │     Generate (state)      │
            └─────────────┬─────────────┘
                          │
            ┌─────────────▼─────────────┐
            │   Calculate Next Delay    │ <────┐
            └─────────────┬─────────────┘      │
                          │                    │
         ┌────────────────▼─────────────────┐  │
         │            Mono.delay            │  │
         └────────────────┬─────────────────┘  │
                          │                    │
         ┌────────────────▼──────────────────┐ │
         │  Set lastExecutionStart Timestamp │ │
         └────────────────┬──────────────────┘ │
                          │                    │
            ┌─────────────▼─────────────┐      │
            │       service.doSomething │      │
            └─────────────┬─────────────┘      │
                          │                    │
            ┌─────────────▼─────────────┐      │
            │            map            │      │
            └─────────────┬─────────────┘      │
                          │                    │
            ┌─────────────▼─────────────┐      │
            │         takeUntil         │      │
            └─────────────┬─────────────┘      │
                          │                    │
            ┌─────────────▼─────────────┐      │
            │         onErrorStop       │      │
            └─────────────┬─────────────┘      │
                          │                    │
                          └────────────────────┘

I noticed that if dataProvider.fetchData() returns a Flux, I was getting an error Cannot infer type argument(s) for <V> concatMap(Function<? super T,? extends Publisher<? extends V>>).
But it works fine if Mono was returned. So I had to use a flatMapMany instead of flatMap.

但是,如果service.doSomething()可以返回FluxMono,有什么方法可以处理吗?

我想把它写成一个通用的服务.因此,我想看看这是否可以处理类型Flux<T> Mono<T>甚至类型T的数据.

要处理可以返回Flux<T>Mono<T>T的服务方法,您需要调整响应式管道中的方法调用,以满足这些不同的响应式类型.一种方法是实际上使用flatMapMany来处理MonoFlux,因为Mono可以转换为Flux,但反之亦然.

AtomicLong lastExecutionStart = new AtomicLong(System.currentTimeMillis());

Flux<Object> flux = Flux.generate(
    () -> 0L,
    (state, sink) -> {
        sink.next(state);
        return state + 1;
    }
)
.concatMap(state -> {
    long delay;
    long currentTime = System.currentTimeMillis();
    long elapsedTime = currentTime - lastExecutionStart.get();
    long executionTime = 0; // replace with actual execution time if known

    // Calculate the delay based on elapsed time since the start of the last execution
    if (elapsedTime <= 5 * 60 * 1000) { // first 5 minutes
        delay = Math.max(0, (10 - executionTime) * 1000); // 10 seconds delay, adjust for execution time
    } else if (elapsedTime <= 15 * 60 * 1000) { // next 10 minutes
        delay = Math.max(0, (30 - executionTime) * 1000); // 30 seconds delay, adjust for execution time
    } else { // thereafter
        delay = Math.max(0, (60 - executionTime) * 1000); // 60 seconds delay, adjust for execution time
    }

    return Mono.delay(Duration.ofMillis(delay))
        .then(Mono.fromCallable(() -> {
            lastExecutionStart.set(System.currentTimeMillis());
            return "doSomething";
        }))
        // Use flatMapMany to handle both Mono and Flux from doSomething()
        .flatMapMany(o -> Flux.from(service.doSomething()));
})
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();

通过使用flatMapMany,您可以确保服务调用可以返回FluxMono,而Mono将被转换为Flux.对于处理同步类型T,通常使用Mono.just()Flux.just()对其进行包装,具体取决于您想要的是单个结果还是结果流.

要真正使您的服务泛型,必须确保doSomething()方法签名被适当地设计为返回Publisher<T>,MonoFlux都实现了这一点.这将允许您在服务方法本身中抽象出这些类型之间的差异.

为了处理同步值(T),您需要在服务方法中创建一个react 包装器,如下所示:

public Publisher<T> doSomething() {
    T result = ...; // Your synchronous logic here
    return Mono.just(result); // or Flux.just(result) if it makes sense to return a stream
}

这样,您就可以在您的react 管道中使用相同的flatMapMany方法来处理来自doSomething()的所有类型的返回值.

Java相关问答推荐

在Spring Boot中测试时出现SQL语法错误

将偶数元素移动到数组的前面,同时保持相对顺序

Java:根据4象限中添加的行数均匀分布行的公式

对运行在GraalVM-21上的JavaFX应用程序使用分代ZGC会警告不支持JVMCI,为什么?

解释左移在Java中的工作原理

Com.google.firebase.database.DatabaseException:无法将类型为java.lang.Boolean的值转换为字符串.这是关于什么的?

与不同顺序的组进行匹配,不重复组但分开

使用存储在字符串变量中的路径目录打开.pdf文件

如何集成语义发布和BitBucket(Java项目)

Java17支持哪个MapR版本?

扩展视图高度,并将其拖动到较低的视图上,而不是将其向下推?

JNI:将代码打包成自包含的二进制文件

我可以在@Cacheable中使用枚举吗

在使用具有不同成本的谓词调用allMatch之前对Java流进行排序会带来什么好处吗?

有没有办法知道在合并中执行了什么操作?

RestTemplate Bean提供OkHttp3ClientHttpRequestFactory不支持Spring Boot 3中的请求正文缓冲

为什么项目名称出现在我的GET请求中?

JavaFX:为什么我的ComboBox添加了一个不必要的单元格的一部分?

如何使用外部函数从Java中获取C++ struct 的返回值&;内存API

元音变音字符:如何在 Java 中将Á<0x9c>转换为Ü?