我有一个异步方法,它包含一个无止境的while循环.该循环由程序的主线程频繁暂停和恢复.为此,我目前使用Nito.AsyncEx包中的PauseTokenSource类:

private readonly PauseTokenSource _pausation = new();

private async Task StartWorker()
{
    while (true)
    {
        await _pausation.Token.WaitWhilePausedAsync();
        DoSomething();
    }
}

这工作得很好,但我注意到,每次暂停和恢复PauseTokenSource时,都会分配大约ValueTask个字节.由于这种情况每秒发生多次,我正在寻找一种方法来消除这种开销.我的 idea 是用一个brew 的暂停机制替换PauseTokenSource,该机制有一个WaitWhilePausedAsync方法,返回ValueTask而不是Task.为了使实现分配自由,ValueTask应该由实现IValueTaskSource接口的东西支持.以下是我当前(失败的)try 实现此机制:

public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private bool _paused;

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        _paused = true;
        _source.Reset();
    }

    public void Resume()
    {
        _paused = false;
        _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        if (!_paused) return ValueTask.CompletedTask;
        return new ValueTask(this, _source.Version);
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
        short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}

我的Pausation类基于ManualResetValueTaskSourceCore<T> struct ,它应该简化最常见的IValueTaskSource个实现.显然我做错了什么,因为当我的工人awaitWaitWhilePausedAsync方法时,它与InvalidOperationException崩溃.

My question is:如何修复Pausation类,使其正常工作?我应该在_paused字段之外添加更多状态吗?我是不是在错误的地方调用了ManualResetValueTaskSourceCore<T>个方法?我要的是详细的修复说明,或者是完整的工作实现.

Specifics: Pausation类旨在用于单工作者-单控制器场景.只有一个异步工作线程(StartWorker方法),只有一个控制器线程发出PauseResume条命令.此外,不需要取消支持.辅助进程的终止由CancellationTokenSource独立处理(为简洁起见,从上述代码段中删除).唯一需要的功能是PauseResumeWaitWhilePausedAsync方法.唯一的要求是它工作正常,并且不分配内存.

我的worker-controller场景的可运行在线演示可以在here中找到.

Nito.AsyncEx.PauseTokenSource类的输出:

Controller loops: 112,748
Worker loops: 84, paused: 36 times

我的Pausation类的输出:

Controller loops: 117,397
Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.GetStatus(Int16 token)
   at Program.Pausation.System.Threading.Tasks.Sources.IValueTaskSource.GetStatus(Int16 token)
   at Program.<>c__DisplayClass0_0.<<Main>g__StartWorker|0>d.MoveNext()

推荐答案

我相信你误解了Reset.ManualResetValueTaskSourceCore<T>.Reset一点也不像ManualResetEvent.Reset.对于ManualResetValueTaskSourceCore<T>Reset意味着"之前的操作已经完成,现在我想重用value任务,所以更改版本".因此,它应该在GetResult之后调用(即,在暂停的代码等待完成之后),而不是在Pause之内.封装此IMO的最简单方法是立即返回ValueTask.

类似地,除非已经返回值任务,否则代码不应该调用SetResult.ManualResetValueTaskSourceCore<T>实际上是严格围绕顺序操作设计的,有一个单独的"控制器"会使事情复杂化.通过跟踪消费者是否在等待,并且只有在有等待时才try 完成,您可能可以让它工作:

public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private readonly object _mutex = new();
    private bool _paused;
    private bool _waiting;

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        lock (_mutex)
            _paused = true;
    }

    public void Resume()
    {
        var wasWaiting = false;
        lock (_mutex)
        {
            wasWaiting = _waiting;
            _paused = _waiting = false;
        }

        if (wasWaiting)
            _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        lock (_mutex)
        {
            if (!_paused) return ValueTask.CompletedTask;
            _waiting = true;
            _source.Reset();
            return new ValueTask(this, _source.Version);
        }
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}

我使用互斥锁修复了一些明显的竞争条件,但我不能保证剩下的不会有更微妙的竞争条件.它肯定会被限制为一个控制器和一个消费者,至少.

Csharp相关问答推荐

为什么我在PuppeteerSharp中运行StealthPlugin时会出现错误?

C#使用属性和值将JSON转换为XML

为什么这个Reflection. Emit代码会导致一个DDL ViolationException?

为什么Blazor值在更改后没有立即呈现?

在C#中,DirectoryEntry返回空AuditRules集合,即使审计规则确实存在

try 在Blazor项目中生成html

为什么我的表单在绑定到对象时提交空值?

在调整大小的控件上绘制

.NET SDK包中的官方C#编译器在哪里?

在C#中,是否有与变量DISARD对应的C++类似功能?

将操作从编辑页重定向到带参数的索引页

在';、';附近有错误的语法.必须声明标量变量";@Checkin";.';

使用未赋值的、传递的局部变量

RCL在毛伊岛应用程序和Blazor服务器应用程序.Net 8.0中使用页面

从GRPC连接创建ZipArchive

正在try 将自定义字体添加到我的控制台应用程序

WPF:如何从DatagridHeader的内容模板绑定到词典项

使用本地公共PEM文件加密字符串,使用Azure KayVault中的私钥解密

SqlException:无法打开数据库.升级到Dotnet 8后-数据库兼容性版本-非EFCore兼容性级别

如何保存具有多个重叠图片框的图片框?