假设我有一个多个生产者,1个消费者,未绑定Channel,与消费者:
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
}
问题是consume
函数会进行一些 IO 访问,也可能进行一些网络访问,因此在消耗 1 条消息之前,可能会产生更多消息。但是由于IO资源不能并发访问,所以我不能有很多消费者,也不能抛出consume
将函数放入任务中然后忘记它。
The consume
函数是这样的,可以轻松修改它以获取多个消息并批量处理它们。所以我的问题是是否有办法让消费者接受all每当尝试访问通道队列时,都会在通道队列中发送消息,如下所示:
while (true) {
Message[] messages = await channel.Reader.TakeAll();
await consumeAll(messages);
}
编辑:我能想到的 1 个选项是:
List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
Message msg;
while (channel.Reader.TryRead(out msg))
messages.Add(msg);
if (messages.Count > 0)
{
await consumeAll(messages);
messages.Clear();
}
}
但我觉得这应该是一个更好的方法来做到这一点。
看完之后Stephen Toub 的频道入门书 https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/,我尝试编写一个扩展方法来满足您的需要(我已经有一段时间没有使用 C# 了,所以这很有趣)。
public static class ChannelReaderEx
{
public static async IAsyncEnumerable<IEnumerable<T>> ReadBatchesAsync<T>(
this ChannelReader<T> reader,
[EnumeratorCancellation] CancellationToken cancellationToken = default
)
{
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
yield return reader.Flush().ToList();
}
}
public static IEnumerable<T> Flush<T>(this ChannelReader<T> reader)
{
while (reader.TryRead(out T item))
{
yield return item;
}
}
}
可以这样使用:
await foreach (var batch in channel.Reader.ReadBatchesAsync())
{
await ConsumeBatch(batch);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)