我有一个Worker将BatchRequest发送到API,该API在ocelot中实现了我的自定义负载均衡器,在这个CustomLoadBalance中,我希望将每个BatchRequest的70%分发给HostAndPort1,将30%分发给另一个.最终目标是将总请求的70%发送到FirstApi,将30%发送到Second Api.问题是它没有以我想要的方式分发,只有当我只有一个线程时它才能工作.

这是向ocelotApi发送请求的工作代码:

   public async ValueTask<long> DoReadAsync()
    {
        OnProcess = true;
        var count = 0;


        try
        {
            var sends = (await 
            _unitOfWork.SendRepository.Where(x =>
               x.sndPost == false, x =>
              new Send
              {
                  sndID = x.sndID,
                  SndBody = x.SndBody,
                  SndTo = x.SndTo,
                  sndFrom = x.sndFrom,
                  SndFarsi = x.SndFarsi,
                  sndMsgClass = x.sndMsgClass,
                  sndUDH = x.sndUDH
              }, c => c.OrderBy(o => o.SndPriority), BaseConfig.BatchRead)).ToList();

            count = sends.Count;

            if (count == 0)
            {
                await Task.Delay(_settings.RefreshMillisecond);
                OnProcess = false;
                return count;
            }
            var split = BaseConfig.Split(sends);

            var restTasks = split
                .Select(items => _restService.DoPostAsync(items.ToList()))
                .ToList();

            var updateTasks = new List<Task>();

            while (restTasks.Any())
            {
                var task = await Task.WhenAny(restTasks);
                //task.ThrowExceptionIfTaskIsFaulted();
                var item = await task.ConfigureAwait(false);
                if (item is { IsSuccessStatusCode: true })
                {
                    var content = await item.Content.ReadAsStringAsync();
                    var items = JsonConvert.DeserializeObject<List<ResponseDto>>(content);
                    var itemsSends = items.Select(_mapper.Map<Send>).ToList();
                    if (itemsSends.Any())
                    {
                        var updateTask = _unitOfWork.SendRepository.BulkUpdateForwardOcelotAsync(itemsSends);
                        updateTasks.Add(updateTask);
                    }

                }

                restTasks.Remove(task);

            }

            await Task.WhenAll(updateTasks).ConfigureAwait(false);
            Completed = true;
            OnProcess = false;


        }
        catch (Exception ex)
        {
            _logger.LogError(ex.Message);
            OnProcess = false;
        }
        return count;

    }

在上面的代码中,我们有批处理:50000,我将它们分成10个任务,5,000个请求,并将它们发送到ocelotApi.

这是我用ocelotapi编写的代码,我编写了一个中间件,如下所示:

public class BatchMiddleware : OcelotMiddleware
{
    private readonly RequestDelegate _next;
    private bool isRahyabRG = true;
    private int remainedBatch = 0;
    
    public BatchMiddleware(
        RequestDelegate next,
        IConfiguration configuration,
        IOcelotLoggerFactory loggerFactory) : base(loggerFactory.CreateLogger<BatchMiddleware>())

    {
        _next = next;
    }

    public async Task Invoke(HttpContext httpContext)
    {
        var request = httpContext.Request;
        var batchRequests = await request.DeserializeArrayAsync<RequestDto>();
        var batchRequestCount = batchRequests.Count;
        var RGCount = (int)Math.Floor(70 * batchRequestCount / 100.0);

        if (isRahyabRG)
        {
            var rgRequests = batchRequests.Take(RGCount).ToList();
            var requestBody = JsonConvert.SerializeObject(rgRequests);
            request.Body = new MemoryStream(Encoding.UTF8.GetBytes(requestBody));
            isRahyabRG = false;
            remainedBatch = batchRequestCount - RGCount;
            httpContext.Session.SetString("remainedBatchKey", remainedBatch.ToString());
        }
        else
        {
            var remainedBatchKey = httpContext.Session.GetString("remainedBatchKey");
            var pmRequests = new List<RequestDto>();
            if (remainedBatchKey != null)
            {
                pmRequests = batchRequests.Take(int.Parse(remainedBatchKey)).ToList();
            }
            var requestBody = JsonConvert.SerializeObject(pmRequests);
            request.Body = new MemoryStream(Encoding.UTF8.GetBytes(requestBody));
            isRahyabRG = true;
        }
        
        await _next.Invoke(httpContext);

    }

这是我的客户负载平衡器:

