我想用RxJava实现以下功能:

  1. 缓冲元素,并在最后一个元素后5秒后发布它们
  2. 在第一个元素之后的20秒内发布缓冲元素

元素示例:

[A->2秒->B->3秒->C->6秒->D->4秒->E->9秒->F->1秒->G->15秒->H]

结果应该是:

[A、B、C]

[D,E,F]

[G,H]

目前,我可以在第一个元素生成后延迟20秒后发布元素,我应该更新什么来实现第一部分?

fun <T> Observable<T>.buffered(): Observable<List<T>> = publish { shared ->

    val startEvent = shared.throttleFirst(20, TimeUnit.SECONDS, scheduler)

    shared.buffer(startEvent.mergeWith(startEvent.delay(20, TimeUnit.SECONDS, scheduler, false)))

}

推荐答案

第1条,你需要debounce条.第二条规则,事情变得复杂了.

在该序列中,当前的第一个事件应该触发一个20秒的计时器,在此之后,一个信号会发出以启动一个新的缓冲区.然而,如果与规则1有5秒的间隙,则必须取消该20秒计时器.

下面是一个打印时间和缓冲区的示例来说明解决方案:

import java.util.concurrent.*;
import io.reactivex.rxjava3.core.*;
import java.util.concurrent.atomic.AtomicBoolean;

void main(String[] args) {
    
    // The signal pattern
    var source = Observable.fromArray(1, 2, 3)
    .concatWith(Observable.range(10, 25))
    .concatWith(Observable.range(45, 4))
    .flatMap(v -> Observable.just(v).delay(v, TimeUnit.SECONDS))
    .doOnNext(v -> System.out.println("Tick - " + v));
    
    // buffering action
    source.publish(shared -> {
        var db = shared.debounce(5, TimeUnit.SECONDS)
                .doOnNext(v -> System.out.println("Debounce 5 seconds - " + v))
                .publish()
                .autoConnect();

        var stop = new AtomicBoolean(true);

        var wnd = shared
                .take(1)
                .doOnNext(v -> stop.set(false));
                .delay(20, TimeUnit.SECONDS)
                .takeUntil(db.doOnNext(w -> stop.set(false))
                .repeatUntil(() -> stop.getAndSet(true))
                .doOnNext(v -> System.out.println("Window 20 seconds - " + v));
        
        return shared.buffer(db.mergeWith(wnd));
    })
    .blockingSubscribe(System.out::println);
}

我们有两个信号,db代表5秒的反跳,wnd代表20秒的窗口.

在go 弹跳部分,我们go 弹跳5秒钟.因为我们将需要窗口部件本身的go 反跳信号,所以我们必须发布+AutoConnect该部件以避免来自两个订阅的双重go 反跳信号.

在窗口部分,我们从共享源中获取一项,并将其延迟20秒.然而,如果debounce 部分首先发出信号,我们必须取消该序列,这样它就不会产生不需要的窗口.接下来,重复将确保如果20秒过go 或延迟被取消,我们将开始备份,并等待下一个源项目重复该过程.

现在有stop的最后一部分来自于盲目重复窗口将导致永无止境的序列的问题.如果源文件完成,重复将在一个紧密的循环中重新订阅-完成-重新订阅.为了避免这个问题,我们需要告诉repeat何时停止try 重新订阅.

为此,我们使用一面stop的旗帜.如果源产生了一个项目,因此启动了计时器,我们将停止标志设置为FALSE.否则,我们一开始就没有收到物品,需要将此信息传达给Repeat-repeatUntil.如果函数返回false,则repeatUntil将重复,即,如果没有项,则返回默认TRUE值.然后将其重置为下一轮.如果db先发信号,我们想继续下一轮.

Java相关问答推荐

在Spring Boot中测试时出现SQL语法错误

Java字符串常数池困惑

将具有多个未知字段的SON映射到Java POJO

如何转换Tue Feb 27 2024 16:35:30 GMT +0800 String至ZonedDateTime类型""

Java List with all combinations of 8 booleans

SpringBootreact 式Web应用程序的Spring Cloud Configer服务器中的资源控制器损坏

调用引发泛型异常的泛型方法时出现编译错误

Spring Boot 3.2.2中的@Inject和@Resource Remove

为什么Java Annotation接口覆盖对象类中的方法

使用GridBagLayout正确渲染

测试期间未执行开放重写方法

无法在Java中处理PayPal支付响应

Java泛型类方法的静态返回类型是否被类型擦除?

带有可选部分的Java DateTimeForMatter

如何使用Java对随机生成的字母数字优惠券代码进行过期设置

无限递归Java问题

Intellij 2023 IDE:始终在顶部显示菜单栏

简化每个元素本身都是 map 列表的列表

在JSON上获取反斜杠

声纳覆盖范围为 0%,未生成 jacoco.xml