Objective
我试图解决的问题是聚集类型(int Key, int Value)
的消息的序列(和值),直到结束观察对象发出"刷新"标记项.
例如,给定(Key,Value)
个项目的序列
(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)
当触发Flush
时,我预计会emits 一个带有[(1,4), (2,3)]
的array.
当序列完成时-应该emits 一个包含[(2,6)]
的array.
What I've tried
我从GroupBy
+ Aggregate
+ Buffer(flush)
开始,作为实现所需行为的直观方式.然而,该序列不会发出中间结果,而是将所有聚合作为最终输出.
这是完整代码
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();
var completion = StartProcessing(source, flush);
source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted(); // emit of [(2,6)] is expected
await completion;
return;
static Task StartProcessing(
IObservable<(int Key, int Value)> source,
IObservable<Unit> flush)
{
return source
.GroupBy
(
keySelector: message => message.Key
)
.SelectMany
(
group => group
.Aggregate
(
seed: (group.Key, Value: 0),
accumulator: (output, item) => (output.Key, output.Value + item.Value)
)
)
.Buffer(flush)
.Select(buffer => Observable.FromAsync(() => Flush(buffer)))
.Merge()
.ToTask();
}
static async Task Flush(IEnumerable<(int Key, int Value)> data)
{
Console.WriteLine($"Flushing [{string.Join(", ", data)}]");
// processing takes time
await Task.Delay(TimeSpan.FromSeconds(1));
}
输出是
Flushing []
Flushing [(1, 4), (2, 9)]
据我所知(但不确定),除非序列完成,否则Buffer
不会发出值,因为Aggregate
不会传播中间值.
我还try 使用GroupByUntil
而不是GroupBy
和Buffer
.使用该 struct ,我在组关闭时获得中间输出,但每个组的聚合是逐个发出的,并且不清楚如何将它们Bundle 在一起,以便它们不会单独刷新.
Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)
到目前为止,我陷入了困境并寻求一些帮助,具体来说:
- 就Rx而言,实施所需行为的正确方式是什么?
- 是否可以使用内置操作符来实现,或者我应该创建自定义操作符(例如自定义AggregateUnt)?