I'm trying to implement a classic map-reduce problem using System.Threading.Tasks.Dataflow. While I can get something (sort of) working, I'm struggling to see how to generalize it.
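Given a simple problem:
- Produce a stream of integers
- For each number, in parallel:
  - square the value
  - add 5
  - divide by 2
- Take the sum of all the numbers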
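For reference, the same map and reduce steps can be computed without Dataflow using a PLINQ query. This is a swapped-in technique, shown only as a sketch to pin down the expected result. It assumes `input = 10`, as in the test code.

```csharp
using System;
using System.Linq;

var input = 10;

// Map each of 1..input in parallel (square, add 5, halve), then reduce with Sum.
var result = Enumerable.Range(1, input)
    .AsParallel()
    .Select(x => (x * x + 5) / 2.0)
    .Sum();

Console.WriteLine(result);
```

With `input = 10` the result is 217.5. Every mapped value is a multiple of 0.5, so the unordered parallel summation yields the same double regardless of the order in which items are added.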
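My problem is that I can implement this with a BatchBlock, but I have to specify the batch size up front, i.e. the number of items coming out of the parallel stage. That's fine for the test code below, because I know in advance how many items I'll be queuing. But suppose I didn't know... how would I set up this pipeline?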
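Test code used (note that I added a short delay to the first "parallel" block, just to see some difference in processing time depending on the degree of parallelism):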
```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var input = 10;

// Fan out: turn the single input into the numbers 1..input.
var fanOutBlock = new TransformManyBlock<int, int>(x => Enumerable.Range(1, x));

// Map steps, each allowed to process up to 5 items concurrently.
var squareBlock = new TransformBlock<int, int>(async x =>
{
    await Task.Delay(100); // artificial delay to make the degree of parallelism visible
    return x * x;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var addFiveBlock = new TransformBlock<int, int>(x => x + 5,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var divTwoBlock = new TransformBlock<int, double>(x => x / 2.0,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

// Reduce: the batch size has to match the item count, which is only known here
// because fanOutBlock produces exactly `input` numbers.
var batchBlock = new BatchBlock<double>(input);
var sumBlock = new TransformBlock<IList<double>, double>(x => x.Sum());

var options = new DataflowLinkOptions { PropagateCompletion = true };
fanOutBlock.LinkTo(squareBlock, options);
squareBlock.LinkTo(addFiveBlock, options);
addFiveBlock.LinkTo(divTwoBlock, options);
divTwoBlock.LinkTo(batchBlock, options);
batchBlock.LinkTo(sumBlock, options);

var sw = Stopwatch.StartNew();
fanOutBlock.Post(input);
fanOutBlock.Complete();

var result = sumBlock.Receive(); // blocks until the single batch has been summed
Console.WriteLine(result);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds}ms");

await sumBlock.Completion;
```
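One way to avoid specifying the item count is to reduce with an `ActionBlock` that keeps a running total, and to wait on its `Completion` instead of calling `Receive`. This is a sketch under stated assumptions, not necessarily the best approach: it swaps the `BatchBlock`/`TransformBlock` pair for a single accumulator, and it collapses the three map blocks into one (`mapBlock`, a name introduced here) for brevity.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var input = 10;
var options = new DataflowLinkOptions { PropagateCompletion = true };
var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

// Fan out: one int in, an arbitrary number of ints out.
var fanOutBlock = new TransformManyBlock<int, int>(x => Enumerable.Range(1, x));

// Map: square, add 5, halve (the original three blocks merged into one for brevity).
var mapBlock = new TransformBlock<int, double>(async x =>
{
    await Task.Delay(100);
    return (x * x + 5) / 2.0;
}, parallel);

// Reduce: ActionBlock defaults to MaxDegreeOfParallelism = 1, so the delegate
// handles one message at a time and `total` needs no locking.
double total = 0;
var sumBlock = new ActionBlock<double>(x => total += x);

fanOutBlock.LinkTo(mapBlock, options);
mapBlock.LinkTo(sumBlock, options);

fanOutBlock.Post(input);
fanOutBlock.Complete();

// Completion flows fanOut -> map -> sum only after every item has been processed,
// so the number of items never has to be known up front.
await sumBlock.Completion;
Console.WriteLine(total);
```

Because `PropagateCompletion` carries completion across every link, `sumBlock.Completion` finishes only once the fan-out has emitted all its items and each one has been added to `total`, however many there turn out to be. The design relies on the accumulator running sequentially; raising its `MaxDegreeOfParallelism` would require synchronizing the update to `total`.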