我有一个 Worker,它把 BatchRequest 发送到一个基于 Ocelot 的 API 网关,我在该网关中实现了自定义负载均衡器。在这个自定义负载均衡器中,我希望把每个 BatchRequest 的 70% 分发给第一个 HostAndPort,其余 30% 分发给另一个;最终目标是总请求的 70% 发送到第一个 API,30% 发送到第二个 API。问题是请求并没有按我期望的比例分发,只有在单线程的情况下才能正常工作。
下面是 Worker 中向 Ocelot API 发送请求的代码:
/// <summary>
/// Runs one read/forward pass: loads up to <c>BaseConfig.BatchRead</c> rows whose
/// <c>sndPost</c> flag is false, splits them with <c>BaseConfig.Split</c>, posts every
/// chunk through <c>_restService.DoPostAsync</c>, and starts a bulk update for each
/// chunk whose response reports a success status code.
/// </summary>
/// <returns>
/// The number of rows read in this pass (0 when nothing was pending). Failures are
/// logged and swallowed, so the count is returned even when forwarding failed.
/// </returns>
public async ValueTask<long> DoReadAsync()
{
    OnProcess = true;
    var count = 0;
    try
    {
        var sends = (await _unitOfWork.SendRepository.Where(
            x => x.sndPost == false,
            x => new Send
            {
                sndID = x.sndID,
                SndBody = x.SndBody,
                SndTo = x.SndTo,
                sndFrom = x.sndFrom,
                SndFarsi = x.SndFarsi,
                sndMsgClass = x.sndMsgClass,
                sndUDH = x.sndUDH
            },
            c => c.OrderBy(o => o.SndPriority),
            BaseConfig.BatchRead)).ToList();

        count = sends.Count;
        if (count == 0)
        {
            // Nothing to send: back off before the caller polls again.
            await Task.Delay(_settings.RefreshMillisecond);
            return count;
        }

        // Start every chunk's POST up front so they run concurrently.
        var restTasks = BaseConfig.Split(sends)
            .Select(items => _restService.DoPostAsync(items.ToList()))
            .ToList();
        var updateTasks = new List<Task>();

        // Handle responses in completion order rather than submission order.
        while (restTasks.Count > 0)
        {
            var finished = await Task.WhenAny(restTasks);
            restTasks.Remove(finished);

            // Dispose each response once its content has been read.
            using var response = await finished.ConfigureAwait(false);
            if (response is not { IsSuccessStatusCode: true })
            {
                continue;
            }

            var content = await response.Content.ReadAsStringAsync();
            var items = JsonConvert.DeserializeObject<List<ResponseDto>>(content);

            // A body such as "null" deserializes to null; treat it like an empty list.
            if (items is null || items.Count == 0)
            {
                continue;
            }

            var itemsSends = items.Select(_mapper.Map<Send>).ToList();
            updateTasks.Add(_unitOfWork.SendRepository.BulkUpdateForwardOcelotAsync(itemsSends));
        }

        await Task.WhenAll(updateTasks).ConfigureAwait(false);
        Completed = true;
    }
    catch (Exception ex)
    {
        // Pass the exception object so the stack trace is logged, and use a constant
        // template so braces inside the message are not parsed as placeholders.
        _logger.LogError(ex, "{Message}", ex.Message);
    }
    finally
    {
        // Single reset point for every exit path (empty batch, success, failure).
        OnProcess = false;
    }
    return count;
}
在上面的代码中,每批读取 50,000 条记录,我把它们拆分成 10 个任务(每个任务 5,000 条请求),并发送到 Ocelot API。
下面是我在 Ocelot API 中编写的代码。我写了一个中间件,如下所示:
/// <summary>
/// Ocelot middleware that trims the JSON array in the request body before passing the
/// request on. Requests alternate between two branches: the "RG" branch keeps the first
/// 70% (rounded down) of the batch and stores the remainder count in the session under
/// "remainedBatchKey"; the other branch keeps as many items as that stored count, or
/// none when the session has no such value.
/// </summary>
public class BatchMiddleware : OcelotMiddleware
{
    private readonly RequestDelegate _next;

    // Count of requests handled so far; only its parity is used. It is advanced with
    // Interlocked so two concurrent requests can never observe the same turn, which
    // the previous unsynchronised bool flag allowed.
    private int _requestCounter;

