我正在try 使用System.Threading.Tasks.Dataflow实现一个classic 的map-reduce问题,虽然我可以做一些(某种程度上)工作,但我很难看到如何推广此功能.

给出一个简单的问题

  • 产生一个整数流;每个数字都是并行的
  • 取所有数字的和

我遇到的问题是,我可以使用BufferBlock来实现这一点,但我必须指定并行任务集的初始大小.这对于下面的测试代码来说很好,因为我预先知道我要排队的项目有多少,但说我不知道...我将如何设置此管道?

使用的测试代码(注意,我在第一个"并行"块中添加了一个短延迟,只是为了根据并行度查看一些处理时间差):

using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

var input = 10;

var fanOutBlock = new TransformManyBlock<int, int>(x =>
{
    return Enumerable.Range(1, x).Select(x => x);
});

var squareBlock = new TransformBlock<int, int>(async x =>
 {
     await Task.Delay(100);
     return x * x;
 }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var addFiveBlock = new TransformBlock<int, int>(x =>
{
    return x + 5;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var divTwoBlock = new TransformBlock<int, double>(x =>
{
    return x/2.0;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var batchBlock = new BatchBlock<double>(input);

var sumBlock = new TransformBlock<IList<double>,double>(x =>
{
    return x.Sum();
});

var options = new DataflowLinkOptions { PropagateCompletion = true };

fanOutBlock.LinkTo(squareBlock, options);
squareBlock.LinkTo(addFiveBlock, options);
addFiveBlock.LinkTo(divTwoBlock, options);
divTwoBlock.LinkTo(batchBlock, options);
batchBlock.LinkTo(sumBlock, options);


var sw = Stopwatch.StartNew();
fanOutBlock.Post(input);
fanOutBlock.Complete();


var result = sumBlock.Receive();
Console.WriteLine(result);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds}ms");

await sumBlock.Completion;

推荐答案

一个 idea 是将BatchBlock<T>配置为最大batchSize:

var batchBlock = new BatchBlock<double>(Int32.MaxValue);

batchBlock完成时(当调用其Complete方法时),它将发出一个包含所有消息的批处理.缺点是,通过缓冲每条消息,在消息数量巨大的情况下,可能会耗尽内存.或者,如果消息的数量大于Int32.MaxValue,并且奇迹般地没有耗尽内存,那么您将得到多个批,这对于您try 实现的逻辑来说将是一个bug.

一个更好的 idea 是实现一个自定义数据流块,它可以聚合动态接收的消息.类似于Aggregate LINQ操作符:

public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IEnumerable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector);

下面是一个实现,由两个本机块组成,用DataflowBlock.Encapsulate方法封装:

public static IPropagatorBlock<TSource, TResult>
    CreateAggregateBlock<TSource, TAccumulate, TResult>(
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector,
    ExecutionDataflowBlockOptions options = default)
{
    options ??= new ExecutionDataflowBlockOptions();
    var maxDOP = options.MaxDegreeOfParallelism;
    options.MaxDegreeOfParallelism = 1;

    var inputBlock = new ActionBlock<TSource>(item =>
    {
        seed = function(seed, item);
    }, options);

    var outputBlock = new TransformBlock<TAccumulate, TResult>(accumulate =>
    {
        return resultSelector(accumulate);
    }, options);

    options.MaxDegreeOfParallelism = maxDOP; // Restore initial value

    PropagateCompletion(inputBlock, outputBlock, () =>
    {
        outputBlock.Post(seed);
    });

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    static void PropagateCompletion(IDataflowBlock source, IDataflowBlock target,
        Action onSuccessfulCompletion)
    {
        ThreadPool.QueueUserWorkItem(async _ =>
        {
            try { await source.Completion; } catch { }
            Exception exception =
                source.Completion.IsFaulted ? source.Completion.Exception : null;
            if (source.Completion.IsCompletedSuccessfully)
            {
                // The action is invoked before completing the target.
                try { onSuccessfulCompletion(); }
                catch (Exception ex) { exception = ex; }
            }
            if (exception != null) target.Fault(exception); else target.Complete();
        });
    }
}

一个棘手的部分是如何将一个块的完成传播到另一个块.我的preferred technique是在线程池上调用async void方法.这样,我的代码中的任何bug都将作为崩溃的未处理异常公开.另一种方法是将代码放入火中,然后忘记任务继续,在这种情况下,bug的影响很可能是silent死锁.

另一个问题是seed状态的Mutations 是否对计算中涉及的所有线程都可见.我避免设置明确的障碍或lock,我依赖于TPL在任务排队时以及在任务执行的开始/结束时包含的implicit barriers.

用法示例:

var sumBlock = CreateAggregateBlock<double, double, double>(0.0,
    (acc, x) => acc + x, acc => acc);

Csharp相关问答推荐

C#DateTime.ToString在ubuntu和centos中返回不同的结果

静态对象构造顺序

为什么EventInfo.EventHandlerType返回可为空的Type值?

由于POST中的应用程序/JWT,出现不支持的内容类型异常

在使用UserManager时,如何包含与其他实体的关系?

为什么我的伺服电机不动,下面的代码?

我的命名管道在第一次连接后工作正常,但后来我得到了System.ObjectDisposedException:无法访问关闭的管道

如何在使用属性 Select 器时判断是否可以为空

类/值和日期的泛型方法

当`JToken?`为空时?

如何读取TagHelper属性的文本值?

Xamarin中出错.表单:应用程序的分部声明不能指定不同的基类

如何对列表<;列表>;使用集合表达式?

如何在C#中抽象Vector256;T<;的逻辑以支持不同的硬件配置?

自定义ConsoleForMatter中的DI/Http上下文

C#.NET中的Parall.ForAsync?

为什么.NET核心数学方法重复?

RDLC比例图像对齐

C#IEEE754取整

如果线程在Windows中必须是STA,那么它在Linux或MacOS中工作吗?