我有一个场景,其中有多个线程添加到一个队列中,并且多个线程从同一队列中读取数据.如果队列达到正在填满的特定大小all threads,则队列将在添加时被阻塞,直到从队列中删除项为止.

下面的解决方案就是我现在使用的解决方案,我的问题是:如何改进这一点?在我应该使用的BCL中是否有已经启用此行为的对象?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}

推荐答案

这看起来非常不安全(几乎没有同步);这样怎么样:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(编辑)

实际上,您可能需要一种关闭队列的方法,以便读者开始干净地退出(可能类似于bool标志),如果设置了,空队列将返回(而不是阻塞):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

.net相关问答推荐

将Visual Studio更新到v17.9.3后,IDE关闭,dotnet.exe命令报告致命错误.内部CLR错误.(0x80131506)

WinForm Task.Wait :为什么它会阻塞 UI?

.NET MAUI ListView - ObservableCollection - 在异步方法期间不更新

竖线在 PropertyGroup .csproj 文件中的含义

在 .NET 中使用 AES 解密时缺少后半字节

使用 PowerShell 从文件夹中获取文件名的最快\最好的方法是什么?

如何判断属性设置器是否公开

在 C# 中获取 log4net 日志(log)文件

如何在 MSBuild 脚本中获取当前目录?

C#6.0 字符串插值本地化

将客户端证书添加到 .NET Core HttpClient

使用 IIS Express 托管网站(临时)

extern 在 C# 中是如何工作的?

在 C# 中转义命令行参数

使用 XmlDocument 读取 XML 属性

/langversion 的错误选项6无效;必须是 ISO-1、ISO-2、3、4、5 或默认值

我可以将构造函数参数传递给 Unity 的 Resolve() 方法吗?

如何将两个 List 相互比较?

System.ServiceModel 在 .NET Core 项目中找不到

泛型类的静态成员是否与特定实例相关联?