如何实现高效的 WhenEach 来传输 IAsyncEnumerable 任务结果?

2024-02-07

我正在尝试使用以下提供的新工具更新我的工具集C# 8 https://learn.microsoft.com/en-us/dotnet/csharp/whats-new/csharp-8,一种似乎特别有用的方法是Task.WhenAll https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.whenall返回一个IAsyncEnumerable https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerable-1。此方法应在任务结果可用时立即对其进行流式传输,因此将其命名为WhenAll没有多大意义。WhenEach听起来更合适。该方法的签名是:

public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);

这个方法可以这样使用:

var tasks = new Task<int>[]
{
    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
};

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

static async Task<int> ProcessAsync(int result, int delay)
{
    await Task.Delay(delay);
    return result;
}

预期输出:

已处理:5
已处理:4
已处理:1
已处理:3
已处理:2

我设法使用该方法编写了一个基本实现Task.WhenAny https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.whenany循环中,但是这种方法有一个问题:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
{
    var hashSet = new HashSet<Task<TResult>>(tasks);
    while (hashSet.Count > 0)
    {
        var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        hashSet.Remove(task);
    }
}

问题是性能。这Task.WhenAny方法必须监视所有提供的任务的完成,并且它通过附加和分离延续来实现,因此在循环中重复调用它会导致 O(n²) 计算复杂度。我的幼稚实现很难处理 10,000 个任务。在我的机器上,开销将近 10 秒。我希望该方法的性能几乎与内置方法一样高Task.WhenAll,可以轻松处理数十万个任务。我怎样才能改善WhenEach使其正常运行的方法?


通过使用代码this https://devblogs.microsoft.com/pfxteam/processing-tasks-as-they-complete/文章中,您可以执行以下操作:

public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
   var inputTasks = tasks.ToList();

   var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
   var results = new Task<Task<T>>[buckets.Length];
   for (int i = 0; i < buckets.Length; i++)
   {
       buckets[i] = new TaskCompletionSource<Task<T>>();
       results[i] = buckets[i].Task;
   }

   int nextTaskIndex = -1;
   Action<Task<T>> continuation = completed =>
   {
       var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
       bucket.TrySetResult(completed);
   };

   foreach (var inputTask in inputTasks)
       inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

   return results;
}

然后改变你的WhenEach打电话给Interleaved code

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
    foreach (var bucket in Interleaved(tasks))
    {
        var t = await bucket;
        yield return await t;
    }
}

然后你可以打电话给你的WhenEach像往常一样

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

