我有一个Worker将BatchRequest发送到API,该API在ocelot中实现了我的自定义负载均衡器,在这个CustomLoadBalance中,我希望将每个BatchRequest的70%分发给HostAndPort1,将30%分发给另一个.最终目标是将总请求的70%发送到FirstApi,将30%发送到Second Api.问题是它没有以我想要的方式分发,只有当我只有一个线程时它才能工作.

这是向ocelotApi发送请求的工作代码:

   public async ValueTask<long> DoReadAsync()
    {
        OnProcess = true;
        var count = 0;


        try
        {
            var sends = (await 
            _unitOfWork.SendRepository.Where(x =>
               x.sndPost == false, x =>
              new Send
              {
                  sndID = x.sndID,
                  SndBody = x.SndBody,
                  SndTo = x.SndTo,
                  sndFrom = x.sndFrom,
                  SndFarsi = x.SndFarsi,
                  sndMsgClass = x.sndMsgClass,
                  sndUDH = x.sndUDH
              }, c => c.OrderBy(o => o.SndPriority), BaseConfig.BatchRead)).ToList();

            count = sends.Count;

            if (count == 0)
            {
                await Task.Delay(_settings.RefreshMillisecond);
                OnProcess = false;
                return count;
            }
            var split = BaseConfig.Split(sends);

            var restTasks = split
                .Select(items => _restService.DoPostAsync(items.ToList()))
                .ToList();

            var updateTasks = new List<Task>();

            while (restTasks.Any())
            {
                var task = await Task.WhenAny(restTasks);
                //task.ThrowExceptionIfTaskIsFaulted();
                var item = await task.ConfigureAwait(false);
                if (item is { IsSuccessStatusCode: true })
                {
                    var content = await item.Content.ReadAsStringAsync();
                    var items = JsonConvert.DeserializeObject<List<ResponseDto>>(content);
                    var itemsSends = items.Select(_mapper.Map<Send>).ToList();
                    if (itemsSends.Any())
                    {
                        var updateTask = _unitOfWork.SendRepository.BulkUpdateForwardOcelotAsync(itemsSends);
                        updateTasks.Add(updateTask);
                    }

                }

                restTasks.Remove(task);

            }

            await Task.WhenAll(updateTasks).ConfigureAwait(false);
            Completed = true;
            OnProcess = false;


        }
        catch (Exception ex)
        {
            _logger.LogError(ex.Message);
            OnProcess = false;
        }
        return count;

    }

在上面的代码中,我们有批处理:50000,我将它们分成10个任务,5,000个请求,并将它们发送到ocelotApi.

这是我用ocelotapi编写的代码,我编写了一个中间件,如下所示:

public class BatchMiddleware : OcelotMiddleware
{
    private readonly RequestDelegate _next;
    private bool isRahyabRG = true;
    private int remainedBatch = 0;
    
    public BatchMiddleware(
        RequestDelegate next,
        IConfiguration configuration,
        IOcelotLoggerFactory loggerFactory) : base(loggerFactory.CreateLogger<BatchMiddleware>())

    {
        _next = next;
    }

    public async Task Invoke(HttpContext httpContext)
    {
        var request = httpContext.Request;
        var batchRequests = await request.DeserializeArrayAsync<RequestDto>();
        var batchRequestCount = batchRequests.Count;
        var RGCount = (int)Math.Floor(70 * batchRequestCount / 100.0);

        if (isRahyabRG)
        {
            var rgRequests = batchRequests.Take(RGCount).ToList();
            var requestBody = JsonConvert.SerializeObject(rgRequests);
            request.Body = new MemoryStream(Encoding.UTF8.GetBytes(requestBody));
            isRahyabRG = false;
            remainedBatch = batchRequestCount - RGCount;
            httpContext.Session.SetString("remainedBatchKey", remainedBatch.ToString());
        }
        else
        {
            var remainedBatchKey = httpContext.Session.GetString("remainedBatchKey");
            var pmRequests = new List<RequestDto>();
            if (remainedBatchKey != null)
            {
                pmRequests = batchRequests.Take(int.Parse(remainedBatchKey)).ToList();
            }
            var requestBody = JsonConvert.SerializeObject(pmRequests);
            request.Body = new MemoryStream(Encoding.UTF8.GetBytes(requestBody));
            isRahyabRG = true;
        }
        
        await _next.Invoke(httpContext);

    }

这是我的客户负载平衡器:

