如何在 TPL 数据流中安排流控制?

2023-12-06

我正在尝试控制 TPL 数据流中的数据流。我有一个非常快的生产者和一个非常慢的消费者。 (我的真实代码更复杂,但尽管如此,这是一个非常好的模型,它重现了问题。)

当我运行它时,代码开始消耗内存,就像它已经过时一样——并且生产者上的输出队列会尽快填满。我真正希望看到的是生产者停止运行一段时间,直到消费者有机会请求它。根据我对文档的阅读,这是应该发生的事情:也就是说,我认为生产者会等待,直到消费者有空间。

显然,情况并非如此。我该如何解决这个问题,以免队列变得疯狂?

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;

namespace MemoryLeakTestCase
{
    class Program
    {

        static void Main(string[] args)
        {
            var CreateData = new TransformManyBlock<int, string>(ignore =>
            {
                return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
            });

            var ParseFile = new TransformManyBlock<string, string>(fileContent =>
            {
                Thread.Sleep(1000);
                return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
            }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
            );

            var EndOfTheLine = new ActionBlock<object>(f =>
                {
                });


            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
            CreateData.LinkTo(ParseFile, linkOptions);
            ParseFile.LinkTo(EndOfTheLine, linkOptions);

            Task t = new Task(() =>
            {
                while (true)
                {
                    Console.WriteLine("CreateData: " + Report(CreateData));
                    Console.WriteLine("ParseData:  " + Report(ParseFile));
                    Console.WriteLine("NullTarget: " +  EndOfTheLine.InputCount );
                    Thread.Sleep(1000);
                }

            });
            t.Start();

            CreateData.SendAsync(0);
            CreateData.Complete();

            EndOfTheLine.Completion.Wait();
        }