    public BatchMiddleware(
        RequestDelegate next,
        IConfiguration configuration,
        IOcelotLoggerFactory loggerFactory) : base(loggerFactory.CreateLogger<BatchMiddleware>())
    {
        _next = next;
    }

    /// <summary>
    /// Replaces the request body with the slice chosen for this request's turn and then
    /// invokes the next middleware.
    /// </summary>
    /// <param name="httpContext">The current request context; its body and session are used.</param>
    public async Task Invoke(HttpContext httpContext)
    {
        var request = httpContext.Request;
        var batchRequests = await request.DeserializeArrayAsync<RequestDto>();
        var batchRequestCount = batchRequests.Count;
        var rgCount = (int)Math.Floor(70 * batchRequestCount / 100.0);

        // Odd counter values take the RG branch, so the very first request does, as before.
        // Parity keeps alternating when the counter wraps from int.MaxValue to int.MinValue.
        // NOTE(review): this turn and the load balancer's own rotation advance
        // independently; under concurrency the slice may not reach the intended host — confirm.
        var isRgTurn = (System.Threading.Interlocked.Increment(ref _requestCounter) & 1) == 1;

        List<RequestDto> forwarded;
        if (isRgTurn)
        {
            forwarded = batchRequests.Take(rgCount).ToList();

            // The remainder count is a per-request value, so it is a local, not a shared field.
            var remainedBatch = batchRequestCount - rgCount;
            httpContext.Session.SetString("remainedBatchKey", remainedBatch.ToString());
        }
        else
        {
            var remainedBatchKey = httpContext.Session.GetString("remainedBatchKey");
            forwarded = remainedBatchKey != null
                ? batchRequests.Take(int.Parse(remainedBatchKey)).ToList()
                : new List<RequestDto>();
        }

        var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(forwarded));
        request.Body = new MemoryStream(payload);

        // Keep Content-Length in step with the replaced body.
        request.ContentLength = payload.Length;

        await _next.Invoke(httpContext);
    }
    // NOTE(review): this class's closing brace is not part of this snippet; in this
    // paste the unmatched brace after MyRoundRobin closes it.
这是我的自定义负载均衡器:
/// <summary>
/// Load balancer that hands out the services returned by the supplied factory in
/// rotation, one per call to <see cref="Lease"/>.
/// </summary>
public class MyRoundRobin : ILoadBalancer
{
    private readonly Func<Task<List<Service>>> _services;
    private readonly object _lock = new();

    // Index of the next service to hand out; guarded by _lock.
    private int _last;

    public MyRoundRobin(Func<Task<List<Service>>> services, IConfiguration configuration)
    {
        _services = services;
    }

    /// <summary>
    /// Picks the next service in rotation.
    /// </summary>
    /// <param name="httpContext">The current request context (not used for the choice).</param>
    /// <returns>
    /// An OK response carrying the chosen host and port, or an error response when the
    /// service list is null or empty.
    /// </returns>
    public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
    {
        var services = await _services();

        // Without this guard an empty list would throw from the indexer below.
        if (services is null || services.Count == 0)
        {
            return new ErrorResponse<ServiceHostAndPort>(
                new ServicesAreEmptyError("There were no services in MyRoundRobin"));
        }

        lock (_lock)
        {
            // Wrap around; this also covers a list that shrank since the previous call.
            if (_last >= services.Count)
            {
                _last = 0;
            }

            var next = services[_last++];
            return new OkResponse<ServiceHostAndPort>(next.HostAndPort);
        }
    }

    /// <summary>No per-lease state is kept, so releasing is a no-op.</summary>
    public void Release(ServiceHostAndPort hostAndPort)
    {
    }
}
}
这是ocelot.json:
{
"Routes": [
{
"DownstreamPathTemplate": "/api/Forward",
"DownstreamScheme": "http",
"DownstreamHostAndPorts": [
{
"Host": "localhost",
"Port": 51003
},
{
"Host": "localhost",
"Port": 32667
}
],
"UpstreamPathTemplate": "/",
"UpstreamHttpMethod": [
"POST"
],
"LoadBalancerOptions": {
"Type": "MyRoundRobin"
}
}
]
}