当在. NET Core 8 Web API应用程序中为服务总线实现通用消费者时,我遇到了一个问题,即当试图解决消费者中的依赖关系时,IServiceProvider实例被释放或为null.如何确保IServiceProvider在使用者中保持有效和可访问,以正确解决依赖关系(如事件处理器策略)而不被丢弃?

背景: 我试图实现的是一种可重用的通用方法,用于在多个微服务之间订阅服务总线主题.每个微服务都应该能够轻松地实现这个通用类,并提供自己的策略来处理接收到的事件.

我正在使用HostedService,特别是PostConsumerHostedService,它负责触发消费者逻辑.

当try 在使用者中使用IServiceProvider时会出现问题,因为发现它在运行时被释放.

首先,这里是触发消费者的PostConsumerHostedService:

    public class PostConsumerHostedService : CronJobServiceBase
{
    private readonly IServiceProvider _service;

    public PostConsumerHostedService(
        IOptions<PostConsumerHostedServiceSettings> postConsumerHostedServiceSettings,
        ILogger<CronJobServiceBase> log,
        IServiceProvider service)
        : base(postConsumerHostedServiceSettings, log)
    {
        _service = service;
    }

    protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
    {
        using var scope = _service.CreateAsyncScope();
        var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
        await postTaskService.StartAsync(cancellationToken);
    }
}

接下来,调用IServiceBusConsumer的PostConsumerTaskService:

public interface IPostConsumerTaskService : ITaskService, IHandleServiceBusMessage
    {
    }
public class PostConsumerTaskService : IPostConsumerTaskService
{
    private readonly IServiceBusConsumer _serviceBusConsumer;
    private readonly IServiceProvider _serviceProvider;

    public PostConsumerTaskService(
        IServiceBusConsumer serviceBusConsumer,
        IServiceProvider serviceProvider)
    {
        _serviceBusConsumer = serviceBusConsumer;
        _serviceProvider = serviceProvider;
    }

    public async Task HandleStringMessageAsync(string message)
    {
        var jsonDocument = JsonDocument.Parse(message);

        // Extract the value of the "type" field
        if (jsonDocument.RootElement.TryGetProperty("EventType", out JsonElement typeElement))
        {
            string type = typeElement.GetString();
            Console.WriteLine("Type: " + type);
            Enum.TryParse(type, out EventType myEvent);

            var eventStrategy = await _eventProcessorFactory.GetEventProcessorStrategy(_serviceProvider,myEvent);
            eventStrategy.ProcessEvent(message);
        }
        else
        {
            Console.WriteLine("Type field not found in JSON.");
        }

        await Task.CompletedTask;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _serviceBusConsumer.StartAsync<LikeCreated>(this, cancellationToken);
    }
}

下面是ServiceBusConsumerBase,当试图检索事件处理器策略时,问题发生在这里:

public abstract class ServiceBusConsumerBase: IServiceBusConsumer
{
    private readonly ServiceBusConsumerSettingsBase _serviceBusConsummerSettings;

    public ServiceBusConsumerBase(IOptions<ServiceBusConsumerSettingsBase> serviceBusConsummerSettings)
    {
        _serviceBusConsummerSettings = serviceBusConsummerSettings.Value;
    }

    public async Task StartAsync<T>(IHandleServiceBusMessage handleServiceBusMessage,CancellationToken cancellationToken)
    {
        var serviceBusClient = new ServiceBusClient(_serviceBusConsummerSettings.ConnectionString);
        var processor = serviceBusClient.CreateProcessor(_serviceBusConsummerSettings.Topic, _serviceBusConsummerSettings.Subscription, new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxConcurrentCalls = 1
        });


        // handle received messages
        //processor.ProcessMessageAsync += args => MessageHandler<T>(args,handleServiceBusMessage);
        processor.ProcessMessageAsync += args => MessageStringHandler(args, handleServiceBusMessage);
        processor.ProcessErrorAsync += ErrorHandler;

        // Start processing
        await processor.StartProcessingAsync(cancellationToken);

        Console.WriteLine("Task running");

    }

    

    // handle received messages
    async Task MessageStringHandler(ProcessMessageEventArgs args, IHandleServiceBusMessage handleServiceBusMessage)
    {
        var body = args.Message.Body.ToString();
        Console.WriteLine($"Received: imageFileName {body}");

        await handleServiceBusMessage.HandleStringMessageAsync(body);

        // complete the message. messages are deleted from the subscription. 
        await args.CompleteMessageAsync(args.Message);
    }

    // handle any errors when receiving messages
    Task ErrorHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }

}

错误:

