按组并行但在每个组内串行动态处理并发集合

2024-01-08

我遇到了一个可以轻松定义的问题,但我似乎无法消化 MSDN 以获得最佳解决方案。我已经有一段时间没有真正考虑 UI 响应能力之外的并行处理了。

可以说,我有一个需要处理的并发任务集合。例如,它可能正在按类型(Consumer1、Consumer2、Consumer3...Consumer[N])将数据加载到各种消费者,发送数据的底层任务对于每个任务都是相同的,但每个消费者一次只能接受一个源

基本上,我希望尽可能并行处理,但需要注意的是,我一次只能向每个消费者发送 1 个任务。因此,如果消费者的当前作业已经在进行中,那么我应该移至集合中的下一项,并将其保留到该消费者正在进行的作业完成时。并发集合也可以随时从外部添加,如果我们有新类型,我们就需要额外的线程。

我想我的问题归结为如何从集合中自定义“Take”,以便我只获取下一个任务,其属性指定它有一个尚未有正在进行的作业的消费者。

关于我在这里缺少什么或者我是否走在正确的道路上有什么想法吗?

例如,我们有一个中介队列,其中包含与银行交易相关的任务。

因此,我们可能会添加到中介队列中(假设发送 SummaryData 和发送 TransactionData 使用相同的接口合约来发送数据)

  1. 发送交易数据 -> 银行 1
  2. 发送交易数据 -> 银行 2
  3. SendSummaryData -> 仲裁器
  4. 发送交易数据 -> 银行 1
  5. 发送交易数据 -> Bank3
  6. 发送交易数据 -> 银行 1
  7. 发送交易数据 -> 银行 2

1,2,3,5 可以并行处理,但由于各自的系统,每个消费者一次只能接受一个输入,事务 4 必须等待事务 1 完成,事务 6 必须等待事务 4 完成过程。同样,事务 7 必须等待事务 2。

在任何初始过程完成之前,有人可能会添加另一个分组。

  1. SendSummaryData -> 仲裁器

  2. 发送交易数据 -> 银行 1

  3. 发送交易数据 -> Bank4

如果线程可用,则可以立即拾取 10,但 8 和 9 必须在其他相关任务后面排队。

显然,会有更好的方法来设计一个系统来实现这一目标,但这些本质上是我想要满足的规格。


这是一种基于Parallel.ForEachAsync https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasyncAPI,可从 .NET 6 及更高版本获得。习俗ForEachExclusivePerKeyAsync下面的方法支持所有选项和功能Parallel.ForEachAsync过载有一个IAsyncEnumerable<T> as source。出现错误或取消时其行为是相同的。唯一的区别是,具有相同键的元素的并发操作被阻止。每个元素的密钥是通过keySelector功能。对具有相同键的项的处理是串行化的。

/// <summary>
/// Executes a for-each operation on an async-enumerable sequence in which
/// iterations may run concurrently, enforcing a non-concurrent execution policy
/// for elements having the same key.
/// </summary>
public static Task ForEachExclusivePerKeyAsync<TSource, TKey>(
    this IAsyncEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask> body,
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = default)
{
    ArgumentNullException.ThrowIfNull(keySelector);
    // The other arguments are validated by the Parallel.ForEachAsync itself.
    Dictionary<TKey, Queue<TSource>> perKey = new(keyComparer);
    return Parallel.ForEachAsync(source, parallelOptions, async (item, ct) =>
    {
        TKey key = keySelector(item);
        Queue<TSource> queue;
        lock (perKey)
        {
            // If there is no other task in-flight with the same key,
            // insert a null queue as an indicator of activity,
            // and start a processing loop for items with this key.
            // Otherwise enqueue this item and return.
            queue = CollectionsMarshal.GetValueRefOrAddDefault(
                perKey, key, out bool exists) ??= (exists ? new() : null);
            if (queue is not null)
            {
                queue.Enqueue(item); return;
            }
        }

        // Fire the task for this item, and for all other items with the
        // same key that might be enqueued while this task is in-flight.
        while (true)
        {
            ct.ThrowIfCancellationRequested();
            await body(item, ct); // Continue on captured context
            lock (perKey)
            {
                if (queue is null || queue.Count == 0)
                {
                    // Assume that meanwhile no other item was enqueued.
                    perKey.Remove(key, out queue);
                    if (queue is null || queue.Count == 0) return;
                    // The queue is actually not empty, so add it back.
                    perKey.Add(key, queue);
                }
                item = queue.Dequeue(); // Grab the next item.
            }
        }
    });
}

使用示例。 AChannel<T> https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channel-1被用作源/控制器 https://stackoverflow.com/questions/61540896/factory-for-iasyncenumerable-or-iasyncenumerator of the IAsyncEnumerable<T>顺序:

var channel = Channel.CreateUnbounded<Transaction>();
//...
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };
await ForEachExclusivePerKeyAsync(channel.Reader.ReadAllAsync(), options, async (x, _) =>
{
    await ProcessTransactionAsync(x);
}, keySelector: x => x.Bank);
//...
channel.Writer.TryWrite(new Transaction() { Bank = "Bank1" });
channel.Writer.TryWrite(new Transaction() { Bank = "Bank2" });

The ForEachExclusivePerKeyAsync上面的实现使用了CollectionsMarshal.GetValueRefOrAddDefault https://learn.microsoft.com/en-us/dotnet/api/system.runtime.interopservices.collectionsmarshal.getvaluereforadddefault更新方法perKey字典,以牺牲可读性为代价提高性能。对于性能较低但可读性更强的版本,您可以查看第四次修订 https://stackoverflow.com/revisions/71026983/4这个答案。

对于不依赖于相同方法的版本Parallel.ForEachAsyncAPI,所以它可以运行在.NET 6之前的版本上,你可以看看第三次修订 https://stackoverflow.com/revisions/71026983/3这个答案。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

按组并行但在每个组内串行动态处理并发集合 的相关文章

随机推荐