限制异步任务

2024-03-31

我想运行一堆异步任务,并限制在任何给定时间可以等待完成的任务数量。

假设您有 1000 个 URL,并且您只想一次打开 50 个请求;但是,一旦一个请求完成,您就会打开与列表中下一个 URL 的连接。这样,每次始终打开 50 个连接,直到 URL 列表耗尽为止。

如果可能的话,我还想利用给定数量的线程。

我想出了一个扩展方法,ThrottleTasksAsync这就是我想要的。是否已经有更简单的解决方案?我认为这是一个常见的情况。

Usage:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

这是代码:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

该方法利用BlockingCollection and SemaphoreSlim使其发挥作用。节流器在一个线程上运行,所有异步任务在另一线程上运行。为了实现并行性,我添加了一个 maxDegreeOfParallelism 参数,该参数传递给Parallel.ForEach循环重新用作while loop.

旧版本是:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

但是,线程池很快就会耗尽,你不能这样做async/await.

Bonus:为了解决这个问题BlockingCollection抛出异常的地方Take() when CompleteAdding()被称为,我正在使用TryTake超时过载。如果我没有使用超时TryTake,这会违背使用的目的BlockingCollection since TryTake不会阻止。有没有更好的办法?理想情况下,会有一个TakeAsync method.


按照建议,使用TPL数据流 https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library.

A TransformBlock<TInput, TOutput> https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.transformblock-2可能就是您正在寻找的。

你定义一个MaxDegreeOfParallelism限制可以并行转换的字符串数量(即可以下载多少个 url)。然后,您将 URL 发布到该块,完成后,您告诉该块您已完成添加项目并获取响应。

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

注:TransformBlock缓冲其输入和输出。那么,为什么我们需要将其链接到BufferBlock?

因为TransformBlock直到所有项目(HttpResponse)已被消耗,并且await downloader.Completion会挂起。相反,我们让downloader将其所有输出转发到专用缓冲区块 - 然后我们等待downloader完成并检查缓冲区块。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

