当发送数据速度足够快时,会抛出InvalidOperationException:"此WebSocket实例已经有一个未完成的'SendAsync'调用.".ClientWebSocket.ReceiveAsync
和ClientWebSocket.SendAsync
可以同时调用,但最多允许同时为它们中的每一个调用一个未完成的操作.
第一个示例使用信号量LIM,它一次只允许一条消息来防止该问题的发生.
问题是,既然频道选项中指定了SingleReader = true
,我是否需要在第二个示例中执行相同的SemaphoreSlim
种变通方法?基本上应该是一样的,但我想有人确认一下,所以没有什么意外.
没有系统.穿线.渠道
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);
public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
if (_webSocket.State != WebSocketState.Open)
{
return false;
}
// Only one message can be sent at a time. Wait until a message can be sent.
await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
// We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
try
{
await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
}
catch (TaskCanceledException)
{
sendAsyncSemaphore.Release();
return false;
}
// Our thread can now release the sendAsyncSemaphore so another message can be sent.
sendAsyncSemaphore.Release();
return true;
}
public void Dispose()
{
sendAsyncSemaphore.Dispose();
}
有系统.穿线.渠道
public class WebSocketClient
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public WebSocketClient(WebSocket webSocket)
{
_webSocket = webSocket;
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
if (_webSocket.State != WebSocketState.Open)
{
return;
}
var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
}
}
}