  public class MyRoundRobin : ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    private readonly object _lock = new();

    private int _last;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;

    }

    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();

        lock (_lock)
        {
            if (_last >= services.Count)
                _last = 0;

            var next = services[_last++];
        
            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
        }
    }

    public void Release(ServiceHostAndPort hostAndPort)
    {
    }


}

}

这是ocelot.json:

 {
"Routes": [

    {
        "DownstreamPathTemplate": "/api/Forward",
        "DownstreamScheme": "http",
        "DownstreamHostAndPorts": [

            {
                
                "Host": "localhost",
                "Port": 51003
            },
           
            {
                
                "Host": "localhost",
                "Port": 32667

            }

        ],
        "UpstreamPathTemplate": "/",
        "UpstreamHttpMethod": [
            "POST"
        ],


        "LoadBalancerOptions": {
            "Type": "MyRoundRobin"

        }
       
    }
]

}

推荐答案

您分享的代码是Ocelot文档中的一个示例,它是一个基本的循环负载均衡器.根据我在代码中的理解,会 for each 请求创建ILoadBalancer个实例.因此,保存共享信息的字段(lock和_last)应该是静态的.可以借助随机数实现加权轮询,随着请求数的增加,分布将接近期望值(%70-%30).您可以执行以下操作:

public class MyRoundRobin: ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;
    }

    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();
        var whatToPick = Random.Shared.NextDouble() >= 0.7 ? 1 : 0; 
        // Beware that Random.Shared added .NET 6 onwards for thread safety
        var next = services[whatToPick];
        
        return new OkResponse<ServiceHostAndPort>(next.HostAndPort);        
    }

    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}

如果您不想实施涉及随机数的解决方案,可以try 以下解决方案:

public class MyRoundRobin : ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    private static readonly object _lock = new();
    private static ulong counter;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;

    }

    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();
       
        lock (_lock)
        {
            var mod = unchecked(counter++) % 100;
            var whatToPick = mod < 70 ? 0 : 1;
            var next = services[whatToPick];
        
            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
        }
    }

    public void Release(ServiceHostAndPort hostAndPort)
    {
    }


}

Csharp相关问答推荐

为什么在GuardationRule的收件箱函数中,decode.TryParse(valueString,out valueParsed)在给出1.0.1时返回true?

C#将参数传递给具有变化引用的变量

NumPy s fftn in C#with pythonnet'

如何注销Microsoft帐户?

MudBlazor—MudDataGrid—默认过滤器定义不允许用户修改基本过滤器

Nuget包Serilog.Sinks.AwsCloudwatch引发TypeLoadExceptions,因为父类型是密封的

为什么EventInfo.EventHandlerType返回可为空的Type值?

当用户右键单击文本框并单击粘贴时触发什么事件?

如果设置了另一个属性,则Newtonsoft JSON忽略属性

C#Null判断处理失败

是否有必要在ASP.NET Core中注册可传递依赖项?

在使用StringBuilder时,如何根据 colored颜色 设置为richTextBox中的特定行着色?

如何实现有条件的自定义Json转换器隐藏属性

如何使用Npgsql从SELECT获得所有查询结果

我应该为C#12中的主构造函数参数创建私有属性吗?

WPF如何获取有关从一个视图模型更改另一个视图模型的信息

在Unity C#中按键点击错误的参数

使用postman 测试配置了身份的.NET 6应用程序

如何对构建在Clean架构和CQRS之上的控制器进行单元测试?

如何模拟一个返回OneOf IServiceA,IServiceB的方法?使用Moq