我正在学习 C# 中的异步/等待模式。目前我正在尝试解决这样的问题:
有一个生产者(硬件设备)每秒生成 1000 个数据包。我需要将这些数据记录到文件中。
该设备只有一个ReadAsync()
一次报告单个数据包的方法。
我需要缓冲数据包并按照它们生成到文件的顺序将它们写入文件,每秒一次。
如果在下一批数据包准备写入时写入过程未及时完成,则写入操作将失败。
到目前为止,我已经写了如下内容。它有效,但我不确定这是否是解决问题的最佳方法。有什么意见或建议吗?解决此类生产者/消费者问题(消费者需要聚合从生产者接收的数据)的最佳实践是什么?
static async Task TestLogger(Device device, int seconds)
{
const int bufLength = 1000;
bool firstIteration = true;
Task writerTask = null;
using (var writer = new StreamWriter("test.log")))
{
do
{
var buffer = new byte[bufLength][];
for (int i = 0; i < bufLength; i++)
{
buffer[i] = await device.ReadAsync();
}
if (!firstIteration)
{
if (!writerTask.IsCompleted)
throw new Exception("Write Time Out!");
}
writerTask = Task.Run(() =>
{
foreach (var b in buffer)
writer.WriteLine(ToHexString(b));
});
firstIteration = false;
} while (--seconds > 0);
}
}
您可以使用以下想法,前提是刷新的标准是数据包的数量(最多 1000 个)。我没有测试它。它利用了斯蒂芬·克利里的AsyncProducerConsumerQueue<T> http://nitoasyncex.codeplex.com/wikipage?title=AsyncProducerConsumerQueue精选于这个问题 https://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont.
AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;
// producer
async Task ReceiveAsync(CancellationToken token)
{
while (true)
{
var list = new List<byte>();
while (true)
{
token.ThrowIfCancellationRequested(token);
var packet = await _device.ReadAsync(token);
list.Add(packet);
if (list.Count == 1000)
break;
}
// push next batch
await _queue.EnqueueAsync(list.ToArray(), token);
}
}
// consumer
async Task LogAsync(CancellationToken token)
{
Task previousFlush = Task.FromResult(0);
CancellationTokenSource cts = null;
while (true)
{
token.ThrowIfCancellationRequested(token);
// get next batch
var nextBatch = await _queue.DequeueAsync(token);
if (!previousFlush.IsCompleted)
{
cts.Cancel(); // cancel the previous flush if not ready
throw new Exception("failed to flush on time.");
}
await previousFlush; // it's completed, observe for any errors
// start flushing
cts = CancellationTokenSource.CreateLinkedTokenSource(token);
previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
}
}
如果您不想让记录器失败,而是希望取消刷新并继续进行下一批,则可以通过对此代码进行最小的更改来实现。
回复 @l3arnon 评论:
- 数据包不是字节,而是字节[]。 2. 你没有使用OP的ToHexString。 3. AsyncProducerConsumerQueue 的健壮性要差得多
比 .Net 的 TPL 数据流进行了测试。 4.等待 previousFlush 错误
就在抛出异常之后,这使得该行变得多余。
等等。简而言之:我认为可能的附加值并不能证明这一点
非常复杂的解决方案。
- “数据包不是字节,它是字节[]” - 数据包is一个字节,从OP的代码中可以明显看出:
buffer[i] = await device.ReadAsync()
。然后,一批数据包byte[]
.
-
“你还没有使用OP的ToHexString。” - 目标是展示如何使用Stream.WriteAsync
which natively接受取消令牌,而不是WriteLineAsync
这不允许取消。使用起来很简单ToHexString
with Stream.WriteAsync
并仍然利用取消支持:
var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) +
Environment.NewLine);
_stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
“AsyncProducerConsumerQueue 远不如 .Net 的 TPL 数据流健壮且经过测试” - 我不认为这是一个确定的事实。但是,如果OP担心的话,他可以使用常规的BlockingCollection
,这不会阻塞生产者线程。在等待下一批时阻塞消费者线程是可以的,因为写入是并行完成的。与此相反,您的 TPL Dataflow 版本带有一个多余的CPU 和锁密集型操作:将数据从生产者管道移动到写入者管道logAction.Post(packet)
,逐字节。我的代码没有这样做。
“在抛出异常后,您会等待 previousFlush 错误,这使得该行变得多余。” - 这条线不是多余的。也许,你忽略了这一点:previousFlush.IsCompleted
can be true
when previousFlush.IsFaulted
or previousFlush.IsCancelled
也是true
. So, await previousFlush
在那里观察任何错误是相关的完全的任务(例如,写入失败),否则将会丢失。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)