在我的代码中,我有一些事件需要按顺序处理,其他事件需要并行处理.

我成功地完成了并行部分,但无法完成顺序版本.

event.ofType(Message.class)
    .groupBy(Message::getGroup)
    .flatMap(g -> {
        if (g.getKey() == Message.GROUP_PARALLEL)
            return g.flatMap(m -> Observable.just(m)
                        .observeOn(Schedulers.computation())
                        .doOnNext(this::send)
                        .delay(m.getRetryInterval(), TimeUnit.SECONDS)
                        .repeat(m.getRetryCount())
                        .takeUntil(event.ofType(Sent.class)
                            .filter(a -> a.getId() == m.getId())
                        )
                        .map(s -> m)
                    );
        else if (g.getKey() == Message.GROUP_SEQUENTIAL) {
            // ?
        }
    })

我想这样实施:

When an event is received
    If the previous event is still being processed, wait before emission
    Process the event (blocking)

所以:

    event.subscribeOn(Schedulers.computation())
        .waitUntilPreviousComplete()
        .flatMap(this::processEvent) // we assume it take 1 seconds to complete
        .susbcribe(this::processed, this::onError);

    event.onNext(Event.A)
    event.onNext(Event.B)
    event.onNext(Event.C)

结果应该是:

    12h00:00 processed call with event A
    12h00:01 processed call with event B
    12h00:02 processed call with event C

所以我想要看起来像队列的东西.我试过用HashMap和主题做一些奇怪的事情,但我相信有一个干净的方法可以做到这一点.我该如何实现这一点?

推荐答案

听起来你在找concatMap个.该操作员将按顺序订阅upstream 发出的项目,并且在前一个on完成之前不会继续下一个项目.您的示例代码如下所示:

event.subscribeOn(Schedulers.computation())
    .concatMap(this::processEvent)
    .subscribe(this::processed, this::onError);

Android相关问答推荐

编写Landscape BoxWithRequests具有错误的高度.(aspectRatio matchHeight约束第一次未按预期工作)

图像在Android Studio中显示,但不在设备中显示

关闭导致Kotlin中的内存泄漏?

找不到类MultipartBody;的序列化程序

为什么它显示我的空白屏幕?

使用Jetpack Compose创建特定于电视的布局

在 Compose 中,当用户持续向下滚动时,LazyColumn 不会显示新项目

如何迭代 SqlDelight Select 结果而不将所有内容加载到内存中?

可从 Play store 下载链接访问未发布的应用

运行设备选项卡在 Android Studio 中自动打开

列表更改时,RecyclerView 中的列表不会更新

Jetpack Compose:如何绘制异形边框?

Jetpack compose :使用 rememberSaveable 时未应用待处理的合成

在 Jetpack Compose 的无状态 Compose 中管理条件逻辑

在事件中使用 Context/Toast 时不需要的重组 - Jetpack Compose

PayUCheckoutPro Android SDK 实现问题

观察软键盘可见性,打开/关闭 Jetpack Compose

清洁架构中的服务

在 jetpack compose 中使用 .shadow 和 Button 会导致问题

Google Play 服务登录在 Unity Android 上无法正常运行