假设您有以下接口:
interface IConflateWorkByKey<TKey, TValue>
{
IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues();
void Publish(TKey key, TValue value);
}
这是用来描述类似工作队列的模式.生产者可以通过Publish
排队,单个消费者可以通过GetValues
排出物品,从而:
- 值应按键合并.这意味着,如果在消费者有机会耗尽下一批词典(词典)之前,生产者先拨打
Publish(1, "hello")
,然后再拨打Publish(1, "world")
,那么消费者收到的下一本词典应该是key:1,value:"world".先前的更新(1,"Hello")将被丢弃/永远不会被消费者看到 - 调用
Publish
的速度通常比耗尽工作的速度快得多.换句话说,对Publish
的调用应该尽可能快,因此排空项并不是非常关键的性能. - 在大多数实际情况下,当工作使用者移动从
GetValues
返回的迭代器时,新的项可能已经可用,并且不需要实际等待;对于这种情况,具有快速路径优化将是有用的.然而,需要准备一个实现来应对这种情况,然后异步地等待新的项可用 - 将只有一个消费者(即:
GetValues
将仅由1个消费者调用/消费) - 不会并发调用发布(尽管它可以从不同的线程顺序调用)
我当前的实现如下:
class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
private Dictionary<TKey,TValue>? _buffered = null;
private readonly object _lock = new();
public IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct)
{
while(!ct.IsCancellationRequested)
{
lock(_lock)
{
while(_buffered is null)
Monitor.Wait(_lock);
var result = _buffered;
_buffered = null;
yield return result;
}
}
}
public void Publish(TKey key, TValue value)
{
lock(_lock)
{
_buffered ??= new();
_buffered[key] = value;
Monitor.Pulse(_lock);
}
}
}
请注意,如果Publish
方法对于特定实现是最佳的,我愿意将其更改为返回ValueTask
.
这在原则上工作得很好,但主要问题是这里的GetValues
的实现不是异步的;调用线程在Monitor.Wait
上被正确地阻塞.
我也try 过从Nito.AsyncEx
到AsyncMonitor
的模式--但不幸的是,AsyncMonitor.Pulse
太慢了.
有没有人能想到一个更聪明的实现/模式,它在发布值方面非常快,同时允许GetValues
以内的真正的等待/信令?
编辑:这是另一个 idea .我还没有仔细考虑这是否正确,并衡量业绩,但在这里列出它以供辩论.当然,还是对其他 idea 很好奇!
class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
private Dictionary<TKey,TValue>? _buffered = new();
private readonly object _lock = new();
private TaskCompletionSource? _tcs = null;
public IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct)
{
while(!ct.IsCancellationRequested)
{
Dictionary<TKey,TValue> result;
while(true) {
lock(_lock)
{
if(_buffered.Any())
{
// "Fast path" - next result is already available, publish directly without having to wait
result = _buffered;
_buffered = new();
break;
}
_tcs = new();
}
await _tcs.Task;
}
yield return result;
}
}
public void Publish(TKey key, TValue value)
{
lock(_lock)
{
_buffered[key] = value;
if(_tcs is not null)
{
_tcs.TrySetResult();
_tcs = null; // "Fast path", next invocation of publish doesn't even need to call TrySetResult() if values weren't drained in between
}
}
}
}