Objective

我试图解决的问题是聚集类型(int Key, int Value)的消息的序列(和值),直到结束观察对象发出"刷新"标记项.

例如,给定(Key,Value)个项目的序列

(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)

当触发Flush时,我预计会emits 一个带有[(1,4), (2,3)]的array.

当序列完成时-应该emits 一个包含[(2,6)]的array.

What I've tried

我从GroupBy + Aggregate + Buffer(flush)开始,作为实现所需行为的直观方式.然而,该序列不会发出中间结果,而是将所有聚合作为最终输出.

这是完整代码

using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

var source = new Subject<(int Key, int Value)>();
var flush = new Subject<Unit>();

var completion = StartProcessing(source, flush);

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));

flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected

source.OnNext((2, 1));
source.OnNext((2, 5));

source.OnCompleted(); // emit of [(2,6)] is expected

await completion;

return;

static Task StartProcessing(
    IObservable<(int Key, int Value)> source,
    IObservable<Unit> flush)
{
    return source
        .GroupBy
        (
            keySelector: message => message.Key
        )
        .SelectMany
        (
            group => group
                .Aggregate
                (
                    seed: (group.Key, Value: 0),
                    accumulator: (output, item) => (output.Key, output.Value + item.Value)
                )
        )
        .Buffer(flush)
        .Select(buffer => Observable.FromAsync(() => Flush(buffer)))
        .Merge()
        .ToTask();
}

static async Task Flush(IEnumerable<(int Key, int Value)> data)
{
    Console.WriteLine($"Flushing [{string.Join(", ", data)}]");

    // processing takes time
    await Task.Delay(TimeSpan.FromSeconds(1));
}

输出是

Flushing []
Flushing [(1, 4), (2, 9)]

据我所知(但不确定),除非序列完成,否则Buffer不会发出值,因为Aggregate不会传播中间值.

我还try 使用GroupByUntil而不是GroupByBuffer.使用该 struct ,我在组关闭时获得中间输出,但每个组的聚合是逐个发出的,并且不清楚如何将它们Bundle 在一起,以便它们不会单独刷新.

Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)

到目前为止,我陷入了困境并寻求一些帮助,具体来说:

  • 就Rx而言,实施所需行为的正确方式是什么?
  • 是否可以使用内置操作符来实现,或者我应该创建自定义操作符(例如自定义AggregateUnt)?

推荐答案

Updated:

重写了你的例子.看起来适合您的需求:

    static async Task Main()
    {
        var source = new Subject<(int Key, int Value)>();
        var flush = new Subject<Unit>();

        var completion = StartProcessing(source, flush);

        source.OnNext((1, 1));
        source.OnNext((2, 3));
        source.OnNext((1, 3));

        flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected

        source.OnNext((2, 1));
        source.OnNext((2, 5));

        source.OnCompleted(); // emit of [(2,6)] is expected

        await completion;

        return;
    }

    static Task StartProcessing(
        IObservable<(int Key, int Value)> source,
        IObservable<Unit> flush)
    {
        return source
            .Buffer(flush) 
            .Select(list => list.GroupBy(i=>i.Key).Select(group => (group.Key, group.Sum(i=>i.Value))))
            .Select(item => Observable.FromAsync(() => Flush(item)))
            .Merge()
            .ToTask();
    }


    static async Task Flush((int Key, int Value) data)
    {
        Console.WriteLine($"Flushing [{data}]");

        // processing takes time
        await Task.Delay(TimeSpan.FromSeconds(1));
    }

制作:

Flushing [(1, 4), (2, 3)]
Flushing [(2, 6)]

Csharp相关问答推荐

我可以将Expressc操作设置为在另一个Expressc操作完成后自动发生吗?

为什么在GuardationRule的收件箱函数中,decode.TryParse(valueString,out valueParsed)在给出1.0.1时返回true?

在Dapper中使用IasyncEum重写GetAsyncEum方法

在Linq中调用需要limit和offset的方法''''

FileStream. FlushAsync()是否确保文件被写入磁盘?

`Task`只有在C#中等待时才会运行吗?

如何将不同类型的扩展参数的javascript函数转换成C#风格?

使用Entity Framework6在对象中填充列表会导致列表大多为空

如何在C#中转换泛型包装类内部的派生类

有没有类似于扩展元素的合并元组的语法?

是否有必要在ASP.NET Core中注册可传递依赖项?

.NET 8在appsettings.json中核心使用词典URI、URI&>

在使用AWS SDK for.NET时,如何判断S3是否已验证我的对象的校验和?

在等待OnGetAsync时打开Razor Page显示微调器

使用未赋值的、传递的局部变量

如何在一次数据库调用中为ASP.NET核心身份用户加载角色

如何读取TagHelper属性的文本值?

使用免费的DotNet库从Azure函数向Azure文件共享上的现有Excel文件追加行

使用C#代码和SQL SERVER中的相同证书签名会产生不同的结果

实例化列表时的集合表达式是什么?