消耗 System.Threading.Channels.Channel 中的所有消息

2024-02-16

假设我有一个多个生产者,1个消费者,未绑定Channel,与消费者:

await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
}

问题是consume函数会进行一些 IO 访问,也可能进行一些网络访问,因此在消耗 1 条消息之前,可能会产生更多消息。但是由于IO资源不能并发访问,所以我不能有很多消费者,也不能抛出consume将函数放入任务中然后忘记它。

The consume函数是这样的,可以轻松修改它以获取多个消息并批量处理它们。所以我的问题是是否有办法让消费者接受all每当尝试访问通道队列时,都会在通道队列中发送消息,如下所示:

while (true) {
    Message[] messages = await channel.Reader.TakeAll();
    await consumeAll(messages);
}

编辑:我能想到的 1 个选项是:

List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
    await consume(message);
    Message msg;
    while (channel.Reader.TryRead(out msg))
        messages.Add(msg);
    if (messages.Count > 0)
    {
        await consumeAll(messages);
        messages.Clear();
    }
}

但我觉得这应该是一个更好的方法来做到这一点。


看完之后Stephen Toub 的频道入门书 https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/,我尝试编写一个扩展方法来满足您的需要(我已经有一段时间没有使用 C# 了,所以这很有趣)。

public static class ChannelReaderEx
{
    public static async IAsyncEnumerable<IEnumerable<T>> ReadBatchesAsync<T>(
        this ChannelReader<T> reader, 
        [EnumeratorCancellation] CancellationToken cancellationToken = default
    )
    {
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return reader.Flush().ToList();
        }
    }

    public static IEnumerable<T> Flush<T>(this ChannelReader<T> reader)
    {
        while (reader.TryRead(out T item))
        {
            yield return item;
        }
    }
}

可以这样使用:

await foreach (var batch in channel.Reader.ReadBatchesAsync())
{
    await ConsumeBatch(batch);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

消耗 System.Threading.Channels.Channel 中的所有消息 的相关文章

随机推荐