  public class MyRoundRobin : ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    private readonly object _lock = new();

    private int _last;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;

    }

    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();

        lock (_lock)
        {
            if (_last >= services.Count)
                _last = 0;

            var next = services[_last++];
        
            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
        }
    }

    public void Release(ServiceHostAndPort hostAndPort)
    {
    }


}

}

这是ocelot.json:

 {
"Routes": [

    {
        "DownstreamPathTemplate": "/api/Forward",
        "DownstreamScheme": "http",
        "DownstreamHostAndPorts": [

            {
                
                "Host": "localhost",
                "Port": 51003
            },
           
            {
                
                "Host": "localhost",
                "Port": 32667

            }

        ],
        "UpstreamPathTemplate": "/",
        "UpstreamHttpMethod": [
            "POST"
        ],


        "LoadBalancerOptions": {
            "Type": "MyRoundRobin"

        }
       
    }
]

}

推荐答案

您分享的代码是Ocelot文档中的一个示例,它是一个基本的循环负载均衡器.根据我在代码中的理解,会 for each 请求创建ILoadBalancer个实例.因此,保存共享信息的字段(lock和_last)应该是静态的.可以借助随机数实现加权轮询,随着请求数的增加,分布将接近期望值(%70-%30).您可以执行以下操作:

public class MyRoundRobin: ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;
    }

    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();
        var whatToPick = Random.Shared.NextDouble() >= 0.7 ? 1 : 0; 
        // Beware that Random.Shared added .NET 6 onwards for thread safety
        var next = services[whatToPick];
        
        return new OkResponse<ServiceHostAndPort>(next.HostAndPort);        
    }

    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}

如果您不想实施涉及随机数的解决方案,可以try 以下解决方案:

public class MyRoundRobin : ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    private static readonly object _lock = new();
    private static ulong counter;
    
    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;

    }

    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();
       
        lock (_lock)
        {
            var mod = unchecked(counter++) % 100;
            var whatToPick = mod < 70 ? 0 : 1;
            var next = services[whatToPick];
        
            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
        }
    }

    public void Release(ServiceHostAndPort hostAndPort)
    {
    }


}

Csharp相关问答推荐

System. InvalidOperationException:无法将数据库中的字符串值i转换为映射的ItemType枚举中的任何值''''

应该使用哪一个?"_counter += 1 OR互锁增量(ref_counter)"""

在C#中使用类中的对象值

当通过Google的Gmail Api发送邮件时,签名会产生dkim = neutral(正文散列未验证)'

取决于您的数据量的多个嵌套循环

C# CompareTo()和Compare()可以返回除-1和1以外的整数吗?

如何将端点(或с匹配请求并判断其路径)添加到BCL?

EFR32BG22 BLE在SPP模式下与PC(Windows 10)不连接

在Docker容器中运行API项目时,无法本地浏览到index.html

使用C#和.NET 7.0无法访问Cookie中的数据

使用动态键从请求体反序列化JSON

在C#中,当输入一个方法/局部函数时,我的IEnumerator被重置/重新创建.为什么?

RCL在毛伊岛应用程序和Blazor服务器应用程序.Net 8.0中使用页面

毛伊岛.NET 8图片不再适合按钮

为什么当我try 为玩家角色设置动画时,没有从文件夹中拉出正确的图像?

如何在单击按钮后多次异步更新标签

如何在C#.NET桌面应用程序中动态更改焦点工具上的后退 colored颜色

WPF如何获取有关从一个视图模型更改另一个视图模型的信息

SignalR跨域

如何将典型的C#一行程序作为单独的代码段编写?