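Here is a solution based on the Parallel.ForEachAsync (https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync) API, available in .NET 6 and later. The custom ForEachExclusivePerKeyAsync method below supports all the options and functionality of the Parallel.ForEachAsync overload that takes an IAsyncEnumerable<T> as its source. Its behavior in case of errors or cancellation is identical. The only difference is that concurrent operations on elements with the same key are prevented. The key of each element is obtained through the keySelector function, and items with the same key are processed serially.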

    /// <summary>
    /// Executes a for-each operation on an async-enumerable sequence in which
    /// iterations may run concurrently, enforcing a non-concurrent execution policy
    /// for elements having the same key.
    /// </summary>
    public static Task ForEachExclusivePerKeyAsync<TSource, TKey>(
        this IAsyncEnumerable<TSource> source,
        ParallelOptions parallelOptions,
        Func<TSource, CancellationToken, ValueTask> body,
        Func<TSource, TKey> keySelector,
        IEqualityComparer<TKey> keyComparer = default)
    {
        ArgumentNullException.ThrowIfNull(keySelector);
        // The other arguments are validated by the Parallel.ForEachAsync itself.
        Dictionary<TKey, Queue<TSource>> perKey = new(keyComparer);
        return Parallel.ForEachAsync(source, parallelOptions, async (item, ct) =>
        {
            TKey key = keySelector(item);
            Queue<TSource> queue;
            lock (perKey)
            {
                // If there is no other task in-flight with the same key,
                // insert a null queue as an indicator of activity,
                // and start a processing loop for items with this key.
                // Otherwise enqueue this item and return.
                queue = CollectionsMarshal.GetValueRefOrAddDefault(
                    perKey, key, out bool exists) ??= (exists ? new() : null);
                if (queue is not null)
                {
                    queue.Enqueue(item); return;
                }
            }
            // Fire the task for this item, and for all other items with the
            // same key that might be enqueued while this task is in-flight.
            while (true)
            {
                ct.ThrowIfCancellationRequested();
                await body(item, ct); // Continue on captured context
                lock (perKey)
                {
                    if (queue is null || queue.Count == 0)
                    {
                        // Assume that meanwhile no other item was enqueued.
                        perKey.Remove(key, out queue);
                        if (queue is null || queue.Count == 0) return;
                        // The queue is actually not empty, so add it back.
                        perKey.Add(key, queue);
                    }
                    item = queue.Dequeue(); // Grab the next item.
                }
            }
        });
    }

Usage example. A Channel<T> (https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channel-1) is used as the source/controller (https://stackoverflow.com/questions/61540896/factory-for-iasyncenumerable-or-iasyncenumerator) of the IAsyncEnumerable<T> sequence:

    var channel = Channel.CreateUnbounded<Transaction>();
    //...
    var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };
    await ForEachExclusivePerKeyAsync(channel.Reader.ReadAllAsync(), options, async (x, _) =>
    {
        await ProcessTransactionAsync(x);
    }, keySelector: x => x.Bank);
    //...
    channel.Writer.TryWrite(new Transaction() { Bank = "Bank1" });
    channel.Writer.TryWrite(new Transaction() { Bank = "Bank2" });

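
As a side note on the channel acting as the "controller": a loop over channel.Reader.ReadAllAsync() only finishes once the writer is completed. The sketch below illustrates this with the built-in Parallel.ForEachAsync rather than the custom method, so that it is self-contained. The ChannelSourceDemo class, the string items standing in for transactions, and the Task.Delay standing in for real work are all assumptions made for the illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the answer's code.
public static class ChannelSourceDemo
{
    public static async Task<int> RunAsync()
    {
        var channel = Channel.CreateUnbounded<string>();
        var options = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
        int processed = 0;

        // Start the loop without awaiting it yet, so that this method
        // can keep writing to the channel while the loop is running.
        Task loop = Parallel.ForEachAsync(channel.Reader.ReadAllAsync(), options,
            async (bank, ct) =>
            {
                await Task.Delay(10, ct); // Stand-in for real work.
                Interlocked.Increment(ref processed);
            });

        channel.Writer.TryWrite("Bank1");
        channel.Writer.TryWrite("Bank2");
        channel.Writer.TryWrite("Bank1");

        // Completing the writer ends the async sequence,
        // which in turn allows the loop to finish.
        channel.Writer.Complete();
        await loop;
        return processed;
    }
}
```

Without the call to channel.Writer.Complete(), awaiting the loop would never return, because the sequence would stay open waiting for more items.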
The ForEachExclusivePerKeyAsync implementation above uses the CollectionsMarshal.GetValueRefOrAddDefault (https://learn.microsoft.com/en-us/dotnet/api/system.runtime.interopservices.collectionsmarshal.getvaluereforadddefault) method to update the perKey dictionary, trading readability for performance. For a less performant but more readable version, you can look at the 4th revision (https://stackoverflow.com/revisions/71026983/4) of this answer.
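
To make the bookkeeping easier to follow, here is a minimal sketch of the same per-key protocol written with plain TryGetValue/Add/Remove calls instead of CollectionsMarshal.GetValueRefOrAddDefault. It is not the code from the 4th revision. The PerKeyGate class and its TryEnter/TryTakeNext methods are hypothetical names invented for this illustration, and the sketch looks the queue up on every call instead of keeping a local reference to it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the answer's code. A key mapped to a
// null queue means "one task is in flight for this key, nothing is waiting".
public sealed class PerKeyGate<TKey, TItem>
{
    private readonly Dictionary<TKey, Queue<TItem>> _perKey;

    public PerKeyGate(IEqualityComparer<TKey> comparer = null)
    {
        _perKey = new Dictionary<TKey, Queue<TItem>>(comparer);
    }

    // Returns true if the caller should process the item itself, because no
    // other task is in flight for this key. Otherwise the item is queued for
    // the in-flight task and false is returned.
    public bool TryEnter(TKey key, TItem item)
    {
        lock (_perKey)
        {
            if (!_perKey.TryGetValue(key, out Queue<TItem> queue))
            {
                _perKey.Add(key, null); // Activity marker, no queue needed yet.
                return true;
            }
            if (queue is null)
            {
                queue = new Queue<TItem>(); // Create the queue lazily.
                _perKey[key] = queue;
            }
            queue.Enqueue(item);
            return false;
        }
    }

    // Called by the in-flight task after it has finished an item. Returns true
    // with the next queued item, or removes the key and returns false.
    public bool TryTakeNext(TKey key, out TItem next)
    {
        lock (_perKey)
        {
            if (_perKey.TryGetValue(key, out Queue<TItem> queue)
                && queue is not null && queue.Count > 0)
            {
                next = queue.Dequeue();
                return true;
            }
            _perKey.Remove(key); // Nothing is waiting, so release the key.
            next = default;
            return false;
        }
    }
}
```

In the body passed to Parallel.ForEachAsync, a task would call TryEnter first and return immediately on false. On true it would run body for its item, then keep calling TryTakeNext and processing the returned items until it gets false.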
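For a version of the same method that does not depend on the Parallel.ForEachAsync API, and so can run on .NET versions earlier than .NET 6, you can look at the 3rd revision (https://stackoverflow.com/revisions/71026983/3) of this answer.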