我和我的团队支持多个后台工作人员控制台应用程序,它们可以同步处理队列中的消息.

目前,当队列中的消息数量超过一定阈值时,我们使用docker来处理多个消息,从而启动应用程序的新实例.

我们正在讨论并行处理这些消息的其他方法,一位队友建议在while循环中使用顶级的async void方法来处理这些消息.

它看起来会起作用,因为它类似于一个基于UI的应用程序,使用Async void作为事件处理程序.

然而,我个人从来没有编写过以这种方式并行处理多个消息的后台工作人员,我想知道是否有人有这样做的经验,如果有,是否有我们没有考虑到的"trap ".

以下是建议解决方案的简化版本:

static async void Main(string[] args)
{
    while (true)
    {
        TopLevelHandler(await ReceiveMessage());
    }
}

static async void TopLevelHandler(object message)
{
    await DoAsyncWork(message);
}

static async Task<object> ReceiveMessage()
{
    //fetch message from queue
    return new object();
}

static async Task DoAsyncWork(object message)
{
    //processing here
}

推荐答案

对于最新的网络,我们默认有async Main个方法.

此外,如果您没有该功能,您可以将Main标记为异步.

这里的主要风险是您可能会错过异常,并且必须非常小心地处理创建的任务.如果您要在while (true)循环中创建任务,很可能会在某个时刻(当有人错误地使用某个阻塞调用时)导致线程池匮乏.

下面的示例代码显示了应该考虑的内容,但我最确定的是,还会有更复杂的内容:

using System.Collections.Concurrent;
using System.Security.Cryptography.X509Certificates;

namespace ConsoleApp2;

public static class Program
{
    /// <summary>
    /// Property to track all running tasks, should be dealt with carefully.
    /// </summary>
    private static ConcurrentBag<Task> _runningTasks = new();

    static Program()
    {
        // Subscribe to an event.
        TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
    }

    static async Task Main(string[] args)
    {
        var messageNo = 1;

        while (true)
        {
            // Just schedule the work
            // Here you should most probably limit number of
            // tasks created.
            var task = ReceiveMessage(messageNo)
                .ContinueWith(t => TopLevelHandler(t.Result));

            _runningTasks.Add(task);
            messageNo++;
        }
    }

    static async Task TopLevelHandler(object message)
    {
        await DoAsyncWork(message);
    }

    static async Task<object> ReceiveMessage(int messageNumber)
    {
        //fetch message from queue
        await Task.Delay(5000);
        return await Task.FromResult(messageNumber);
    }

    static async Task DoAsyncWork(object message)
    {
        //processing here
        Console.WriteLine($"start processing message {message}");
        await Task.Delay(5000);
        
        // Only when you want to test catching exception
        // throw new Exception("some malicious code");

        Console.WriteLine($"end processing message {message}");
    }

    /// <summary>
    /// Here you handle any unosberved exception thrown in a task.
    /// Preferably you should handle somehow all running work in other tasks.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    static async void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
    {
        // Potentially this method could be enter by multiple tasks having exception.
        Console.WriteLine($"Exception caught: {e.Exception.InnerException.Message}");
        await Task.WhenAll(_runningTasks.Where(x => !x.IsCompleted));
    }
}

Csharp相关问答推荐

Inbox RandomizedStringHashAl算法如何帮助抵御哈希洪泛攻击?

访问C#中的数据库字段时获取数据是收件箱错误-为什么?&有效,如果声明不有效

为什么.Equals(SS,StringComparison. ClientCultureIgnoreCase)在Net 4.8和6.0之间不同?

PredicateBuilder不是循环工作,而是手动工作

注册通用工厂的C# Dep注入

使用yaml将Azure函数代码部署到FunctionApp插槽时出现问题(zip未找到)

数组被内部函数租用时,如何将数组返回给ArrayPool?

使用LINQ to XML获取元素值列表是不起作用的

"virtual"修饰符对接口成员有什么影响?

Microsoft. VisualBasic. FileIO. FileSystem. MoveFile()对话框有错误?

如何注销Microsoft帐户?

使用泛型可空类实现接口

我的MRG32k3a算法实现返回的数字超出了预期的[0,1]范围

是否由DI容器自动处理由ActivatorUilties.CreateInstance()创建的服务?

在ASP.NET Core 8 MVC中本地化共享视图

RCL在毛伊岛应用程序和Blazor服务器应用程序.Net 8.0中使用页面

如何在C#中从MongoDB IPipelineStageDefinition中获取聚合命令的字段/选项?

C#中使用ReadOnlySpan的泛型类型推理

如何在特定时间间隔运行多个后台任务?

除非首先访问使用的终结点,否则本地API上的终结点不起作用