限制异步任务 的相关文章

  • 模板类包装任意类型/非类型模板类

    假设我有一个模板类base和一个班级wrapper其中包含一个实例化成员base 我想定义班级wrapper这样它依赖于模板参数包 该参数包只是 传递 给实例化成员base 例如 考虑下面的代码 它工作得很好 include
  • 从 SQL 数据库获取日期时间

    我的数据库表中有一个 DateTime 记录 我编写一个查询从数据库中获取它 string command2 select Last Modified from Company Data where Company Name Descrip
  • 如何自定义 DataTable 列的排序

    我需要对数据表列的值进行排序 该列包含字符串 整数或混合文本 例如 数据表列包含如下值 23 18 12 store 23 store a1 1283 25 如果我使用对值进行排序Dataview sort 方法会按此顺序产生 12 128
  • 将 2D 数组映射到 1D 数组

    我想用一维数组来表示一个二维数组 函数将传递两个索引 x y 和要存储的值 这两个索引代表一维数组的单个元素 并相应地设置它 我知道一维数组需要具有 arrayWidth arrayHeight 的大小 但我不知道如何设置每个元素 例如 如
  • 测试 hdf5/c++ 中的组是否存在

    我正在打开一个现有的 HDF5 文件来附加数据 我想向那个叫做的小组保证 A存在以供后续访问 我正在寻找一种简单的方法来创建 A有条件地 如果不存在则创建并返回新组 或者返回现有组 一种方法是测试 A存在 我怎样才能高效地做到这一点 根据
  • 如何在 C# 事件中区分更改是由代码还是由用户进行?

    我有一个简单的TextBox一开始是空的 我有一个简单的事件 TextChanged 可以知道用户何时更改了其中的任何内容TextBox 但是 如果我自己在代码中对其执行任何操作 该事件就会触发 喜欢设置textbox Text Test
  • 使用 VSTO 更改 Outlook 设置

    我刚刚花了大约 4 个小时试图弄清楚如何以编程方式检索 设置 Microsoft Outlook 2010 的 Outlook 设置 我所说的 设置 是指文件 选项 邮件下的设置 我想做的是检索用户设置的设置列表 自动化我们每天需要在某些消
  • Qt中正确的线程方式

    我的图像加载非常耗时 图像很大 并且在加载时也完成了一些操作 我不想阻止应用程序 GUI 我的想法是在另一个线程中加载图像 发出图像已加载的信号 然后用该图像重绘视图 我的做法 void Window loadImage ImageLoad
  • 控制台应用程序 .net Core 2.0 的配置

    在 net Core 1 中我们可以这样做 IConfiguration config new ConfigurationBuilder AddJsonFile appsettings json true true Build 这样就可以使
  • 防止复制构造和返回值引用的分配

    如果我有一个函数返回对类实例的引用 但我无法控制其源 比如说list
  • 推送 Lua 表

    我已经创建了一个Lua表C 但我不知道如何将该表推入堆栈顶部 以便我可以将其传递给 Lua 函数 有谁知道如何做到这一点 这是我当前的代码 lua createtable state libraries size 0 int table i
  • for 循环 - 没有效果的语句

    由于某种原因 我收到错误 statement with no effect关于这个声明 for j idx j lt iter j increment printf from loop idx i int idx punc ctxt j 你
  • 使用 AutoMapper 进行 LINQ GroupBy 聚合

    试图让查询工作 但老实说不确定如何 或者是否可能 进行它 因为我尝试过的一切都不起作用 共查询6个表 Person PersonVote PersonCategory Category City FirstAdminDivision Per
  • 为什么 C 函数不能返回数组类型?

    我是 C 语言新手 想知道 为什么 C 函数不能返回数组类型 我知道数组名是数组第一个值的地址 而数组是 C 中的二等公民 您自己已经回答了这个问题 数组是二等公民 C 按值返回 数组不能按值传递 因此不能返回它们 至于为什么数组不能按值传
  • 如何使用 Clang 查找内存泄漏

    我在我的机器 ubuntu 中安装了 Clang 以便发现我的 C 代码中的内存泄漏 我编写了一个示例代码来检查它的工作情况 如下所示 File hello c for leak detection include
  • 编写专门用于类及其子类的函数模板

    我正在尝试编写一个函数模板 一个版本应该用于不满足另一版本标准的所有类型 当参数是给定类的基类或该类本身时 应使用另一个版本 我尝试过超载Base 但是当类派生自Base 他们使用通用的 而不是特定的 我也尝试过这种 SFINAE 方法 s
  • 通过 MSBuild 调用 cl.exe 时无限期挂起

    我正在尝试在我的 主要是 C 项目上运行 MSBuild 想象一下一个非常庞大的代码库 Visual Studio 2015 是有问题的工具集 Windows 7 SP1 和 VS 2015 更新 2 即使使用 m 1 从而迫使它仅使用一个
  • 在 MVVM 中,可以在视图后面的代码中访问 ViewModel 吗?

    在 MVVM 模式中 是否可以接受甚至可以访问视图代码后面的 ViewModel 属性 我有一个可观察的集合 它填充在 ViewModel 中 我需要在视图中使用它来绑定到带有链接列表的无限滚动条 IE private LinkedList
  • 有没有办法让 VS2010 在我的方法中扩展或收缩 try 块?

    我的代码有很多 try catch finally 块 与我在 VS2010 中的方法不同 除了添加区域之外 我无法在开发时扩展或收缩这些区域来隐藏内容 try vm R vm Qu vm T vm D vm Fil vm Type vm
  • C++0x 中的新 unicode 字符

    我正在构建一个 API 它允许我获取各种编码的字符串 包括 utf8 utf16 utf32 和 wchar t 根据操作系统 可能是 utf32 或 utf16 新的 C 标准引入了新类型char16 t and char32 t没有这么

