在我的代码中,我有一些事件需要按顺序处理,其他事件需要并行处理.
我成功地完成了并行部分,但无法完成顺序版本.
event.ofType(Message.class)
.groupBy(Message::getGroup)
.flatMap(g -> {
if (g.getKey() == Message.GROUP_PARALLEL)
return g.flatMap(m -> Observable.just(m)
.observeOn(Schedulers.computation())
.doOnNext(this::send)
.delay(m.getRetryInterval(), TimeUnit.SECONDS)
.repeat(m.getRetryCount())
.takeUntil(event.ofType(Sent.class)
.filter(a -> a.getId() == m.getId())
)
.map(s -> m)
);
else if (g.getKey() == Message.GROUP_SEQUENTIAL) {
// ?
}
})
我想这样实施:
When an event is received
If the previous event is still being processed, wait before emission
Process the event (blocking)
所以:
event.subscribeOn(Schedulers.computation())
.waitUntilPreviousComplete()
.flatMap(this::processEvent) // we assume it take 1 seconds to complete
.susbcribe(this::processed, this::onError);
event.onNext(Event.A)
event.onNext(Event.B)
event.onNext(Event.C)
结果应该是:
12h00:00 processed call with event A
12h00:01 processed call with event B
12h00:02 processed call with event C
所以我想要看起来像队列的东西.我试过用HashMap和主题做一些奇怪的事情,但我相信有一个干净的方法可以做到这一点.我该如何实现这一点?