下游块中的 TPL 数据流和异常处理

2024-01-10

我有以下伪代码:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
    {
        await Task.Delay(500);
        Trace.TraceInformation(
            $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
        // handling some logic but it throws
        if (item >= 5) throw new Exception("Something bad happened");

    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });

var targets = new List<ITargetBlock<int>> {queue};

var broadcaster = new ActionBlock<int>(
    async item =>
    {
        var processingTasks = targets.Select(async t =>
        {
            try
            {
                // This is condition is always false
                // t (bufferblock) has no exceptions. Exception is raised in downstream action block where it sends to
                if (!await t.SendAsync(item))
                    await t.Completion;
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Handled exception : " + e.Message);
            }
        });

        try
        {
            // Neither here the exception is rethrowed
            await Task.WhenAll(processingTasks);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
        }
    });

for (var i = 1; i <= 10; i++)
{
    broadcaster.Post(i);
}

管道是这样配置的ActionBlock<int> => BufferBlock<int> => ActionBlock<int>.

最后ActionBlock<int>抛出异常,但它不会重新抛出到我想要处理它的源代码块。

如何重写此代码以便正确处理异常?


您可以找到该主题的官方指南here https://blogs.msdn.microsoft.com/pfxteam/2011/11/09/exception-handling-in-tpl-dataflow-networks/。总体解决方案是订阅所有区块Completion https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.idataflowblock.completion.aspx任务包括检查其状态,并在需要时替换有故障的块(也应该存储块的所有引用)。请参阅整篇文章以获取更多信息。

网络的行为Faulted blocks

  1. 保留消息
    为了避免消息损坏,故障块应该清除其消息队列并移动到Faulted状态 尽快地。有一个场景不符合 这条规则:源块保存目标保留的消息。 如果遇到内部异常的块有一条消息: 被目标保留,保留消息必须不 掉落,并且该块不应移动到Faulted状态 直到消息被释放或消费。

  2. 悬挂网络
    ...

    • 保留对网络中所有块的引用并使用Task.WaitAll https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.waitall.aspx or Task.WhenAll https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenall.aspx等待他们(同步或 异步)。如果一个块出现故障,它的Completion https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.idataflowblock.completion.aspx任务将 完成于Faulted state.
    • Use DataflowLinkOptions https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.dataflowlinkoptions.aspx with PropagateCompletion == true构建线性网络时。这将从中传播块完成 源到目标。在这种情况下,等待网络就足够了 叶块。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

