我有一个TPL数据流管道,其中一个目标块链接到两个传播块,然后两个传播块都链接到一个源块.所有都与PropagateCompletion = true
相连.第一个传播块与一个只接受偶数的过滤器链接,其中第二个接收所有剩余的消息.
在发布最后一条消息后,我将第一个块设置为"已完成".不过,似乎有一个比赛条件.最后一个块有时似乎处理所有值,但有时仅处理第一个传播块接受的值,并且仅处理第二个传播块接受的部分值.
我觉得这是一种比赛条件.但我不知道如何正确地指示最终的源块,只有在链接到它的两个传播块转发了它们的所有消息之后,一切才完成.
下面是我的代码,简化为一个简单的示例:
internal static class Program
{
public static async Task Main(string[] args)
{
var linkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var bufferBlock = new BufferBlock<int>();
var fork1 = new TransformBlock<int, int>(n => n);
var fork2 = new TransformBlock<int, int>(n =>
{
Thread.Sleep(100);
return n;
});
var printBlock = new ActionBlock<int>(Console.WriteLine);
bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
fork1.LinkTo(printBlock, linkOptions);
fork2.LinkTo(printBlock, linkOptions);
for (var n = 1; n <= 10; ++n)
{
bufferBlock.Post(n);
}
bufferBlock.Complete();
await printBlock.Completion;
}
}
这将输出:
2
4
6
8
10
我希望它能输出:
2
4
6
8
10
1
3
5
7
9