第1条,你需要debounce
条.第二条规则,事情变得复杂了.
在该序列中,当前的第一个事件应该触发一个20秒的计时器,在此之后,一个信号会发出以启动一个新的缓冲区.然而,如果与规则1有5秒的间隙,则必须取消该20秒计时器.
下面是一个打印时间和缓冲区的示例来说明解决方案:
import java.util.concurrent.*;
import io.reactivex.rxjava3.core.*;
import java.util.concurrent.atomic.AtomicBoolean;
void main(String[] args) {
// The signal pattern
var source = Observable.fromArray(1, 2, 3)
.concatWith(Observable.range(10, 25))
.concatWith(Observable.range(45, 4))
.flatMap(v -> Observable.just(v).delay(v, TimeUnit.SECONDS))
.doOnNext(v -> System.out.println("Tick - " + v));
// buffering action
source.publish(shared -> {
var db = shared.debounce(5, TimeUnit.SECONDS)
.doOnNext(v -> System.out.println("Debounce 5 seconds - " + v))
.publish()
.autoConnect();
var stop = new AtomicBoolean(true);
var wnd = shared
.take(1)
.doOnNext(v -> stop.set(false));
.delay(20, TimeUnit.SECONDS)
.takeUntil(db.doOnNext(w -> stop.set(false))
.repeatUntil(() -> stop.getAndSet(true))
.doOnNext(v -> System.out.println("Window 20 seconds - " + v));
return shared.buffer(db.mergeWith(wnd));
})
.blockingSubscribe(System.out::println);
}
我们有两个信号,db
代表5秒的反跳,wnd
代表20秒的窗口.
在go 弹跳部分,我们go 弹跳5秒钟.因为我们将需要窗口部件本身的go 反跳信号,所以我们必须发布+AutoConnect该部件以避免来自两个订阅的双重go 反跳信号.
在窗口部分,我们从共享源中获取一项,并将其延迟20秒.然而,如果debounce 部分首先发出信号,我们必须取消该序列,这样它就不会产生不需要的窗口.接下来,重复将确保如果20秒过go 或延迟被取消,我们将开始备份,并等待下一个源项目重复该过程.
现在有stop
的最后一部分来自于盲目重复窗口将导致永无止境的序列的问题.如果源文件完成,重复将在一个紧密的循环中重新订阅-完成-重新订阅.为了避免这个问题,我们需要告诉repeat
何时停止try 重新订阅.
为此,我们使用一面stop
的旗帜.如果源产生了一个项目,因此启动了计时器,我们将停止标志设置为FALSE.否则,我们一开始就没有收到物品,需要将此信息传达给Repeat-repeatUntil
.如果函数返回false
,则repeatUntil
将重复,即,如果没有项,则返回默认TRUE值.然后将其重置为下一轮.如果db先发信号,我们想继续下一轮.