我正在编写一个新的Windows服务应用程序的设计阶段,该应用程序接受用于长时间运行的连接的TCP/IP连接(即,这不像HTTP那样有许多短连接,而是客户端连接并保持连接数小时、数天甚至数周).

我正在寻找设计网络体系 struct 的最佳方法.我需要为该服务至少启动一个线程.我正在考虑使用异步API(BeginRecieve等)因为我不知道在任何给定时间我会连接多少个客户端(可能是数百个).我绝对不想 for each 连接启动一个线程.

数据将主要从我的服务器流向客户端,但有时会从客户端发送一些命令.这主要是一个监控应用程序,在该应用程序中,我的服务器定期向客户端发送状态数据.

使其尽可能具有可伸缩性的最佳方式是什么?基本工作流程?

说清楚,我在找.基于NET的解决方案(如果可能,C#,但任何.NET语言都可以).

我需要一个解决方案的工作示例,或者作为指向我可以下载的东西的指针,或者一个简短的在线示例.一定是这样.NET和基于Windows的(任何.NET语言都可以接受).

推荐答案

我以前也写过类似的东西.我几年前的研究表明,使用asynchronous个套接字编写您自己的套接字实现是最好的 Select .这意味着客户端并不真正做任何事情,实际上只需要相对较少的资源.发生的任何事情都由.NET线程池处理.

我把它写成一个类,管理服务器的所有连接.

我只是使用了一个列表来保存所有的客户端连接,但是如果你需要更快地查找更大的列表,你可以随心所欲地编写它.

private List<xConnection> _sockets;

此外,还需要套接字实际监听传入的连接.

private System.Net.Sockets.Socket _serverSocket;

start方法实际启动服务器套接字,并开始侦听任何传入连接.

public bool Start()
{
  System.Net.IPHostEntry localhost = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
  System.Net.IPEndPoint serverEndPoint;
  try
  {
     serverEndPoint = new System.Net.IPEndPoint(localhost.AddressList[0], _port);
  }
  catch (System.ArgumentOutOfRangeException e)
  {
    throw new ArgumentOutOfRangeException("Port number entered would seem to be invalid, should be between 1024 and 65000", e);
  }
  try
  {
    _serverSocket = new System.Net.Sockets.Socket(serverEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
   }
   catch (System.Net.Sockets.SocketException e)
   {
      throw new ApplicationException("Could not create socket, check to make sure not duplicating port", e);
    }
    try
    {
      _serverSocket.Bind(serverEndPoint);
      _serverSocket.Listen(_backlog);
    }
    catch (Exception e)
    {
       throw new ApplicationException("An error occurred while binding socket. Check inner exception", e);
    }
    try
    {
       //warning, only call this once, this is a bug in .net 2.0 that breaks if
       // you're running multiple asynch accepts, this bug may be fixed, but
       // it was a major pain in the rear previously, so make sure there is only one
       //BeginAccept running
       _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
    }
    catch (Exception e)
    {
       throw new ApplicationException("An error occurred starting listeners. Check inner exception", e);
    }
    return true;
 }

我只想指出异常处理代码看起来很糟糕,但原因是我在里面有异常 suppress 代码,所以如果设置了配置选项,任何异常都会被 suppress 并返回false,但为了简洁起见,我想删除它.

上面的_serverSocket.BeginAccept(new AsyncCallback(CeptCallback)),_serverSocket)实质上将我们的服务器套接字设置为在用户连接时调用ceptCallback方法.此方法从.NET线程池运行,如果您有许多阻塞操作,它会自动处理创建额外的工作线程.这将以最佳方式处理服务器上的任何负载.

    private void acceptCallback(IAsyncResult result)
    {
       xConnection conn = new xConnection();
       try
       {
         //Finish accepting the connection
         System.Net.Sockets.Socket s = (System.Net.Sockets.Socket)result.AsyncState;
         conn = new xConnection();
         conn.socket = s.EndAccept(result);
         conn.buffer = new byte[_bufferSize];
         lock (_sockets)
         {
           _sockets.Add(conn);
         }
         //Queue receiving of data from the connection
         conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
         //Queue the accept of the next incoming connection
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
       catch (SocketException e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
       catch (Exception e)
       {
         if (conn.socket != null)
         {
           conn.socket.Close();
           lock (_sockets)
           {
             _sockets.Remove(conn);
           }
         }
         //Queue the next accept, think this should be here, stop attacks based on killing the waiting listeners
         _serverSocket.BeginAccept(new AsyncCallback(acceptCallback), _serverSocket);
       }
     }

上述代码基本上刚刚完成接受传入的连接,队列BeginReceive是一个将在客户端发送数据时运行的回调,然后将接受传入的下一个客户端连接的下一个acceptCallback排队.

BeginReceive方法调用告诉套接字从客户端接收数据时该做什么.对于BeginReceive,您需要给它一个字节数组,当客户端发送数据时,它将在该数组中复制数据.ReceiveCallback方法将被调用,这就是我们处理接收数据的方式.

private void ReceiveCallback(IAsyncResult result)
{
  //get our connection from the callback
  xConnection conn = (xConnection)result.AsyncState;
  //catch any errors, we'd better not have any
  try
  {
    //Grab our buffer and count the number of bytes receives
    int bytesRead = conn.socket.EndReceive(result);
    //make sure we've read something, if we haven't it supposadly means that the client disconnected
    if (bytesRead > 0)
    {
      //put whatever you want to do when you receive data here

      //Queue the next receive
      conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);
     }
     else
     {
       //Callback run but no data, close the connection
       //supposadly means a disconnect
       //and we still have to close the socket, even though we throw the event later
       conn.socket.Close();
       lock (_sockets)
       {
         _sockets.Remove(conn);
       }
     }
   }
   catch (SocketException e)
   {
     //Something went terribly wrong
     //which shouldn't have happened
     if (conn.socket != null)
     {
       conn.socket.Close();
       lock (_sockets)
       {
         _sockets.Remove(conn);
       }
     }
   }
 }

编辑:在这个模式中,我忘了在这个代码区域中提到:

//put whatever you want to do when you receive data here

//Queue the next receive
conn.socket.BeginReceive(conn.buffer, 0, conn.buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), conn);

通常,在"随心所欲"代码中,我会将数据包重新组装成消息,然后在线程池中创建它们作为作业(job).这样,在运行任何消息处理代码时,从客户机开始接收下一个数据块不会延迟.

Accept回调通过调用End Receive完成对数据套接字的读取.这将填充BEGIN RECEIVE函数中提供的缓冲区.一旦您在我离开注释的位置执行了任何操作,我们将调用下一个BeginReceive方法,该方法将在客户端发送更多数据时再次运行回调.

现在有一个非常棘手的部分:当客户端发送数据时,您的receive回调可能只会被部分消息调用.重新组装可能会变得非常复杂.我使用自己的方法,创建了一种专有协议来实现这一点.我漏掉了,但如果你要求,我可以加进go .这个处理程序实际上是我写过的最复杂的代码.

public bool Send(byte[] message, xConnection conn)
{
  if (conn != null && conn.socket.Connected)
  {
    lock (conn.socket)
    {
    //we use a blocking mode send, no async on the outgoing
    //since this is primarily a multithreaded application, shouldn't cause problems to send in blocking mode
       conn.socket.Send(bytes, bytes.Length, SocketFlags.None);
     }
   }
   else
     return false;
   return true;
 }

上面的send方法实际上使用了一个同步Send调用.对我来说,由于消息大小和应用程序的多线程特性,这很好.如果你想发送给每一个客户端,你只需要在_sockets列表中循环.

您在上面看到的xConnection类基本上是套接字的一个简单包装器,它包括字节缓冲区,在我的实现中还包括一些额外的.

public class xConnection : xBase
{
  public byte[] buffer;
  public System.Net.Sockets.Socket socket;
}

这里还提供了我列出的using个,因为当它们没有被包括在内时,我总是感到恼火.

using System.Net.Sockets;

我希望这会有帮助.它可能不是最干净的代码,但它可以工作.代码中还有一些细微差别,你应该对这些细微差别感到厌倦.例如,每次只能打BeginAccept个电话.过go 有一个很烦人的地方.这是多年前的事了,所以我不记得细节了.

此外,在ReceiveCallback代码中,我们在对下一个接收进行排队之前处理从套接字接收的任何内容.这意味着,对于单个套接字,在任何时间点,我们实际上只有ReceiveCallback次,并且我们不需要使用线程同步.但是,如果在提取数据后立即对其重新排序以调用下一个receive(这可能会快一点),则需要确保正确同步线程.

此外,我破解了很多代码,但保留了正在发生的事情的本质.这对你的设计来说应该是一个好的开始.如果你对此有任何疑问,请留下 comments .

.net相关问答推荐

使用PowerShell在Windows容器内安装exe

MassTransit RespondAsync 无法返回空值

lock() 是否保证按请求的顺序获得?

如何防止和/或处理 StackOverflowException?

app.config 文件和 XYZ.settings 文件有什么区别?

是什么让 Enum.HasFlag 这么慢?

SubscribeOn 和 ObserveOn 有什么区别

大型 WCF Web 服务请求因 (400) HTTP 错误请求而失败

你如何调试 MVC 4 API 路由?

在 C# 中将字符串转换为十六进制字符串

C# 测试字符串是否为整数?

.NET 中是否有可序列化的通用键/值对类?

是否有可用的 WPF 备忘单?

Moq - 不可覆盖的成员不能用于设置/验证表达式

嵌套的 Try/Catch 块是个坏主意吗?

功能说明

是否可以判断对象是否已附加到实体框架中的数据上下文?

/langversion 的错误选项6无效;必须是 ISO-1、ISO-2、3、4、5 或默认值

使用没有catch块的try-finally块

ConfigurationManager.AppSettings - 如何修改和保存?