下游块中的 TPL 数据流和异常处理 的相关文章

  • 起订量要求?违背了目的?

    是否需要虚拟化您想要模拟的所有属性访问器就违背了模拟的目的 我的意思是 如果我必须修改我的对象并虚拟化我想要模拟的每个访问器 我难道不能继承我的类并自己模拟它吗 你的问题非常有效 但如果你仔细想想 没有其他方法可以模拟课程 如果你采用一个接
  • SharpZipLib - 将文件夹/目录添加到 zip 存档

    通过示例 我很好地掌握了如何提取 zip 文件 几乎在每个示例中 识别 ZipEntry 是否为目录的方法如下 string directoryName Path GetDirectoryName theEntry Name string
  • Accept() 是线程安全的吗?

    我目前正在用 C 语言为我正在做的课程编写一个简单的网络服务器 我们的一项要求是实现一个线程池来使用 pthread 处理连接 我知道我将如何粗略地执行此操作 在主线程中调用accept并将文件描述符传递给freee线程 但是我的朋友建议了
  • 如何使用 MVVM 更新 WPF 中编辑的数据? [复制]

    这个问题在这里已经有答案了 我正在为聊天应用程序构建 UI 设计 在尝试更新所选联系人的消息时遇到问题 选择现有联系人 选择编辑选项 然后编辑其属性 例如用户名和图像 后 唯一进行的更改是联系人的用户名和图像 我仍然想更改 MessageM
  • 使用 OpenGL 着色器进行数学计算 (C++)

    我有一个矩阵 例如 100x100 尺寸 我需要对每个元素进行计算 matrix i j tt 8 5例如 我有一个巨大的矩阵 我想使用 OpenGL 着色器来实现该算法 我想使用着色器 例如 uniform float val unifo
  • 为什么这个函数指针赋值在直接赋值时有效,但在使用条件运算符时无效?

    本示例未使用 include 在 MacOS10 14 Eclipse IDE 上编译 使用 g 选项 O0 g3 Wall c fmessage length 0 假设这个变量声明 int fun int 这无法通过 std touppe
  • 访问“if”语句之外的变量

    我怎样才能使insuranceCost以外可用if陈述 if this comboBox5 Text Third Party Fire and Theft double insuranceCost 1 在 if 语句之外定义它 double
  • 从 future 中检索值时的 SIGABRT

    我在使用 C 11 future 时遇到问题 当我打电话时wait or get 关于返回的未来std async 程序接收从mutex标头 可能是什么问题呢 如何修复它 我在 Linux 上使用 g 4 6 将以下代码粘贴到 ideone
  • 如何在编译C代码时禁用警告?

    我正在使用 32 位 Fedora 14 系统 我正在使用编译我的源代码gcc 有谁知道如何在编译c代码时禁用警告 EDIT 是的 我知道 最好的办法是修复这些警告以避免任何未定义 未知的行为 但目前在这里 我第一次编写了巨大的代码 并且在
  • 导出类时编译器错误

    我正在使用 Visual Studio 2013 但遇到了一个奇怪的问题 当我导出一个类时 它会抛出 尝试引用已删除的函数 错误 但是 当该类未导出时 它的行为会正确 让我举个例子 class Foo note the export cla
  • 如何使用 wpf webbrowser 将数据发布到 Web 服务器

    我想从数据库获取数据并使用它来让用户登录到网站 我有一个包含 Web 浏览器控件的 wpf 页面 我有这样的代码 用于将用户登录到用 php 编写的网站
  • 更改 IdentityServer4 实体框架表名称

    我正在尝试更改由 IdentityServer4 的 PersistedGrantDb 和 ConfigurationDb 创建的默认表名称 并让实体框架生成正确的 SQL 例如 而不是使用实体IdentityServer4 EntityF
  • 以编程方式更新 ClickOnce 应用程序的部署清单会导致缺少 4.0 中所需的 <兼容框架> 元素

    我正在致力于自动化 NET 4 0 ClickOnce WPF 应用程序的安装程序 该应用程序需要在应用程序配置文件 我经历了寻找必须遵循的具体步骤的棘手过程Mage exe http msdn microsoft com en us li
  • 系统错误 124 - SHFileOperation 的 ERROR_INVALID_LEVEL

    我在使用时遇到问题SHFileOperation SHFileOperation SHFILEOPSTRUCT https stackoverflow com questions 9191415 shfileoperation shfile
  • 设计 Javascript 前端 <-> C++ 后端通信

    在我最近的将来 我将不得不制作一个具有 C 后端和 Web 前端的系统 要求 目前 我对此了解不多 我认为前端将触发数据传输 而不是后端 所以不需要类似 Comet 的东西 由于在该领域的经验可能很少 我非常感谢您对我所做的设计决策的评论
  • 使用(linq to sql)更新错误

    我有两个表 通过外键 CarrierID 绑定 Carrier CarrierID CarrierName CarrierID 1 CarrierName DHL CarrierID 2 CarrierName Fedex Vendor V
  • 如何使用收益返回和递归获得字母的每个组合?

    我有几个像这样的字符串列表 可能有几十个列表 1 A B C 2 1 2 3 3 D E F 这三个仅作为示例 用户可以从几十个具有不同数量元素的类似列表中进行选择 再举个例子 这对于用户来说也是一个完全有效的选择 25 empty 4 1
  • 相当于 C# 中 Java 的“ByteBuffer.putType()”

    我正在尝试通过从 Java 移植代码来格式化 C 中的字节数组 在 Java 中 使用方法 buf putInt value buf putShort buf putDouble 等等 但我不知道如何将其移植到 C 我尝试过 MemoryS
  • 新的 .NET 6 控制台模板中的 C# 函数重载不起作用

    我在尝试重载该函数时遇到错误Print object in the 新的 NET 6 C 控制台应用程序模板 https learn microsoft com en us dotnet core tutorials top level t
  • FindAsync 很慢,但是延迟加载很快

    在我的代码中 我曾经使用加载相关实体await FindAsync 希望我能更好地遵守 C 异步指南 var activeTemplate await exec DbContext FormTemplates FindAsync exec

