您可以这样做:
static IList<(string Key, decimal Sum, int Count)> GroupStats(
IEnumerable<(string Key, decimal Value)> source)
{
return source
.ToObservable(Scheduler.Immediate)
.GroupBy(x => x.Key)
.Select(g => (
Key: g.Key,
Sum: g.Sum(x => x.Value).PublishLast().AutoConnect(0),
Count: g.Count().PublishLast().AutoConnect(0)
))
.ToList()
.Wait()
.Select(e => (e.Key, e.Sum.Wait(), e.Count.Wait()))
.ToArray();
}
使用ToObservable
可以将IEnumerable<T>
¹震源转换为IObservable<T>
震源.这有点慢,因为默认情况下,Scheduler.CurrentThread
上的订阅是scheduled,所以会传递Scheduler.Immediate
.您可以进一步了解ToObservable
运算符here的性能.
GroupBy
将IObservable<T>
转换为IObservable<IGroupedObservable<string, T>>
.
Select
将每个IGroupedObservable<string, T>
转换为(string, IObservable<decimal>, IObservable<int>)
.PublishLast
用于记住Sum
和Count
运算符发出的最后(也是唯一)值.AutoConnect(0)
在emits 这些子序列时立即订阅这些子序列.
ToList
将IObservable<T>
转换为IObservable<IList<T>>
.完成后,外部可观察对象将发出一个列表.
Wait
同步等待外部可观察对象完成,并发出单个列表.这就是所有工作发生的地方.在此之前,尚未枚举source
序列.Wait
订阅到目前为止已经构建的可观察对象,这将触发对底层可观察对象的订阅,并最终触发source
的枚举.订阅期间,在当前线程上同步执行所有计算.所以动词"wait"不能准确描述这里发生的事情.
接下来的Select
通过等待子序列将每个(string, IObservable<decimal>, IObservable<int>)
转换为(string, decimal, int)
.此时这些子序列已经完成,它们的单个输出存储在PublishLast
中.因此,这些内部Wait
调用不会触发任何严肃的工作.上一步的所有繁重工作都已完成.
最后,ToArray
将IEnumerable<(string, decimal, int)>
转换为(string, decimal, int)
的数组,这是GroupStats
方法的输出.
¹我使用T
作为复杂ValueTuple
的占位符,这样解释就不会过于冗长