假设您有以下接口:

interface IConflateWorkByKey<TKey, TValue>
{
  IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues();
  void Publish(TKey key, TValue value);
}

这是用来描述类似工作队列的模式.生产者可以通过Publish排队,单个消费者可以通过GetValues排出物品,从而:

  • 值应按键合并.这意味着,如果在消费者有机会耗尽下一批词典(词典)之前,生产者先拨打Publish(1, "hello"),然后再拨打Publish(1, "world"),那么消费者收到的下一本词典应该是key:1,value:"world".先前的更新(1,"Hello")将被丢弃/永远不会被消费者看到
  • 调用Publish的速度通常比耗尽工作的速度快得多.换句话说,对Publish的调用应该尽可能快,因此排空项并不是非常关键的性能.
  • 在大多数实际情况下,当工作使用者移动从GetValues返回的迭代器时,新的项可能已经可用,并且不需要实际等待;对于这种情况,具有快速路径优化将是有用的.然而,需要准备一个实现来应对这种情况,然后异步地等待新的项可用
  • 将只有一个消费者(即:GetValues将仅由1个消费者调用/消费)
  • 不会并发调用发布(尽管它可以从不同的线程顺序调用)

我当前的实现如下:

class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
  private Dictionary<TKey,TValue>? _buffered = null;
  private readonly object _lock = new();

  public IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct)
  {
    while(!ct.IsCancellationRequested)
    {
      lock(_lock)
      {
        while(_buffered is null)
          Monitor.Wait(_lock);

        var result = _buffered;
        _buffered = null;
        yield return result;
      }
    }
  }

  public void Publish(TKey key, TValue value)
  {
    lock(_lock)
    {
      _buffered ??= new();
      _buffered[key] = value;
      Monitor.Pulse(_lock);
    }
  }
}

请注意,如果Publish方法对于特定实现是最佳的,我愿意将其更改为返回ValueTask.

这在原则上工作得很好,但主要问题是这里的GetValues的实现不是异步的;调用线程在Monitor.Wait上被正确地阻塞.

我也try 过从Nito.AsyncExAsyncMonitor的模式--但不幸的是,AsyncMonitor.Pulse太慢了.

有没有人能想到一个更聪明的实现/模式,它在发布值方面非常快,同时允许GetValues以内的真正的等待/信令?


编辑:这是另一个 idea .我还没有仔细考虑这是否正确,并衡量业绩,但在这里列出它以供辩论.当然,还是对其他 idea 很好奇!

class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
  private Dictionary<TKey,TValue>? _buffered = new();
  private readonly object _lock = new();
  private TaskCompletionSource? _tcs = null;

  public IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct)
  {
    while(!ct.IsCancellationRequested)
    {
      Dictionary<TKey,TValue> result;

      while(true) {
        lock(_lock)
        {
          if(_buffered.Any())
          {
            // "Fast path" - next result is already available, publish directly without having to wait
            result = _buffered;
            _buffered = new();
            break;
          }
          _tcs = new();
        }
        await _tcs.Task;
      }

      yield return result;
    }
  }

  public void Publish(TKey key, TValue value)
  {
    lock(_lock)
    {
      _buffered[key] = value;

      if(_tcs is not null)
      {
        _tcs.TrySetResult();
        _tcs = null; // "Fast path", next invocation of publish doesn't even need to call TrySetResult() if values weren't drained in between
      }
    }
  }
}

推荐答案

以下是一个实现.如果消费者试图在词典为空的时刻消费批次,则使用TaskCompletionSource<T>:

class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
    private readonly object _locker = new();
    private TaskCompletionSource<Dictionary<TKey, TValue>> _tcs;
    private Dictionary<TKey, TValue> _dictionary;

    public void Publish(TKey key, TValue value)
    {
        lock (_locker)
        {
            if (_tcs is not null)
            {
                Debug.Assert(_dictionary is null);
                _tcs.SetResult(new() { { key, value } });
                _tcs = null;
            }
            else
            {
                _dictionary ??= new();
                _dictionary[key] = value;
            }
        }
    }

    public async IAsyncEnumerable<Dictionary<TKey, TValue>> GetValues(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using CancellationTokenRegistration ctr = cancellationToken.Register(() =>
        {
            lock (_locker)
            {
                if (_tcs is not null)
                {
                    _tcs.SetCanceled(cancellationToken);
                    _tcs = null;
                }
            }
        });

        while (true)
        {
            Dictionary<TKey, TValue> result = null;
            Task<Dictionary<TKey, TValue>> taskResult = null;
            lock (_locker)
            {
                if (_tcs is not null) throw new InvalidOperationException(
                    "Multiple consumers are not supported.");
                cancellationToken.ThrowIfCancellationRequested();
                if (_dictionary is not null)
                {
                    result = _dictionary;
                    _dictionary = null;
                }
                else
                {
                    _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
                    taskResult = _tcs.Task;
                }
            }
            if (result is not null)
                yield return result;
            else if (taskResult is not null)
                yield return await taskResult.ConfigureAwait(false);
        }
    }
}

此实现支持多个并发生产者,最多支持一个消费者.

理想情况下,您需要一个像CompletePublishing这样的额外方法,这样生产者就可以发出不会生产更多产品的信号,从而允许消费者在处理完所有批次后退出await foreach循环.CancellationToken不适合这个角色,因为它突然取消了消费者,可能是在消费最后一批之前.至少这是CancellationToken的预期行为.例如,比较类似.NET组件的CompleteAddingComplete方法.

Csharp相关问答推荐

C#中的包版本控制

Serilog SQL服务器接收器使用UTC作为时间戳

将修剪声明放入LINQ中

使用LINQ to XML获取元素值列表是不起作用的

==和Tuple对象的相等<>

. NET 8控制台应用程序DI错误无法解析Microsoft. Extension. Logging. ILoggerFactory类型的服务'''

如何在实体框架中添加包含列表?

System.Text.Json .NET 8多形态语法化

当我将`ConcurentDictionary`转换为`IDictionary`时,出现了奇怪的并发行为

在C#中有没有办法减少大型数组中新字符串的分配?

当try 测试具有协变返回类型的抽象属性时,类似功能引发System.ArgumentException

正在从最小API-InvocationConext.Arguments中检索参数的FromBodyAttribute

如何在.NET MAUI中最大化GraphicsView的大小?

为什么当我try 为玩家角色设置动画时,没有从文件夹中拉出正确的图像?

为什么我的自定义Json.net转换器不工作?

Xamarin.Forms中具有类似AspectFill的图像zoom 的水平滚动视图

为什么我的属性即使没有显式地设置任何[必需]属性,也会显示验证?

如何为控制器PUT操作绑定对象数组

SqlException:无法打开数据库.升级到Dotnet 8后-数据库兼容性版本-非EFCore兼容性级别

如何保存具有多个重叠图片框的图片框?