System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ThrowHelper.ThrowObjectDisposedException()
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at PostService.Factories.EventProcessorFactory.GetEventProcessorStrategy(IServiceProvider serviceProvider, EventType eventType) in /Users/dmata/Projects/InstagramSD/PostService/Factories/EventProcessorFactory.cs:line 31
   at PostService.TaskServices.LikeTopicConsumerTaskService.HandleStringMessageAsync(String message) in /Users/dmata/Projects/InstagramSD/PostService/TaskServices/LikeTopicConsumerTaskService.cs:line 54
   at Common.ServiceBus.ServiceBusConsumerBase.MessageStringHandler(ProcessMessageEventArgs args, IHandleServiceBusMessage handleServiceBusMessage) in /Users/dmata/Projects/InstagramSD/Common/ServiceBus/ServiceBusConsumerBase.cs:line 59
   at Azure.Messaging.ServiceBus.ServiceBusProcessor.OnProcessMessageAsync(ProcessMessageEventArgs args)
   at Azure.Messaging.ServiceBus.ReceiverManager.OnMessageHandler(EventArgs args)

推荐答案

当你在这里开始"操作时:

    protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
    {
        using var scope = _service.CreateAsyncScope();
        var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
        await postTaskService.StartAsync(cancellationToken);
    }

请注意如何调用"Start",然后处理scope变量.问题是,实际的处理程序在所有这些之后被执行,并且它们试图再次依赖于IServiceProvider.提供者已被放置在scope旁边,所以你不能这样做.

解决这个问题的一种方法是将not丢弃在那里的scope,可能只是将其存储为一个字段,并在类上实现IDisposable,以便稍后丢弃作用域.

例如,像这样的东西可能会奏效.

public class PostConsumerHostedService : CronJobServiceBase, IAsyncDisposable
{
    private readonly IServiceProvider _service;
    private AsyncServiceScope _scope;

    public PostConsumerHostedService(
        IOptions<PostConsumerHostedServiceSettings> postConsumerHostedServiceSettings,
        ILogger<CronJobServiceBase> log,
        IServiceProvider service)
        : base(postConsumerHostedServiceSettings, log)
    {
        _service = service;
    }

    public async ValueTask DisposeAsync()
    {
        await this.scope?.DisposeAsync();
    }

    protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
    {
        this.scope = _service.CreateAsyncScope();
        var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
        await postTaskService.StartAsync(cancellationToken);
    }
}

请记住,这个设计有一个缺陷,如果你决定在同一个实例上调用ExecuteTaskAsync多次,这个缺陷就会出现,因为作用域将被覆盖.

话虽如此,完全解决这个问题的最干净的方法可能是确保在您第一次解决postTaskService之后再次try 接触容器(使用IServiceProvider):问题也会以这种方式消失.

现在我要说的是:我认为你应该重新考虑你的设计.与在托管服务中创建作用域不同,更好的设计是 for each 需要处理的消息创建一个作用域:当消息到达时,从main IServiceProvider(或仅仅注入IServiceScopeFactory)为其创建作用域.在此作用域中,创建消息处理器,然后让它处理消息.完成后,您可以关闭示波器.

这种设计可以更好地建模现有模式,在这种模式中创建"每个请求的作用域".例如,这与AspNetCore中发生的情况类似,但您处理的不是HTTP请求,而是服务总线请求.

每个消息一个作用域还可以避免一些问题,例如将实例保留太长时间或无意中跨处理程序共享它们.例如,如果您要使用EFCore的单个DbContext,您将开始看到许多问题,例如高内存使用率和并发问题.在处理类似的类时,最好总是每个"处理单元"有一个实例.

Csharp相关问答推荐

应用程序启动时出现错误:操作无法同时使用表单和SON主体参数

[0-n]范围内有多少个Integer在其小数表示中至少包含一个9?

在C# 11之前, struct 中的每个字段都必须显式分配?不能繁殖

EF Core 8—应用客户端投影后无法转换集操作

如果存在对CodeAnalysis.CSharp的引用,则不能引用netStandard2.0库

使用C#中的SDK在Microsoft Graph API上使用SubscribedSkus的问题

将轮询与超时同步

MongoDB.NET-将数据绑定到模型类,但无法读取整数值

将现有字段映射到EFCore中的复杂类型

ASP.NET配置kestrel以使用Windows证书存储中的HTTPS

在EF Core中,有没有什么方法可以防止在查询中写入相同的条件,而代之以表达式?

如何通过属性初始化器强制初始化继承记录内的属性?

NET8 Maui&;iOS:AppCenter崩溃错误

从VS调试器而不是测试资源管理器运行的调试NUnitDotNet测试

JsonSchema.Net删除假阳性判断结果

如何在使用属性 Select 器时判断是否可以为空

未显示详细信息的弹出对话框

从MudAutoComplete打开对话框,列表仍然可见

使用C#12中的主构造函数进行空判断

根据优先级整理合同列表