使用 TcpClient 和反应式扩展从 Stream 读取连续字节流

2024-04-26

考虑以下代码:

internal class Program
{
    private static void Main(string[] args)
    {
        var client = new TcpClient();
        client.ConnectAsync("localhost", 7105).Wait();
        var stream = client.GetStream();
        var observable = stream.ReadDataObservable().Repeat();

        var s = from d in observable.Buffer(4)
                let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2))
                let b = observable.Take(headerLength)
                select b.ToEnumerable().ToArray();
        s.Subscribe(a => Console.WriteLine("{0}", a));
        Console.ReadLine();
    }
}

public static class Extensions
{
    public static IObservable<byte> ReadDataObservable(this Stream stream)
    {
        return Observable.Defer(async () =>
        {
            var buffer = new byte[1024];
            var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length);
            return buffer.Take(readBytes).ToObservable();
        });
    }
}

基本上我想解析我通过响应式扩展收到的消息。使用 Buffer(4) 正确解析消息的标头,并获得消息其余部分的长度。出现的问题是,当我执行stream.Take(headerLength)时,代码重新评估整个“链”并尝试从流中获取新消息,而不是返回已从流中读取的其余字节。更准确地说,第一个 ReadAsync(...) 返回 38 个字节,Buffer(4) 返回其中的前 4 个字节,observable.Take(headerLength) 不返回剩余的 34 个字节,而是尝试读取新的使用 ReadAsync 发送消息。

问题是,如何确保 observable.Take(headerLength) 接收已读取的 34 个字节,而不是尝试从流中读取新消息?我已经四处寻找解决方案,但我无法真正弄清楚如何实现这一目标。

编辑:这个解决方案(使用反应式扩展(Rx)进行套接字编程实用吗? https://stackoverflow.com/questions/3118289/using-reactive-extensions-rx-for-socket-programming-practical?rq=1)不是我要找的。这并不是读取流中可用的所有内容(最多达到缓冲区大小)并从中生成连续的字节流。对我来说,这个解决方案似乎不是一种非常有效的从流中读取数据的方法,因此我提出了问题。


这种方法是行不通的。问题在于您使用可观察量的方式。Buffer不会读取 4 个字节并退出,而是会持续读取 4 个字节块。这Take形成将读取重叠字节的第二个订阅。您会发现将流直接解析为消息要容易得多。

下面的代码也做了很多努力来正确清理。

假设你的Message就是这样,(ToString添加用于测试):

public class Message
{
    public byte[] PayLoad;

    public override string ToString()
    {
        return Encoding.UTF8.GetString(PayLoad);
    }
}

并且您已经获得了Stream那么你可以按如下方式解析它。首先,从流中读取确切字节数的方法:

public async static Task ReadExactBytesAsync(
    Stream stream, byte[] buffer, CancellationToken ct)
{
    var count = buffer.Length;
    var totalBytesRemaining = count;
    var totalBytesRead = 0;
    while (totalBytesRemaining != 0)
    {
        var bytesRead = await stream.ReadAsync(
            buffer, totalBytesRead, totalBytesRemaining, ct);
        ct.ThrowIfCancellationRequested();
        totalBytesRead += bytesRead;
        totalBytesRemaining -= bytesRead;
    }
}

然后将流转换为IObservable<Message>:

public static IObservable<Message> ReadMessages(
    Stream sourceStream,
    IScheduler scheduler = null)
{
    int subscribed = 0;
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<Message>(o =>
    {
        // first check there is only one subscriber
        // (multiple stream readers would cause havoc)
        int previous = Interlocked.CompareExchange(ref subscribed, 1, 0);

        if (previous != 0)
            o.OnError(new Exception(
                "Only one subscriber is allowed for each stream."));

        // we will return a disposable that cleans
        // up both the scheduled task below and
        // the source stream
        var dispose = new CompositeDisposable
        {
            Disposable.Create(sourceStream.Dispose)
        };

        // use async scheduling to get nice imperative code
        var schedule = scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            // store the header here each time
            var header = new byte[4];

            // loop until cancellation requested
            while (!ct.IsCancellationRequested)
            {                        
                try
                {
                    // read the exact number of bytes for a header
                    await ReadExactBytesAsync(sourceStream, header, ct);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    // pass through any problem in the stream and quit
                    o.OnError(new InvalidDataException("Error in stream.", ex));
                    return;
                }                   
                ct.ThrowIfCancellationRequested();

                var bodyLength = IPAddress.NetworkToHostOrder(
                    BitConverter.ToInt16(header, 2));
                // create buffer to read the message
                var payload = new byte[bodyLength];

                // read exact bytes as before
                try
                {
                    await ReadExactBytesAsync(sourceStream, payload, ct);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    o.OnError(new InvalidDataException("Error in stream.", ex));
                    return;
                }

                // create a new message and send it to client
                var message = new Message { PayLoad = payload };
                o.OnNext(message);
            }
            // wrap things up
            ct.ThrowIfCancellationRequested();
            o.OnCompleted();
        });

        // return the suscription handle
        dispose.Add(schedule);
        return dispose;
    });
}

编辑-我使用的非常hacky测试代码:

private static void Main(string[] args)
{
    var listener = new TcpListener(IPAddress.Any, 12873);
    listener.Start();

    var listenTask = listener.AcceptTcpClientAsync();
    listenTask.ContinueWith((Task<TcpClient> t) =>
    {
        var client = t.Result;
        var stream = client.GetStream();
        const string messageText = "Hello World!";                
        var body = Encoding.UTF8.GetBytes(messageText);                
        var header = BitConverter.GetBytes(
            IPAddress.HostToNetworkOrder(body.Length));
        for (int i = 0; i < 5; i++)
        {
            stream.Write(header, 0, 4);
            stream.Write(body, 0, 4);
            stream.Flush();
            // deliberate nasty delay
            Thread.Sleep(2000);
            stream.Write(body, 4, body.Length - 4);
            stream.Flush();
        }
        stream.Close();
        listener.Stop();
    });


    var tcpClient = new TcpClient();
    tcpClient.Connect(new IPEndPoint(IPAddress.Loopback, 12873));
    var clientStream = tcpClient.GetStream();

    ReadMessages(clientStream).Subscribe(
        Console.WriteLine,
        ex => Console.WriteLine("Error: " + ex.Message),
        () => Console.WriteLine("Done!"));

    Console.ReadLine();
}

包起来

您需要考虑设置读取超时,以防服务器死机,并且服务器应该发送某种“结束消息”。目前,此方法只会不断尝试接收字节。正如您没有指定的那样,我没有包含类似的内容 - 但如果您这样做了,那么正如我所写的那样break退出 while 循环会导致OnCompleted将被寄出。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 TcpClient 和反应式扩展从 Stream 读取连续字节流 的相关文章

随机推荐