我有一个TPL数据流管道,其中一个目标块链接到两个传播块,然后两个传播块都链接到一个源块.所有都与PropagateCompletion = true相连.第一个传播块与一个只接受偶数的过滤器链接,其中第二个接收所有剩余的消息.

在发布最后一条消息后,我将第一个块设置为"已完成".不过,似乎有一个比赛条件.最后一个块有时似乎处理所有值,但有时仅处理第一个传播块接受的值,并且仅处理第二个传播块接受的部分值.

我觉得这是一种比赛条件.但我不知道如何正确地指示最终的源块,只有在链接到它的两个传播块转发了它们的所有消息之后,一切才完成.

下面是我的代码,简化为一个简单的示例:

    internal static class Program
    {
        public static async Task Main(string[] args)
        {
            var linkOptions = new DataflowLinkOptions
            {
                PropagateCompletion = true
            };
            var bufferBlock = new BufferBlock<int>();
            var fork1 = new TransformBlock<int, int>(n => n);
            var fork2 = new TransformBlock<int, int>(n =>
            {
                Thread.Sleep(100);
                return n;
            });
            var printBlock = new ActionBlock<int>(Console.WriteLine);

            bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
            bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
            
            fork1.LinkTo(printBlock, linkOptions);
            fork2.LinkTo(printBlock, linkOptions);
            
            for (var n = 1; n <= 10; ++n)
            {
                bufferBlock.Post(n);
            }
            bufferBlock.Complete();

            await printBlock.Completion;
        }
    }

这将输出:

2
4
6
8
10

我希望它能输出:

2
4
6
8
10
1
3
5
7
9

推荐答案

数据流图中有一个菱形,通过两个分支中的任何一个使最终块提前完成,使完成传播得更快.

最后一个块的完成可以使用任务延续进行自定义:

          ...
            var printBlock = new ActionBlock<int>(Console.WriteLine);

            bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
            bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
            
            fork1.LinkTo(printBlock); // no completion propagation
            fork2.LinkTo(printBlock);
           
            Task.WhenAll(fork1.Completion, fork2.Completion)
    .ContinueWith(t => printBlock.Complete(), TaskContinuationOptions.ExecuteSynchronously);

            for (var n = 1; n <= 10; ++n)
            {
                bufferBlock.Post(n);
            }

            bufferBlock.Complete();

            await printBlock.Completion;

Csharp相关问答推荐

为什么Microsoft.AspNetCore. htttp.FormFile不实现IDDisposable?

C#编译器会优化表达体成员的重新判断吗?

如何从顶部提取发票号作为单词发票后的第一个匹配

错误NU 1301:无法加载源的服务索引

IComponition.获取IReadOnlyCollection的返回默认属性值

如何使用C#中的图形API更新用户配置文件图像

使用特定格式的JsonConvert序列化对象

System.Text.Json .NET 8多形态语法化

有空容错运算符的对立面吗?

关于扩展文件类C#的矛盾

是否由DI容器自动处理由ActivatorUilties.CreateInstance()创建的服务?

Google OAuth令牌交换在.Net中不起作用

获取混淆&Quot;模糊引用&Quot;错误

用MongoDB c#驱动程序删除和返回嵌套数组中的文档

如何使用IHostedService添加数据种子方法

如何在C# WinForm控件中使用Windows 10/11的黑暗主题?

如何在Xamarin.Forms中检索PanGesture事件的位置?

如何读取TagHelper属性的文本值?

SignalR跨域

如何在更新数据库实体时忽略特定字段?