随机推荐

  • 无法在 iOS 上使用自定义 @protocol

    注意 以下是使用启用了自动引用计数 ARC 的 iOS 我认为 ARC 可能与它不起作用有很大关系 因为这是根据我通过谷歌找到的示例设置的 我正在尝试创建一个协议来通知委托用户从 UITableView 选择的文件名 文件列表视图控制器 h
  • 离子搜索栏搜索不适用于 cypress {enter}

    我有一个 Ionic 6 应用程序 我正在使用 cypress 9 3 1 对其进行测试 在我的应用程序中 我使用像这样的离子搜索栏
  • 插入...值(SELECT ... FROM ...)

    我在尝试着INSERT INTO使用另一个表的输入的表 尽管这对于许多数据库引擎来说是完全可行的 但我似乎总是很难记住正确的语法SQL当天的发动机 MySQL http en wikipedia org wiki MySQL Oracle
  • WPF DPI 问题

    我开发了一个应用程序 在我的计算机上看起来很棒 但当我将其安装到具有不同分辨率和 DPI 设置的其他计算机上时 它看起来很糟糕 控件相互重叠 这真是太痛苦了 有人对如何避免这种情况有什么建议吗 Windows 无法知道屏幕的本机 DPI 每
  • Python3 - 无法读取 docx、odt 文件 - UnicodeDecodeError:“utf-8”编解码器无法解码位置 10 中的字节 0xea:无效的连续字节

    我正在尝试将大 docx 文件拆分为小文件 为此 当读取文件时python3 6使用以下代码 with open h docx r as f a f read 它抛出这个错误 Traceback most recent call last
  • Linux 中的沙箱

    我想创建一个 Web 应用程序 允许用户上传一些 C 代码 并查看其执行结果 代码将在服务器上编译 用户不受信任 这显然会带来巨大的安全隐患 所以我需要为应用程序创建某种沙箱 在最基本的层面上 我想将对文件系统的访问限制为某些指定的目录 我
  • Spring JPA 中的 @Entity 是什么?

    具体来说 我指的是javax persistence Entity 根据我将鼠标悬停在上面时显示的文档 在 VS Code 中它指出 指定该类是一个实体 该注解适用于 实体类 对于 Spring JPA 来说 类是实体意味着什么 Entit
  • GetComInterfaceForObject 是否固定对象?

    使用 GetComInterfaceForObject 并将返回的 IntPtr 传递给非托管代码是否会阻止托管对象在内存中移动 或者 clr 是否以某种方式维护该 ptr 请注意 非托管代码将在程序的生命周期内使用它 并且我需要确保托管对
  • 在单个测试类中测试接口的多个实现

    我需要通过班级级别的测试数据但是Theory and InlineData属性只能用在方法上 public class ContainerTests TestFixture private IContainer container publ
  • 在 GPU 上预加载整个数据集以训练 Keras 模型

    我有一个特定的情况 其中网络相对较小 为了收敛和泛化问题 我应该保持较小的批量大小 例如 256 这导致每个时期要处理数百个批量 不幸的是 在这种情况下 批量 加载和损失计算成为瓶颈 如timeline工具告诉我 在 TensorFlow
  • spring可以支持多应用共享成员资格吗?

    spring 框架是否支持共享公共用户群的多个应用程序 例如2 个独立的 Web 应用程序以某种方式连接到单个数据库以获取用户相关信息 用户名 密码 甚至角色 这个想法是这样的 类似于ASP NET 会员资格 http msdn micro
  • 图像隐写术抵御各种攻击的最佳实践是什么? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我对此真的很好奇 因为现在每个频道都可以以某种方式修改或压缩图像 这可以被视为对隐写术的攻击 我们可以将隐写术分为两种基本类型 第一种在图像的空
  • ClassNotFound launchig 地图活动在 Android 库中声明

    当尝试启动从 MapActivity 派生并在 Android 库项目中声明的活动 TestLocationActivity 时 我收到此异常 09 08 09 29 45 357 ERROR AndroidRuntime 7502 jav
  • 是否可以更改Matlab绘图功能中的标记? [复制]

    这个问题在这里已经有答案了 我正在尝试使用 matlab 绘图函数来创建绘图 然而 可用的标记是有限的 例如 plot x y o 将用圆形标记绘制 但是 如果我想要带有箭头符号或字母的标记 这是不可能的 有谁知道有什么方法可以做到这一点
  • Excel VBA 调试:循环不搜索整个范围

    我编写了一个基本宏来搜索范围 在一张纸中 然后根据保存选择值的第三张纸复制所选单元格 到另一张纸 我已经使用了 i x to y 的循环 但看起来宏正在跳过一些行 即 如果第 1 行到第 4 行有 4 个要复制的有效值 则宏仅复制第 2 行
  • 如何拥有连续的 Firebase 云函数来获取连续的数据流?

    我需要使用 Twitter Stream API 将推文数据流式传输到我的 firebase 云函数 如下所示 client stream statuses filter params stream gt stream on data tw
  • 为什么 .NET JIT 编译器决定不内联或优化对没有副作用的空静态方法的调用?

    我认为我观察到 NET JIT 编译器没有内联或优化对没有副作用的空静态方法的调用 考虑到一些定制的在线资源 这有点令人惊讶 我的环境是 x64 Windows 8 1 NET Framework 4 5 上的 Visual Studio
  • 从posenet提取关键点到json文件?

    我正在研究posenet的张量流实现来实时进行姿态估计 如果可能的话还可以在离线模式下进行 我正在研究以下存储库 https github com tensorflow tfjs models tree master posenet htt
  • 如何在 Maven 中调试工件替换

    我有一个父项目包含十几个子项目 其中一个子项目使用org apache httpcomponents httpclient jar 4 3 5 这取决于org apache httpcomponents httpcore jar 4 3 2
  • 下游块中的 TPL 数据流和异常处理

    我有以下伪代码 var queue new BufferBlock