        public static string Report<T, U>(TransformManyBlock<T, U> block)
        {
            return String.Format("INPUT: {0}   OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
        }


    }
}

通常,在这种情况下你会做的就是设置BoundedCapacity of the CreateData堵塞。但这在这里行不通,因为TransformManyBlock似乎不需要BoundedCapacity从单个填充输出队列时考虑IEnumerable.

您可以做的是创建一个迭代集合并使用的函数SendAsync()仅当目标可以接受时才发送更多数据:

/// <remarks>
/// If iterating data throws an exception, the target block is faulted
/// and the returned Task completes successfully.
/// 
/// Depending on the usage, this might or might not be what you want.
/// </remarks>
public static async Task SendAllAsync<T>(
    this ITargetBlock<T> target, IEnumerable<T> data)
{
    try
    {
        foreach (var item in data)
        {
            await target.SendAsync(item);
        }
    }
    catch (Exception e)
    {
        target.Fault(e);
    }
}

Usage:

var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
await ParseFile.SendAllAsync(data);
ParseFile.Complete();

如果你还想拥有CreateData块的行为与原始代码类似,您可以有两个有界的BufferBlocks, SendAllAsync()他们之间,然后使用Encapsulate()使它们看起来像一个块:

/// <remarks>
/// boundedCapacity represents the capacity of the input queue
/// and the output queue separately, not their total.
/// </remarks>
public static IPropagatorBlock<TInput, TOutput>
    CreateBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
{
    var input = new BufferBlock<TInput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    var output = new BufferBlock<TOutput>(
        new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

    Task.Run(
        async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                {
                    var data = transform(await input.ReceiveAsync());

                    await output.SendAllAsync(data);
                }

                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

    return DataflowBlock.Encapsulate(input, output);
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 TPL 数据流中安排流控制? 的相关文章

  • 为什么相同的代码在同一台计算机上的执行时间可能不同?

    我是 C 编程新手 我编写了代码并希望获得它的运行时 这就是我所做的 每次运行代码时 我都会得到不同的运行时值 这样对吗 或者我的代码有问题吗 int main int argc char argv time t start end sta
  • 如何读取扩展文件属性/文件元数据

    因此 我按照教程使用 ASP net core 将文件 上传 到本地路径 这是代码 public IActionResult About IList
  • std::cout 和 std::wcout 有什么区别?

    在c 中 有什么区别std cout and std wcout 它们都控制流缓冲区的输出或将内容打印到控制台 或者它们只是相似吗 它们作用于不同的字符类型 std cout uses char作为字符类型 std wcout uses w
  • 在新的浏览器进程中打开 URL

    我需要在新的浏览器进程中打开 URL 当浏览器进程退出时我需要收到通知 我当前使用的代码如下 Process browser new Process browser EnableRaisingEvents true browser Star
  • XamlReader.Load 在后台线程中。是否可以?

    WPF 应用程序具有从单独的文件加载用户控件的操作 使用XamlReader Load method StreamReader mysr new StreamReader pathToFile DependencyObject rootOb
  • 获取 WPF 控件的所有附加事件处理程序

    我正在开发一个应用程序 在其中动态分配按钮的事件 现在的问题是 我希望获取按钮单击事件的所有事件 因为我希望删除以前的处理程序 我尝试将事件处理程序设置为 null 如下所示 Button Click null 但是我收到了一个无法分配 n
  • ASP.NET:获取自 1970 年 1 月 1 日以来的毫秒数

    我有一个 ASP NET VB NET 日期 我试图获取自 1970 年 1 月 1 日以来的毫秒数 我尝试在 MSDN 中寻找方法 但找不到任何东西 有谁知道如何做到这一点 从 NET 4 6 开始 该方法ToUnixTimeMillis
  • 关于在 Windows 上使用 WiFi Direct Api?

    我目前正在开发一个应用程序 我需要在其中创建链接 阅读 无线网络连接 在桌面应用程序 在 Windows 10 上 和平板电脑 Android 但无关紧要 之间 工作流程 按钮 gt 如果需要提升权限 gt 创建类似托管网络的 WiFi 网
  • 在一个字节中存储 4 个不同的值

    我有一个任务要做 但我不知道从哪里开始 我不期待也绝对不想要代码中的答案 我想要一些关于该怎么做的指导 因为我感到有点失落 将变量打包和解包到一个字节中 您需要在一个字节中存储 4 个不同的值 这些值为 NAME RANGE BITS en
  • Visual Studio 中的测试单独成功,但一组失败

    当我在 Visual Studio 中单独运行测试时 它们都顺利通过 然而 当我同时运行所有这些时 有些通过 有些失败 我尝试在每个测试方法之间暂停 1 秒 但没有成功 有任何想法吗 在此先感谢您的帮助 你们可能有一些共享数据 检查正在使用
  • 上下文敏感与歧义

    我对上下文敏感性和歧义如何相互影响感到困惑 我认为正确的是 歧义 歧义语法会导致使用左推导或右推导构建多个解析树 所有可能的语法都是二义性的语言是二义性语言 例如 C 是一种不明确的语言 因为 x y 总是可以表示两个不同的事物 如下所述
  • 使用 Moq 使用内部构造函数模拟类型

    我正在尝试模拟 Microsoft Sync Framework 中的一个类 它只有一个内部构造函数 当我尝试以下操作时 var fullEnumerationContextMock new Mock
  • 将 log4net 与 Autofac 结合使用

    我正在尝试将 log4net 与 Autofac 一起使用 我粘贴了这段代码http autofac readthedocs org en latest examples log4net html http autofac readthed
  • 如何编写一个同时需要请求和响应Dtos的ServiceStack插件

    我需要提供本地化数据服务 所有本地化的响应 Dto 都共享相同的属性 IE 我定义了一个接口 ILocalizedDto 来标记那些 Dto 在请求端 有一个ILocalizedRequest对于需要本地化的请求 Using IPlugin
  • std::async 与重载函数

    可能的重复 std bind 重载解析 https stackoverflow com questions 4159487 stdbind overload resolution 考虑以下 C 示例 class A public int f
  • 有人可以提供一个使用 Amazon Web Services 的 itemsearch 的 C# 示例吗

    我正在尝试使用 Amazon Web Services 查询艺术家和标题信息并接收回专辑封面 使用 C 我找不到任何与此接近的示例 所有在线示例都已过时 并且不适用于 AWS 的较新版本 有一个开源项目CodePlex http www c
  • 如何在按钮单击时模拟按键 - Unity

    我对 Unity 中的脚本编写非常陌生 我正在尝试创建一个按钮 一旦单击它就需要模拟按下 F 键 要拾取一个项目 这是我当前的代码 在编写此代码之前我浏览了所有统一论坛 但找不到任何有效的东西 Code using System Colle
  • 检查Windows控制台中是否按下了键[重复]

    这个问题在这里已经有答案了 可能的重复 C 控制台键盘事件 https stackoverflow com questions 2067893 c console keyboard events 我希望 Windows 控制台程序在按下某个
  • 当另一个线程可能设置共享布尔标志(最多一次)时,是否可以读取共享布尔标志而不锁定它?

    我希望我的线程能够更优雅地关闭 因此我尝试实现一个简单的信号机制 我不认为我想要一个完全事件驱动的线程 所以我有一个工作人员有一种方法可以使用关键部分优雅地停止它Monitor 相当于C lock我相信 绘图线程 h class Drawi
  • 如何使用 Word Automation 获取页面范围

    如何使用办公自动化找到 Microsoft Word 中第 n 页的范围 似乎没有 getPageRange n 函数 并且不清楚它们是如何划分的 这就是您从 VBA 执行此操作的方法 转换为 Matlab COM 调用应该相当简单 Pub

随机推荐

  • Java 中的空指针异常和范围

    我正在尝试在 Java 环境中学习面向对象编程 我正在编写一个相当简单的代码 但收到此错误 Exception in thread main java lang NullPointerException at Advisor score R
  • 通过 GRAPHQL 订阅传递数据仅在其中一个参数上给出 null

    我有以下 GRAPHQL 订阅 Schema graphql type Subscription booking SubscriptionData type SubscriptionData booking Booking action S
  • NSStoryboardSegue 示例代码(Yosemite Storyboard)

    OS X Yosemite 推出NSStoryboardSegue 故事板转场指定故事板中两个场景之间的过渡或包含关系 Update 如果我尝试在带有 Yosemite 的故事板中使用 NSStoryboardSegue 子类 它会因 SI
  • PrintWriter 未写入文件(Java)

    我正在编写一种提款机程序 它将数据输出到文件中 是 的 我知道它不是英文的 但这不是重点 但我遇到了错误 当我尝试使用时PrintWriter它不起作用 我不知道为什么 public void writeFooter List
  • URL 中的 Unicode 字符(都可以 - 除了 IE)

    我有一些简单的 HTML 其中有另一个 HTML 文件的链接 但该文件名包含 unicode 字符 根据我对链接进行编码的方式 Windows 上的 IE 将不会打开它 但相同的链接可以在所有其他浏览器 Windows 和 Mac 上运行
  • 当顶层没有焦点时更新idletasks

    I ve a ttk progressbar在我的顶层 唯一的 我通过以下函数更新 proc progress x global prog set prog expr fmod prog x 100 update idletasks pro
  • R的download.file的“内部方法”是什么?

    我正在尝试下载以下数据集download file 仅当method wget Doesn t work download file http uofi box com shared static bba3968d7c3397c024ec
  • 在 Eclipse 中从 Java 访问 Scala 实例变量

    我怀疑我一定错过了一些非常基本的东西 但我似乎无法从 Java 代码访问 Scala 字段 package test class TestScala myNumber Int val myNum Int myNumber package t
  • 为什么这个简单的代码适用于 `exit` 而不适用于 `_exit`?

    请看一下这个示例代码 它使用一个非常完善的编程模式来重定向stdout到管道 include
  • CakePHP 2.3 - 单元测试 用户登录

    我想我必须在这里寻求一些帮助来解决我的问题 我花了整个晚上的时间来处理这个问题 我有一个登录方法UsersController像这样 public function login if this gt request gt is post i
  • 为什么 Glass.Mapper 没有填充我的 Enumerable?

    我完全困惑为什么这不起作用 我有一个类 其中包含其他类的列表作为其属性 public class Widget public virtual IEnumerable
  • 如何在Makefile中导出环境变量

    我正在尝试导出环境变量 以便可以在父 shell 中访问它们 生成文件 export vars extract vars from text file remove comments export output export cat env
  • 如何让用户退出使用 Google OAuth2 登录的应用程序?

    我已经在我的网络服务器应用程序中实现了 Google OAuth2 登录流程 使用 python flask 我的应用程序将用户重定向到 Google 他们使用 Google 凭据登录并被引导回我的应用程序 我无法决定如何为此应用程序实现注
  • 提取完成前显示的骨干木偶

    我知道我在做一些愚蠢的事情 但我的骨干木偶应用程序给了我没有意义的模板错误 它似乎在获取事件发生之前渲染单个项目 templateSettings interpolate g MyApp new Backbone Marionette Ap
  • mod_rewrite $_GET

    我有一个FrontController期待两个 GET params controller action 对该站点的典型调用如下所示 我想要做的是允许用户通过以下网址访问该网站 http foo bar start register 我尝试
  • jQuery AJAX post 与文件上传

    我有一个功能页面将数据发布到页面 WebMethod WebMethod public static string sayHello string pTest string pText return pTest pText 我用这个 jQu
  • 用于批量重命名文件夹中文件的 Shell/Bash 快捷方式

    Shell Bash 中是否有一个快捷方式可以根据正则表达式或其他一些条件重命名文件夹中的所有文件 我在这里寻找的是我的文件夹文档 假设有 100 个具有以下命名约定的文本文件
  • “降级”MS dll 在升级时消失 - Windows Installer

    我们开发了一个通过 Windows Installer 分发的应用程序 该应用程序是使用 WiX 创建的 我们的客户可以从任何旧版本升级到最新版本 然而 我们的最新版本删除了 2 个 dll 这只能通过重新安装来纠正 有关 NuGet 包的
  • 如何配置 Socket.io 在 https 上的同一端口上运行?

    和往常一样 我绞尽脑汁地寻找问题的答案 所以我再次向你们所有的天才伸出援手 我已经使用 socket io 使用express 设置了一个 Node js 服务器 它使用端口 8443 运行良好 它运行良好 由于我的许多客户似乎不允许端口
  • 如何在 TPL 数据流中安排流控制?

    我正在尝试控制 TPL 数据流中的数据流 我有一个非常快的生产者和一个非常慢的消费者 我的真实代码更复杂 但尽管如此 这是一个非常好的模型 它重现了问题 当我运行它时 代码开始消耗内存 就像它已经过时一样 并且生产者上的输出队列会尽快填满