当在. NET Core 8 Web API应用程序中为服务总线实现通用消费者时,我遇到了一个问题,即当试图解决消费者中的依赖关系时,IServiceProvider实例被释放或为null.如何确保IServiceProvider在使用者中保持有效和可访问,以正确解决依赖关系(如事件处理器策略)而不被丢弃?
背景: 我试图实现的是一种可重用的通用方法,用于在多个微服务之间订阅服务总线主题.每个微服务都应该能够轻松地实现这个通用类,并提供自己的策略来处理接收到的事件.
我正在使用HostedService,特别是PostConsumerHostedService,它负责触发消费者逻辑.
当try 在使用者中使用IServiceProvider时会出现问题,因为发现它在运行时被释放.
首先,这里是触发消费者的PostConsumerHostedService:
public class PostConsumerHostedService : CronJobServiceBase
{
private readonly IServiceProvider _service;
public PostConsumerHostedService(
IOptions<PostConsumerHostedServiceSettings> postConsumerHostedServiceSettings,
ILogger<CronJobServiceBase> log,
IServiceProvider service)
: base(postConsumerHostedServiceSettings, log)
{
_service = service;
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
using var scope = _service.CreateAsyncScope();
var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
await postTaskService.StartAsync(cancellationToken);
}
}
接下来,调用IServiceBusConsumer的PostConsumerTaskService:
public interface IPostConsumerTaskService : ITaskService, IHandleServiceBusMessage
{
}
public class PostConsumerTaskService : IPostConsumerTaskService
{
private readonly IServiceBusConsumer _serviceBusConsumer;
private readonly IServiceProvider _serviceProvider;
public PostConsumerTaskService(
IServiceBusConsumer serviceBusConsumer,
IServiceProvider serviceProvider)
{
_serviceBusConsumer = serviceBusConsumer;
_serviceProvider = serviceProvider;
}
public async Task HandleStringMessageAsync(string message)
{
var jsonDocument = JsonDocument.Parse(message);
// Extract the value of the "type" field
if (jsonDocument.RootElement.TryGetProperty("EventType", out JsonElement typeElement))
{
string type = typeElement.GetString();
Console.WriteLine("Type: " + type);
Enum.TryParse(type, out EventType myEvent);
var eventStrategy = await _eventProcessorFactory.GetEventProcessorStrategy(_serviceProvider,myEvent);
eventStrategy.ProcessEvent(message);
}
else
{
Console.WriteLine("Type field not found in JSON.");
}
await Task.CompletedTask;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _serviceBusConsumer.StartAsync<LikeCreated>(this, cancellationToken);
}
}
下面是ServiceBusConsumerBase,当试图检索事件处理器策略时,问题发生在这里:
public abstract class ServiceBusConsumerBase: IServiceBusConsumer
{
private readonly ServiceBusConsumerSettingsBase _serviceBusConsummerSettings;
public ServiceBusConsumerBase(IOptions<ServiceBusConsumerSettingsBase> serviceBusConsummerSettings)
{
_serviceBusConsummerSettings = serviceBusConsummerSettings.Value;
}
public async Task StartAsync<T>(IHandleServiceBusMessage handleServiceBusMessage,CancellationToken cancellationToken)
{
var serviceBusClient = new ServiceBusClient(_serviceBusConsummerSettings.ConnectionString);
var processor = serviceBusClient.CreateProcessor(_serviceBusConsummerSettings.Topic, _serviceBusConsummerSettings.Subscription, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1
});
// handle received messages
//processor.ProcessMessageAsync += args => MessageHandler<T>(args,handleServiceBusMessage);
processor.ProcessMessageAsync += args => MessageStringHandler(args, handleServiceBusMessage);
processor.ProcessErrorAsync += ErrorHandler;
// Start processing
await processor.StartProcessingAsync(cancellationToken);
Console.WriteLine("Task running");
}
// handle received messages
async Task MessageStringHandler(ProcessMessageEventArgs args, IHandleServiceBusMessage handleServiceBusMessage)
{
var body = args.Message.Body.ToString();
Console.WriteLine($"Received: imageFileName {body}");
await handleServiceBusMessage.HandleStringMessageAsync(body);
// complete the message. messages are deleted from the subscription.
await args.CompleteMessageAsync(args.Message);
}
// handle any errors when receiving messages
Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
}
错误:
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ThrowHelper.ThrowObjectDisposedException()
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at PostService.Factories.EventProcessorFactory.GetEventProcessorStrategy(IServiceProvider serviceProvider, EventType eventType) in /Users/dmata/Projects/InstagramSD/PostService/Factories/EventProcessorFactory.cs:line 31
at PostService.TaskServices.LikeTopicConsumerTaskService.HandleStringMessageAsync(String message) in /Users/dmata/Projects/InstagramSD/PostService/TaskServices/LikeTopicConsumerTaskService.cs:line 54
at Common.ServiceBus.ServiceBusConsumerBase.MessageStringHandler(ProcessMessageEventArgs args, IHandleServiceBusMessage handleServiceBusMessage) in /Users/dmata/Projects/InstagramSD/Common/ServiceBus/ServiceBusConsumerBase.cs:line 59
at Azure.Messaging.ServiceBus.ServiceBusProcessor.OnProcessMessageAsync(ProcessMessageEventArgs args)
at Azure.Messaging.ServiceBus.ReceiverManager.OnMessageHandler(EventArgs args)