我需要一些帮助来解决我在使用 .NET 套接字通过 TCP 传输大量数据时遇到的问题。
简而言之,当客户端应用程序启动时,它会连接到服务器上的特定端口。连接后,服务器开始向客户端发送实时数据,客户端在类似股票的 UI 中显示信息。服务器支持多个客户端工作站,因此数据将通过多个端口(多个套接字)发送。
一切都已实施,并且在缓慢进给和低产量的情况下运行良好。我正在对系统进行压力测试,以确保弹性和可扩展性。当我增加频率时,服务器运行完美。但是,我看到客户端上似乎丢失了数据包。这种情况随机发生。
目前,每条广播消息都以一个 4 字节值开头,用于标识该消息的长度。当我们在客户端接收数据时,我们将数据附加到缓冲区(流),直到收到该数量的字节。任何附加字节都被视为下一条消息的开始。再次强调,在我调高频率之前,这种方法效果很好。
在我的测试中,我发送了一个大约 225 字节的数据包,然后是一个大约 310kB 的数据包,另一个大约 40kb 的数据包。在大约 12 个客户端运行时,每 1 秒发送一条消息不会失败。将频率增加到 1/2 秒,我最终看到客户的一个显示器冻结了。到 1/4 秒,我可以在几秒钟内用最少 4 个客户端重现该问题。
查看我的代码(如果需要,我可以提供),我看到所有客户端都在接收数据,但不知何故信息“不同步”,并且预期长度值巨大(在 1 亿范围内) 。结果,我们只是不断地读取数据,却从未察觉到消息的结束。
我要么需要更好的方法,要么需要一种方法来确保我获得预期的数据并且不会丢失数据包。你能帮我吗?
UPDATE
我做了大量额外的测试,改变了消息的大小和发送频率。肯定存在相关性。消息大小越小,达到的频率就越高。但是,不可避免的是,我总是能够打破它。
因此,更准确地描述我正在寻找的是:
了解正在发生的事情。这将帮助我确定可能的解决方案,或者至少建立可靠行为的阈值。
实施故障安全机制,以便当问题发生时,我可以处理它并可能从中恢复。也许在数据流中添加校验和或类似的东西。
这是我在客户端(接收)应用程序中运行的代码:
public void StartListening(SocketAsyncEventArgs e)
{
e.Completed += SocketReceive;
socket.ReceiveAsync(e);
}
private void SocketReceive(Object sender, SocketAsyncEventArgs e)
{
lock (_receiveLock)
{
ProcessData(e.Buffer, e.BytesTransferred);
socket.ReceiveAsync(e);
}
}
private void ProcessData(Byte[] bytes, Int32 count)
{
if (_currentBuffer == null)
_currentBuffer = new ReceiveBuffer();
var numberOfBytesRead = _currentBuffer.Write(bytes, count);
if (_currentBuffer.IsComplete)
{
// Notify the client that a message has been received (ignore zero-length, "keep alive", messages)
if (_currentBuffer.DataLength > 0)
NotifyMessageReceived(_currentBuffer);
_currentBuffer = null;
// If there are bytes remaining from the original message, recursively process
var numberOfBytesRemaining = count - numberOfBytesRead;
if (numberOfBytesRemaining > 0)
{
var remainingBytes = new Byte[numberOfBytesRemaining];
var offset = bytes.Length - numberOfBytesRemaining;
Array.Copy(bytes, offset, remainingBytes, 0, numberOfBytesRemaining);
ProcessData(remainingBytes, numberOfBytesRemaining);
}
}
}
internal sealed class ReceiveBuffer
{
public const Int32 LengthBufferSize = sizeof(Int32);
private MemoryStream _dataBuffer = new MemoryStream();
private MemoryStream _lengthBuffer = new MemoryStream();
public Int32 DataLength { get; private set; }
public Boolean IsComplete
{
get { return (RemainingDataBytesToWrite == 0); }
}
private Int32 RemainingDataBytesToWrite
{
get
{
if (DataLength > 0)
return (DataLength - (Int32)_dataBuffer.Length);
return 0;
}
}
private Int32 RemainingLengthBytesToWrite
{
get { return (LengthBufferSize - (Int32)_lengthBuffer.Length); }
}
public Int32 Write(Byte[] bytes, Int32 count)
{
var numberOfLengthBytesToWrite = Math.Min(RemainingLengthBytesToWrite, count);
if (numberOfLengthBytesToWrite > 0)
WriteToLengthBuffer(bytes, numberOfLengthBytesToWrite);
var remainingCount = count - numberOfLengthBytesToWrite;
// If this value is > 0, then we have still have more bytes after setting the length so write them to the data buffer
var numberOfDataBytesToWrite = Math.Min(RemainingDataBytesToWrite, remainingCount);
if (numberOfDataBytesToWrite > 0)
_dataBuffer.Write(bytes, numberOfLengthBytesToWrite, numberOfDataBytesToWrite);
return numberOfLengthBytesToWrite + numberOfDataBytesToWrite;
}
private void WriteToLengthBuffer(Byte[] bytes, Int32 count)
{
_lengthBuffer.Write(bytes, 0, count);
if (RemainingLengthBytesToWrite == 0)
{
var length = BitConverter.ToInt32(_lengthBuffer.ToArray(), 0);
DataLength = length;
}
}
}