我对 10k 任务做了一些基本的基准测试,速度提高了 5 倍。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何实现高效的 WhenEach 来传输 IAsyncEnumerable 任务结果? 的相关文章

  • 在C语言中使用“void”

    我很困惑为什么我们需要通过void转换为 C 函数 int f void return 0 versus int f return 0 什么是正确的做法以及为什么 In C int f 是一种老式的声明 它说f需要固定但未指定数量和类型的参
  • 查找哪些页面不再与写入时复制共享

    假设我在 Linux 中有一个进程 我从中fork 另一个相同的过程 后forking 因为原始进程将开始写入内存 Linux写时复制机制将为进程提供与分叉进程使用的不同的唯一物理内存页 在执行的某个时刻 我如何知道原始进程的哪些页面已被写
  • 在搜索 List 时,为什么 Enumerable.Any(Func predicate) 比带有 if 语句的 foreach 慢

    最近有件事引起了我的好奇心 Why is the Enumerable Any Func
  • 为什么我不能用 `= delete;` 声明纯虚函数?

    Intro 纯虚函数使用通用语法声明 virtual f 0 然而 自 c 11 以来 有一种方法可以显式地传达non existence 特殊 成员函数的 Mystruct delete eg default constructor Q
  • 以编程方式检查页面是否需要基于 web.config 设置进行身份验证

    我想知道是否有一种方法可以检查页面是否需要基于 web config 设置进行身份验证 基本上如果有这样的节点
  • 显示异常时的自定义错误消息:从客户端检测到潜在危险的 Request.Form 值

    我在我的 Web 应用程序中使用 ASP NET 的登录控件 当发生此异常时 我想在标签上显示一种有趣的错误类型System Web HttpRequestValidationException A potentially dangerou
  • 如何使用recv()检测客户端是否仍然连接(并且没有挂起)?

    我写了一个多客户端服务器程序C on SuSE Linux 企业服务器 12 3 x86 64 我为每个客户端使用一个线程来接收数据 我的问题是 我使用一个终端来运行服务器 并使用其他几个终端来运行服务器telnet到我的服务器 作为客户端
  • C++ 异步线程同时运行

    我是 C 11 中线程的新手 我有两个线程 我想让它们同时启动 我可以想到两种方法 如下 然而 似乎它们都没有按照我的预期工作 他们在启动另一个线程之前启动一个线程 任何提示将不胜感激 另一个问题是我正在研究线程队列 所以我会有两个消费者和
  • 暂停下载线程

    我正在用 C 编写一个非常简单的批量下载程序 该程序读取要下载的 URL 的 txt 文件 我已经设置了一个全局线程和委托来更新 GUI 按下 开始 按钮即可创建并启动该线程 我想要做的是有一个 暂停 按钮 使我能够暂停下载 直到点击 恢复
  • Azure 事件中心 - 按顺序接收事件

    我使用下面的代码从 Azure Event Hub 接收事件 https learn microsoft com en us azure event hubs event hubs dotnet framework getstarted s
  • 如何重置捕获像素的值

    我正在尝试创建一个 C 函数 该函数返回屏幕截图位图中每四个像素的 R G 和 B 值 这是我的代码的一部分 for int ix 4 ix lt 1366 ix ix 4 x x 4 for int iy 3 iy lt 768 iy i
  • C# 中条件编译符号的编译时检查(参见示例)?

    在 C C 中你可以这样做 define IN USE 1 define NOT IN USE 1 define USING system 1 system 1 IN USE 进而 define MY SYSTEM IN USE if US
  • 如何在c#中的内部类中访问外部类的变量[重复]

    这个问题在这里已经有答案了 我有两个类 我需要声明两个类共有的变量 如果是嵌套类 我需要访问内部类中的外部类变量 请给我一个更好的方法来在 C 中做到这一点 示例代码 Class A int a Class B Need to access
  • 如何挤出平面 2D 网格并赋予其深度

    我有一组共面 连接的三角形 即二维网格 现在我需要将其在 z 轴上挤出几个单位 网格由一组顶点定义 渲染器通过与三角形数组匹配来理解这些顶点 网格示例 顶点 0 0 0 10 0 0 10 10 0 0 10 0 所以这里我们有一个二维正方
  • 如何一步步遍历目录树?

    我发现了很多关于遍历目录树的示例 但我需要一些不同的东西 我需要一个带有某种方法的类 每次调用都会从目录返回一个文件 并逐渐遍历目录树 请问我该怎么做 我正在使用函数 FindFirstFile FindNextFile 和 FindClo
  • 是否可以有一个 out ParameterExpression?

    我想定义一个 Lambda 表达式out范围 有可能做到吗 下面是我尝试过的 C Net 4 0 控制台应用程序的代码片段 正如您在 procedure25 中看到的 我可以使用 lambda 表达式来定义具有输出参数的委托 但是 当我想使
  • 转到定义:“无法导航到插入符号下的符号。”

    这个问题的答案是社区努力 help privileges edit community wiki 编辑现有答案以改进这篇文章 目前不接受新的答案或互动 我今天突然开始在我的项目中遇到一个问题 单击 转到定义 会出现一个奇怪的错误 无法导航到
  • WinRT 定时注销

    我正在开发一个 WinRT 应用程序 要求之一是应用程序应具有 定时注销 功能 这意味着在任何屏幕上 如果应用程序空闲了 10 分钟 应用程序应该注销并导航回主屏幕 显然 执行此操作的强力方法是在每个页面的每个网格上连接指针按下事件 并在触
  • Googletest:如何异步运行测试?

    考虑到一个包含数千个测试的大型项目 其中一些测试需要几分钟才能完成 如果按顺序执行 整套测试需要一个多小时才能完成 通过并行执行测试可以减少测试时间 据我所知 没有办法直接从 googletest mock 做到这一点 就像 async选项
  • 是否可以在 C# 中强制接口实现为虚拟?

    我今天遇到了一个问题 试图重写尚未声明为虚拟的接口方法的实现 在这种情况下 我无法更改接口或基本实现 而必须尝试其他方法 但我想知道是否有一种方法可以强制类使用虚拟方法实现接口 Example interface IBuilder

