You could try Flux.generate
to create a custom sequence that emits items with the desired delays.
See also "Difference Between Flux.create
and Flux.generate
".
Flux<Object> flux = Flux.generate(
() -> 0L,
(state, sink) -> {
sink.next(state);
return state + 1;
}
)
.delayUntil(state -> {
long delay = 10; // default delay
if (state < 30) { // first 5 minutes with 10 seconds delay
delay = 10;
} else if (state < 50) { // next 10 minutes with 30 seconds delay
delay = 30;
} else { // thereafter, 1 minute delay
delay = 60;
}
return Mono.delay(Duration.ofSeconds(delay));
})
.flatMap(state -> service.doSomething())
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();
That would initialize the state to 0
and increments it with each iteration.
Depending on the state's value, which represents the number of elapsed periods, it applies different delays.
┌────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Flux │ │ delayUntil with │ │ doSomething │
│ generate ├───────► │ custom delays ├────────► │ Service │
└────────────┘ └─────────────────┘ └──────────────┘
│ │
│ │
▼ ▼
┌────────────┐ ┌────────────────┐ ┌───────────────┐
│ flatMap │ │ takeUntil │ │ onErrorStop │
│ function ├───────►│ shouldContinue ├─────────► │ method │
└────────────┘ └────────────────┘ └───────────────┘
请注意,在实际应用程序中,您可能需要考虑service.doSomething()
的执行时间.如果需要很长时间,您可能需要相应地调整延迟.
如果doSomething()
的执行时间大于延迟时间,会发生什么情况?这可能会并行触发doSomething()
次.
当执行时间doSomething()
大于延迟时,后续执行将不会等待当前执行完成,可能会导致并发执行.
To prevent this, you could use a combination of .concatMap()
which makes sure doSomething()
is executed sequentially, along with a stateful map to calculate the delay based on when the last execution was started or completed.
See also "What's the difference between flatMap
, flatMapSequential
and concatMap
in Project Reactor?"
另外,如果我想以0秒的延迟开始,然后慢慢增加延迟,我如何实现这一点?
然后你可以实现你的功能,延迟为0秒,然后慢慢增加延迟:
AtomicLong lastExecutionStart = new AtomicLong(System.currentTimeMillis());
Flux<Object> flux = Flux.generate(
() -> 0L,
(state, sink) -> {
sink.next(state);
return state + 1;
}
)
.concatMap(state -> {
long delay;
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastExecutionStart.get();
long executionTime = 0; // replace with actual execution time if known
// Calculate the delay based on elapsed time since the start of the last execution
if (elapsedTime <= 5 * 60 * 1000) { // first 5 minutes
delay = Math.max(0, (10 - executionTime) * 1000); // 10 seconds delay, adjust for execution time
} else if (elapsedTime <= 15 * 60 * 1000) { // next 10 minutes
delay = Math.max(0, (30 - executionTime) * 1000); // 30 seconds delay, adjust for execution time
} else { // thereafter
delay = Math.max(0, (60 - executionTime) * 1000); // 60 seconds delay, adjust for execution time
}
return Mono.delay(Duration.ofMillis(delay))
.then(Mono.fromCallable(() -> {
lastExecutionStart.set(System.currentTimeMillis());
return "doSomething";
}))
.flatMap(o -> service.doSomething());
})
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();
concatMap
可确保按顺序进行doSomething()
个呼叫.lastExecutionStart
变量跟踪最后一次执行的开始时间.该变量用于计算下一次执行的延迟.
The Mono.delay()
is used with then()
to wait for the calculated delay before starting the next execution.
After the delay, doSomething()
is called and the lastExecutionStart
timestamp is updated.
这种模式确保:
doSomething()
不会并发执行.
- 在下一个
doSomething()
次执行之前的延迟考虑了实际执行时间.
- 初始延迟可以是0,然后根据您提供的逻辑递增.
┌──────────────────────────────────────────────┐
│ Flux │
└──────────────────────────────────────────────┘
│
┌─────────────┴─────────────┐
│ Generate (state) │
└─────────────┬─────────────┘
│
┌─────────────▼─────────────┐
│ Calculate Next Delay │ <────┐
└─────────────┬─────────────┘ │
│ │
┌────────────────▼─────────────────┐ │
│ Mono.delay │ │
└────────────────┬─────────────────┘ │
│ │
┌────────────────▼──────────────────┐ │
│ Set lastExecutionStart Timestamp │ │
└────────────────┬──────────────────┘ │
│ │
┌─────────────▼─────────────┐ │
│ service.doSomething │ │
└─────────────┬─────────────┘ │
│ │
┌─────────────▼─────────────┐ │
│ map │ │
└─────────────┬─────────────┘ │
│ │
┌─────────────▼─────────────┐ │
│ takeUntil │ │
└─────────────┬─────────────┘ │
│ │
┌─────────────▼─────────────┐ │
│ onErrorStop │ │
└─────────────┬─────────────┘ │
│ │
└────────────────────┘
I noticed that if dataProvider.fetchData()
returns a Flux
, I was getting an error Cannot infer type argument(s) for <V> concatMap(Function<? super T,? extends Publisher<? extends V>>)
.
But it works fine if Mono
was returned. So I had to use a flatMapMany
instead of flatMap
.
但是,如果service.doSomething()
可以返回Flux
或Mono
,有什么方法可以处理吗?
我想把它写成一个通用的服务.因此,我想看看这是否可以处理类型Flux<T>
、Mono<T>
甚至类型T
的数据.
要处理可以返回Flux<T>
、Mono<T>
或T
的服务方法,您需要调整响应式管道中的方法调用,以满足这些不同的响应式类型.一种方法是实际上使用flatMapMany
来处理Mono
和Flux
,因为Mono
可以转换为Flux
,但反之亦然.
AtomicLong lastExecutionStart = new AtomicLong(System.currentTimeMillis());
Flux<Object> flux = Flux.generate(
() -> 0L,
(state, sink) -> {
sink.next(state);
return state + 1;
}
)
.concatMap(state -> {
long delay;
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastExecutionStart.get();
long executionTime = 0; // replace with actual execution time if known
// Calculate the delay based on elapsed time since the start of the last execution
if (elapsedTime <= 5 * 60 * 1000) { // first 5 minutes
delay = Math.max(0, (10 - executionTime) * 1000); // 10 seconds delay, adjust for execution time
} else if (elapsedTime <= 15 * 60 * 1000) { // next 10 minutes
delay = Math.max(0, (30 - executionTime) * 1000); // 30 seconds delay, adjust for execution time
} else { // thereafter
delay = Math.max(0, (60 - executionTime) * 1000); // 60 seconds delay, adjust for execution time
}
return Mono.delay(Duration.ofMillis(delay))
.then(Mono.fromCallable(() -> {
lastExecutionStart.set(System.currentTimeMillis());
return "doSomething";
}))
// Use flatMapMany to handle both Mono and Flux from doSomething()
.flatMapMany(o -> Flux.from(service.doSomething()));
})
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();
通过使用flatMapMany
,您可以确保服务调用可以返回Flux
或Mono
,而Mono
将被转换为Flux
.对于处理同步类型T
,通常使用Mono.just()
或Flux.just()
对其进行包装,具体取决于您想要的是单个结果还是结果流.
要真正使您的服务泛型,必须确保doSomething()
方法签名被适当地设计为返回Publisher<T>
,Mono
和Flux
都实现了这一点.这将允许您在服务方法本身中抽象出这些类型之间的差异.
为了处理同步值(T
),您需要在服务方法中创建一个react 包装器,如下所示:
public Publisher<T> doSomething() {
T result = ...; // Your synchronous logic here
return Mono.just(result); // or Flux.just(result) if it makes sense to return a stream
}
这样,您就可以在您的react 管道中使用相同的flatMapMany
方法来处理来自doSomething()
的所有类型的返回值.