当发送数据速度足够快时,会抛出InvalidOperationException:"此WebSocket实例已经有一个未完成的'SendAsync'调用.".ClientWebSocket.ReceiveAsyncClientWebSocket.SendAsync可以同时调用,但最多允许同时为它们中的每一个调用一个未完成的操作.

第一个示例使用信号量LIM,它一次只允许一条消息来防止该问题的发生.

问题是,既然频道选项中指定了SingleReader = true,我是否需要在第二个示例中执行相同的SemaphoreSlim种变通方法?基本上应该是一样的,但我想有人确认一下,所以没有什么意外.

没有系统.穿线.渠道

private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);

public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
    if (_webSocket.State != WebSocketState.Open)
    {
        return false;
    }
    
    // Only one message can be sent at a time. Wait until a message can be sent.
    await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
    
    // We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
    try
    {
        await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
    }
    catch (TaskCanceledException)
    {
        sendAsyncSemaphore.Release();
        return false;
    }
    
    // Our thread can now release the sendAsyncSemaphore so another message can be sent.
    sendAsyncSemaphore.Release();
    return true;
}

public void Dispose()
{
    sendAsyncSemaphore.Dispose();
}

有系统.穿线.渠道

public class WebSocketClient
{
    private readonly WebSocket _webSocket;
    private readonly Channel<string> _input;
    private readonly Channel<string> _output;

    public WebSocketClient(WebSocket webSocket)
    {
        _webSocket = webSocket;

        _input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleWriter = true
        });

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true
        });
    }

    private async Task SendLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
        {
            if (_webSocket.State != WebSocketState.Open)
            {
                return;
            }

            var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
            await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
        }
    }
}

推荐答案

简短回答是的!您仍然需要一些东西来限制SingleReader = true时未完成的读取调用的数量.documentation清楚地表明,您将该属性设置为true"如果通道的读卡器保证一次最多只有一个读取操作".因此,该属性不提供对发送者的限制.

在我看来,使用信号量或任何其他同步原语都不是一种解决方法.如果你看一下代码,你会发现设置SingleReader选项会得到SingleConsumerUnboundedChannel.如果不自己限制读卡器,最终会无意中取消读操作.

Csharp相关问答推荐

我正在制作一个Unity游戏,如果对这个玩家控制器进行编码,但当它 skip 时,它会删除我的速度

System.文本.Json现在总是需要无参数构造函数吗?

使用其可能实现的基类和接口的属性的方法

ListaryImportProperty的默认DllImportSearchPathsProperty行为

将委托传递到serviceccollection c#web API

如何注销Microsoft帐户?

如何在C#中从正则表达式中匹配一些数字但排除一些常量(也是数字)

最新的Mediatr和具有同步方法的处理程序Handle:并非所有代码路径都返回值"

如何在没有前缀和可选后缀的情况下获取Razor Page Handler方法名称?

如何使用XmlSerializer序列化带有CDATA节的XML文件?

C#使用TextFieldParser读取.csv,但无法使用";0";替换创建的列表空条目

如何在VS代码中为C#DotNet配置.json选项以调试内部终端的控制台应用程序

什么时候接受(等待)信号灯?尽可能的本地化?

避免只读记录 struct 中的防御副本

我的命名管道在第一次连接后工作正常,但后来我得到了System.ObjectDisposedException:无法访问关闭的管道

岛屿和框架中的自定义控件库.Navigate-AccessViolationException

如何在Polly重试策略成功之前将HttpClient请求排队?

.NET文档对继承的困惑

客户端/服务器RPC如何处理全局变量?

当我在Git中暂存文件更改时,它们会消失