随机推荐

  • 如何将 URL 中的图像附加到 FormData - Javascript

    这是我的小 JavaScript 代码
  • Caffe:如何通过代码获取`solver.prototxt`参数?

    我想访问solver prototxt参数如base lr 基础学习率 或weight decay来自Python代码 有什么方法可以从solver net目的 谢谢 根据本教程 http nbviewer jupyter org gith
  • iOS 15 safari 工具栏现在在元素内滚动时隐藏

    在 iOS 15 上 无论您有顶部还是底部工具栏 在元素内滚动都会导致窗口调整大小 工具栏消失 在 iOS 14 上 只有当主体滚动时才会发生这种情况 请参阅下面的 gif 注意 黄色区域是一个带有溢出滚动的 div 并且主体不滚动 iOS
  • 使用 JavaScript 截断文本并附加省略号

    如何截断字符串并附加省略号 我想截断类似的东西 this is a very long string to this is a ve function truncate input if input length gt 5 return i
  • 检查 Activity 是否正在从 Service 运行

    怎样才能一个Service检查其应用程序之一是否Activity正在前台运行 使用以下方法和您的包名称 如果您的任何活动位于前台 它将返回 true public boolean isForeground String myPackage
  • urlopen() gbk 页面时 Python 中的编码问题

    我的代码在这里 coding utf 8 if name main from urllib2 import urlopen url http iccna blog sohu com 164572951 html data urlopen u
  • java中pdf解析为文本

    我有一个阿拉伯语 PDF 我想使用 Java 将其解析为文本文档 我已经尝试了很多次 英语单词解析成功 但阿拉伯语单词解析失败 谁能推荐一个可以正确转换阿拉伯语单词的解决方案 我想到了几个图书馆 阿帕奇蒂卡 http tika apache
  • onChange 是一个延迟字符 - Hooks

    我是 React 和 Hooks 的新手 我创建了一个简单的搜索栏 用户可以在其中输入一些文本 然而 如果我console log之后的状态onChange 它总是落后一个字符 例如 如果我输入 披萨 console log鞋子 披萨 我的
  • 循环依赖——什么时候终止?

    我无法理解 python 是如何管理的imports 假设我有以下应用程序结构 application application py model init py user py 假设application py文件在创建数据库后导入模型模块
  • 使用 jQuery 将参数发送到 Java Server Pages (JSP)

    我想向 JSP 发送不同的参数 是否可以在 jQuery 中向 JSP 发送多个参数 因为jQuery是客户端 JSP是服务器端 告诉我 您可以通过ajax请求传递参数 例如 ajax type POST url userNameCheck
  • EventWaitHandle 是否有任何隐式 MemoryBarrier?

    我是这个网站的新手 所以如果我没有以可接受的方式发帖 请告诉我 我经常按照下面的示例编写一些代码 为了清楚起见 省略了诸如 Dispose 之类的内容 我的问题是 是否需要如图所示的挥发物 或者 ManualResetEvent Set 是
  • 双重提交 Cookie 和多个选项卡?

    The 双重提交cookie https www owasp org index php Cross Site Request Forgery 28CSRF 29 Prevention Cheat Sheet Double Submit C
  • 图像上方的文本 CSS Z 索引不起作用

    我试图强制文本位于图像上方 但是 它不想工作 我已经尝试在文本上设置 z index 100 在图像上设置 100 但它仍然不起作用 主要 HTML div class menu defaults menu overlay div clas
  • Windows 8 Metro 风格应用程序和窗口挂钩

    我对 window hooks 和 Windows 8 Metro 应用程序都很陌生 我想开发一个后台服务 流程 检查用户刚刚点击启动的 Metro 应用程序的 属性 例如播放音乐的应用程序 文本文档创建应用程序 根据找到的属性 激活其他应
  • 自定义 sbt 任务按标签运行测试

    我想做一些 sbt 自定义任务来按标签运行测试 scalatest 例如 现在我可以在 sbt 控制台中运行它 sbt test only n UnitTests 我想运行这个做类似的事情 sbt test unit or somethin
  • pexpect 发送光标移动

    如何使用 pexpect 发送光标移动 如上 下 左 右键 下面的示例是自动化 elink 它使用向上 向下键选择页面上的不同链接 from pexpect import spawn child spawn elinks http pyth
  • git:忽略*受*版本控制的文件

    A gitignorefile 允许忽略版本控制中的文件 我们有不同的情况 我们想在存储库中放置一些配置文件 这些文件需要根据每台机器进行更改 例如数据库访问信息 我们确实希望将它们作为占位符进行分发 因此我们将它们包含到存储库中 但是 稍
  • 正则表达式 - 贪婪量词[重复]

    这个问题在这里已经有答案了 我真的很纠结这个问题 import java util regex class Regex2 public static void main String args Pattern p Pattern compi
  • 数组索引越界异常[重复]

    这个问题在这里已经有答案了 一直在环顾四周 看看是否有什么可以帮助我 但我不太明白人们在回答什么 而我所理解的任何东西似乎都不能解决问题 所以基本上正如标题所说 我遇到了数组索引越界异常 但我不知道为什么 任何帮助是极大的赞赏 Code i
  • 如何实现高效的 WhenEach 来传输 IAsyncEnumerable 任务结果?

    我正在尝试使用以下提供的新工具更新我的工具集C 8 https learn microsoft com en us dotnet csharp whats new csharp 8 一种似乎特别有用的方法是Task WhenAll http