随机推荐

  • 向量(插入然后排序)或集合哪种方法更快?

    我有数字序列 未排序 无重复 目标是对它们进行排序 方法一 插入向量 O n 使用排序算法并排序 O nlogn 方法2 插入集合 o nlogn 哪种方法会更快 我觉得设置会更快 因为向量中的每个插入都必须分配 完整的数组元素并复制它然后
  • 通过从客户区的一部分拖动无框窗口来移动它

    正如标题所示 我想仅当用户将窗口从客户区域的一部分拖动时才移动窗口 这将是对正常标题栏移动的模仿 这是因为我的表单是自定义的 并且没有任何标题或标题栏 目前 我使用的代码如下 case WM NCHITTEST return HTCAPTI
  • 你能用 Java 获得基本的 GC 统计数据吗?

    我想让一些长时间运行的服务器应用程序定期输出 Java 中的一般 GC 性能数据 例如 Runtime freeMemory 的 GC 等价物 例如完成的周期数 平均时间等 我们的系统在客户机器上运行 怀疑配置错误的内存池导致了过多的 GC
  • 仅旋转特定 Excel 行中的文本

    我想使用以下命令旋转 Excel 文件中的标题Microsoft Office Interop 为了实现这一目标 我使用以下代码 worksheet Range A1 worksheet UsedRange Columns Count 1
  • HTTP 标头中什么被视为空白

    我刚刚阅读了 HTTP 标准 拟议标准更准确地说 第 1 部分 并对第 3 节倒数第二段中他们认为的 空白 感到困惑 https www rfc editor org rfc rfc7230 section 3 https www rfc
  • 在 Visual Studio 2008 for .NET CF 中处理不同分辨率

    我正在开发一个基于 NET CF 的图形应用程序 我的项目涉及大量绘图图像 我们决定在不同的手机分辨率上移植该应用程序 240 X 240 480 X 640 等 我将如何在单个解决方案 项目中实现这一目标 是否需要根据决议创建不同的项目
  • R 错误:java.lang.OutOfMemoryError:Java 堆空间

    我正在尝试将 R 连接到 Teradata 以将数据直接提取到 R 中进行分析 但是 我收到错误 Error in jcall rp I fetch stride block java lang OutOfMemoryError Java
  • SIP:错误数据连接丢失

    我已经在 android 中使用本机 sip 创建了 sip 应用程序 在其中 我在从 sip 服务器取消注册帐户时遇到问题 每次我得到数据连接丢失我也在android文档中看到 但没有对此错误的简短解释 而且它在注册时面临各种错误 如in
  • AutoMapper 排除字段

    我正在尝试将一个对象映射到另一个对象 但该对象非常复杂 在开发过程中 我希望能够排除一堆字段并逐一访问它们 或者能够指定仅映射我想要的字段 并在每次测试成功时增加字段 So class string field1 string field2
  • 使用 apache 2.4 设置 git-http-backend

    我试图使用 git http backend 和 apache 2 4 设置一个 git 服务器 我发现这个问题 https stackoverflow com questions 26734933 how to set up git ov
  • 如何制作多表头

    I am trying to make a table with 2 headers merged At the moment i made 2 seperate tables with 2 seperate headers and it
  • 用 ssh 替换 telnet

    我有一些程序使用 Net Telnet 模块连接到多个服务器 现在管理员决定将 Telnet 服务替换为 SSH 保留其他所有内容 例如用户帐户 我查看了 Net SSH2 发现我必须更改大部分程序 您是否知道其他更适合相同替代品的 SSH
  • 将 FrameworkElement 及其 DataContext 保存到图像文件未成功

    我有一个名为 UserControl1 的简单 UserControl 其中包含一个 TextBlock
  • 使用正则表达式从字符串中删除日期

    好的 我有一个字符串 title string 它可能类似于以下任何一个 title string 20 08 12 First Test Event title string First Test event 20 08 12 title
  • 为什么 Python 中字典中的项目顺序会改变? [复制]

    这个问题在这里已经有答案了 我正在尝试从一些教程中学习Python 这是我遇到的一个让我困惑的简单例子 gt gt gt d server mpilgrim database master uid sa pwd secret gt gt g
  • Web Worker 和 Canvas 数据

    我看过很多关于网络工作者的帖子
  • 带按钮控件的 DataGridView - 删除行

    我想要在每行的末尾有一个删除按钮DataGridView通过单击我想从绑定列表中删除所需的行 该绑定列表是我的网格的数据源 但我似乎无法做到这一点 我在产品类中创建了一个按钮对象 并使用唯一的 id 实例化它以从列表中删除该对象 但按钮未显
  • 如何桥接 JavaScript(参差不齐)数组和 std::vector> 对象?

    在 JavaScript 中 我有一个 线 列表 每条线都由不定数量的 点 组成 每个点都有以下形式 x y 所以它是一个 3D 参差不齐的数组 现在我需要在 emscripten 的帮助下将它传递给我的 C 代码 embind https
  • Nuget Push 总是返回 404(未找到)

    我尝试将 nuget 包发布到我的 GitHub Packages 帐户 但在所有情况下我都会遇到 404 错误 我已按照 GitHub 网站上的要求进行操作 nuget source Add Name GitHub Source http
  • 限制异步任务

    我想运行一堆异步任务 并限制在任何给定时间可以等待完成的任务数量 假设您有 1000 个 URL 并且您只想一次打开 50 个请求 但是 一旦一个请求完成 您就会打开与列表中下一个 URL 的连接 这样 每次始终打开 50 个连接 直到 U