如何以(线程)安全的方式跟踪 TPL 管道中的故障项

2023-12-01

我正在使用 TPL 管道设计和 Stephen Cleary 的管道设计尝试库简而言之,它包装了值/异常并将其沿着管道浮动。因此,即使是在处理方法中抛出异常的项目,最后当我await resultsBlock.Completion; have Status=RunToCompletion。所以我需要其他方法来注册有缺陷的项目。这是小样本:

var downloadBlock = new TransformBlock<int, Try<int>>(construct => Try.Create(() =>
{
    //SomeProcessingMethod();
    return 1;
}));
var processBlock = new TransformBlock<Try<int>, Try<int>>(construct => construct.Map(value =>
{
    //SomeProcessingMethod();
    return 1;
}));
var resultsBlock = new ActionBlock<Try<int>>(construct =>
{
    if (construct.IsException)
    {
        var exception = construct.Exception;
        switch (exception)
        {
            case GoogleApiException gex:
                //_notificationService.NotifyUser("OMG, my dear sir, I think I messed something up:/"
                //Register that this item was faulted, so we know that we need to retry it.
                break;
            default:
                break;
        }
    }
});

一种解决方案是创建一个List<int> FaultedItems;我将在其中插入所有有缺陷的项目Exception处理块然后之后await resultsBlock.Completion;我可以检查列表是否不为空,并为有问题的项目创建新的管道。我的问题是如果我使用List<int>如果我决定使用,我是否有遇到线程安全问题的风险MaxDegreeOfParallelism设置,我最好使用一些ConcurrentCollection?或者也许这种方法在其他方面存在缺陷?


我将重试块实现从答案转换为类似的问题, 与史蒂芬·克利里 (Stephen Cleary) 合作Try类型作为输入和输出。方法CreateRetryTransformBlock返回一个TransformBlock<Try<TInput>, Try<TOutput>>,以及方法CreateRetryActionBlock返回的东西实际上是ActionBlock<Try<TInput>>.

还有另外三个选项可用,MaxAttemptsPerItem, MinimumRetryDelay and MaxRetriesTotal,在标准之上执行选项.

public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The minimum delay duration before retrying an item.</summary>
    public TimeSpan MinimumRetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static TransformBlock<Try<TInput>, Try<TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.MinimumRetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MinimumRetryDelay));

    var internalCTS = CancellationTokenSource
        .CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken);

    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    var exceptionsCount = 0;
    SemaphoreSlim semaphore;
    if (maxDOP == DataflowBlockOptions.Unbounded)
    {
        semaphore = new SemaphoreSlim(Int32.MaxValue);
    }
    else
    {
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);

        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;
    }

    var block = new TransformBlock<Try<TInput>, Try<TOutput>>(async item =>
    {
        // Continue on captured context after every await
        if (item.IsException) return Try<TOutput>.FromException(item.Exception);
        var result1 = await ProcessOnceAsync(item);
        if (item.IsException || result1.IsValue) return result1;
        for (int i = 2; i <= maxAttemptsPerItem; i++)
        {
            await Task.Delay(retryDelay, internalCTS.Token);
            var result = await ProcessOnceAsync(item);
            if (result.IsValue) return result;
        }
        return result1; // Return the first-attempt exception
    }, dataflowBlockOptions);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value

    _ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),
        TaskScheduler.Default);

    return block;

    async Task<Try<TOutput>> ProcessOnceAsync(Try<TInput> item)
    {
        await semaphore.WaitAsync(internalCTS.Token);
        try
        {
            var result = await item.Map(transform);
            if (item.IsValue && result.IsException)
            {
                ObserveNewException(result.Exception);
            }
            return result;
        }
        finally
        {
            semaphore.Release();
        }
    }

    void ObserveNewException(Exception ex)
    {
        if (maxRetriesTotal == -1) return;
        uint newCount = (uint)Interlocked.Increment(ref exceptionsCount);
        if (newCount <= (uint)maxRetriesTotal) return;
        if (newCount == (uint)maxRetriesTotal + 1)
        {
            internalCTS.Cancel(); // The block has failed
            throw new RetryLimitException($"The max retry limit " +
                $"({maxRetriesTotal}) has been reached.", ex);
        }
        throw new OperationCanceledException();
    }
}

public static ITargetBlock<Try<TInput>> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    var nullTarget = DataflowBlock.NullTarget<Try<object>>();
    block.LinkTo(nullTarget);
    return block;
}

使用示例:

var downloadBlock = CreateRetryTransformBlock(async (int construct) =>
{
    int result = await DownloadAsync(construct);
    return result;
}, new RetryExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10,
    MaxAttemptsPerItem = 3,
    MaxRetriesTotal = 100,
    MinimumRetryDelay = TimeSpan.FromSeconds(10)
});

var processBlock = new TransformBlock<Try<int>, Try<int>>(
    construct => construct.Map(async value =>
{
    return await ProcessAsync(value);
}));

downloadBlock.LinkTo(processBlock,
    new DataflowLinkOptions() { PropagateCompletion = true });

为了简单起见,如果某个项目已重试最大次数,则保留的异常是第一个发生的异常。随后的异常将丢失。在大多数情况下,丢失的异常无论如何都会与第一个异常具有相同的类型。

Caution:上述实现没有高效的输入队列。如果你向这个块提供数百万个项目,内存使用量将会爆炸。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何以(线程)安全的方式跟踪 TPL 管道中的故障项 的相关文章

  • 用于在 n LSBits 之后清除 m 位的掩码

    我在一次采访中被问到这个问题 要清除 16 位整数的 n 位之后的 m 位 假设数字是 10010010010100101 清除 LSBit 中 5 位后的三位 之前 1001100111011001 之后 1001100100011001
  • 从 gdb 设置 std::string 变量值?

    是否有可能 当调试器在断点处停止时 修改 std string 变量的值 而不需要采取诸如调整当前缓冲区的内存映像之类的黑客手段 例如类似于 set var mystring hello world 试试这个 经过测试并且对我有用 call
  • 保序最小完美哈希函数

    我想用 C 为字典中的单词实现 OPMPH 函数 我该怎么做 Thanks 你看过这些论文吗 http dx doi org 10 1016 0020 0190 92 90220 P http dx doi org 10 1016 0020
  • libc++ 中短字符串优化的机制是什么?

    这个答案 https stackoverflow com a 10319672 1805388给出了短字符串优化 SSO 的高级概述 但是 我想更详细地了解它在实践中是如何工作的 特别是在 libc 实现中 字符串必须有多短才能符合 SSO
  • 如何在Windows窗体中平滑地重新绘制Panel

    如何将面板重漆成光滑的 我正在使用一个使面板无效的计时器 panel1 Invalidate 每 300 毫秒一次 然后panel1 Paint如果我向该面板添加图像 问题是它看起来像是在跳跃 我需要尽可能快地移动其上的一张图像 这是截屏问
  • 实体框架 6 - 使用我的 getHashCode()

    这篇文章需要了解一定的背景知识 请耐心等待 我们有一个使用 EF 的 n 层 WPF 应用程序 我们通过 dbContext 将数据从数据库加载到 POCO 类中 dbContext 被销毁 然后用户可以编辑数据 我们使用 Julie Le
  • C++ 将 HashMap 对象返回给 Java

    我有一个 JAVA 调用的 JNI 函数 需要构建并返回一个 HashMap 映射的键是 String 相应的值是 boolean 或 Boolean 任何一个都可以 只要它有效 使用我当前的代码 如下 该字符串已成功添加到返回的映射中 并
  • 编译器之间的重载分辨率不同

    我构建了以下我的问题的最小示例 include
  • Unity C# 嵌套 IEnumerator

    在我的代码中 我嵌套了 IEnumerator 方法 如下所示 private IEnumerator PerformRequest string url Doing stuff UnityWebRequest request UnityW
  • 类型或命名空间名称“X”在命名空间“Y”中不存在 - 在 VS 生成的代码中

    这是我遇到过的最奇怪的错误 这个 MVC Web 项目直到今天都运行良好 几周以来还没有任何人对其进行处理 尽管没有任何改变 但现在简单地运行它会导致 命名空间 CMSModels ViewModels 中不存在类型或命名空间名称 Colo
  • NHibernate 在生产中很少会抛出违反主键异常,无法用测试用例重现

    试图弄清楚这个问题 对我来说 这似乎不可能 我的服务器报告在繁忙的服务器上每天发生一次或两次以下错误 PlaylistItem create System Data SqlClient SqlException Violation of P
  • 如何使用 Sitecore Glass Mapper 渲染带有 css 类的链接

    我有以下链接 a class btn btn primary href View details a 如何使用 sitecore glass 渲染链接并使其仍然保留 css 类 使用 sitecore 中的字段渲染器 您过去可以将类作为附加
  • C# 在不使用反射的情况下运行时出现“找不到方法”异常

    我在获得上述异常时遇到问题 我有一个相对简单的结构 分为两个 dll 第一个包含 IEntityService IEntity 和基本实现 第二个包含实际的实现和接口 因此 有一个 IMachine 服务实现了 IEntityService
  • C# 从mp4文件中提取mp3文件

    有没有简单的方法从 mp4 文件中提取 mp3 文件 我已经尝试过更改文件扩展名 但这不允许我编辑 mp3 描述 谢谢你 Use Xabe FFmpeg https xabe net product xabe ffmpeg 它是免费的 非商
  • 结构体指针运算符猜想(理论)

    结构体指针的使用非常频繁 因此有一个特殊的运算符 gt 下面的表达式是等价的 x y x gt y 将此运算符简单地视为如下定义的预处理器宏是否公平 define x gt x 为什么或者为什么不 或者它从一开始就被编码为运算符 这有何不同
  • 隐式构造函数和默认构造函数有什么区别?

    这是非常微不足道的 但是捷克语 我的母语 不区分隐式和默认 所以我对一些捷克语翻译感到困惑 隐式和默认构造函数或构造函数调用之间有什么区别 struct Test Test int n 0 您能用这些术语描述以下语句的作用吗 Test t1
  • 空序列的算术平均值是多少?

    免责声明 不 我没有找到任何明显的答案 这与我的预期相反 在寻找代码示例时 算术平均值 我可以通过谷歌找到的前几个例子似乎是这样定义的 空序列生成的平均值为0 0 eg here https rosettacode org wiki Ave
  • 如何在 MVC 中点击链接的主视图中渲染部分视图?

    我有像下面这样的控制器操作方法将从数据库返回所有详细信息 public ActionResult Index BusDataContext db new BusDataContext List
  • 如何在 Linux 中创建可通过 Screen 应用程序连接的 pty

    我想创建 C C 应用程序 它在 dev xxx 中创建新的 虚拟 设备 并且能够与 屏幕 应用程序连接 例如 循环运行的程序会创建新的 dev ttyABC 然后我将使用 屏幕 dev ttyABC 当我向那里发送一些字符时 应用程序将其
  • 在 Qt 服务器上验证用户身份

    我正在尝试使用 C QtTcpSocket 为个人项目 多人国际象棋游戏 实现身份验证系统 我的朋友建议了一种验证用户的方法 但我想问是否有更简单或更好的方法 来自 Python 背景 做这个项目主要是为了加深对 C 的理解 我将发布我朋友

随机推荐

  • 通用Windows平台和Live SDK

    我有个问题 我最近在我的计算机上安装了 VS 2015 和 Windows 10 拥有通用应用程序真是太棒了 我计划在 UWP 上转换我的一些程序 但我有一个问题 在我的一个程序中 我允许用户将其数据保存在 OneDrive 上 如果 On
  • 如何将 stringVar() 从 tk 转换为 pyqt

    我有这个函数作为我的代码的一部分 我正在尝试从 tk 迁移到 pyqt 但我在 pyqt 方面没有太多经验 我正在尝试生成 Tkinter 字符串变量列表来存储条目 def generate stringvars self temp ent
  • PHP date() 函数没有给出正确的时间

    我试图找出为什么 php date 给我错误的时间 将实际时间设置为 2 小时 这给出了 2011 01 01 03 14 04 而不是 2011 01 01 05 14 04 小时减少 2 我没有更改 date 的时区 当用户访问该网站时
  • 如何从 dict 中获取值列表?

    如何获取 Python 字典中的值列表 在 Java 中 以列表形式获取 Map 的值就像执行以下操作一样简单list map values 我想知道 Python 中是否有一种类似的简单方法可以从字典中获取值列表 dict values返
  • 指向整数数组的指针与指向整数的双指针

    我本以为整数数组是指向整数的指针类型 因此这意味着指向整数数组的指针是指向整数的双指针类型 但我得到的结果却表明事实并非如此 我怀疑整数数组类型不是指向整数的指针类型 这是我的例子 int main int p 3 1 2 3 int pt
  • 迭代时从哈希集中删除元素[重复]

    这个问题在这里已经有答案了 所以 如果我尝试从 Java 中删除元素HashSet迭代时 我得到并发修改异常 从数组中删除元素子集的最佳方法是什么HashSet就像下面的例子一样 Set
  • 在云功能中,我如何从另一个集合加入以获取数据?

    我正在使用云功能向移动设备发送通知 我在 Firestore 中有两个集合clientDetail and clientPersonalDetail 我有clientID两个集合中相同 但日期存储在clientDetail名称存储在 cli
  • Oracle SQL:如何显示空周/没有数据的周?

    如同这个问题 但我的数据集还有一个包含许多 ID 的附加列 每个 ID 都有一个按恒定时间范围回溯的数据集 并且某些周可能会丢失数据 我想填写丢失周的值 例如 我想要这个 ID WEEKEND DAY VALUE A00 2012 01 0
  • Android USBHost 模式 - 为什么我的 IRDA 设备在 ClaimInterface 上失败?

    我的 Xperia Neo Cyanogen Mod 9 连接了一个 Lindy IRDA USB 桥接器 我已经更改了功能以支持主机模式等 代码中一切看起来都很好 我检测到该设备 我可以看到接口和两个端点 一进一出 但是一旦我尝试声明接口
  • 列表框(JList)不会从自定义 ListModel 动态更新

    我正在使用 Seesaw 在 Clojure 中开发 GUI 应用程序 并且在我的自定义 ListModel 更新时无法更新列表框 Java 中的 JList 这是我的一些代码 deftype ActionHistoryListModel
  • python 中多线程应用程序中的分段错误

    我在 python 中有一个多线程应用程序 其中我创建了多个生产者线程 它们从数据库中提取数据 数据以块的形式提取 因此 线程创建具有限制值的sql语句的部分被保留在锁内 为了让线程同时执行查询 query 函数被保留在锁之外 然后结果获取
  • C#/WPF - 我无法从后台工作人员更新 UI

    我有一个代码可以使用以下命令从特定 Twitter 帐户获取推文推特锐利库 创建自定义实例UserControl并发布推文UserControl然后将其添加到StackPanel 但是 我必须收到很多推文 并且在向应用程序添加用户控件时 应
  • 按自定义顺序按键对数组进行排序

    我有以下多维数组 June 2015 gt LOW gt 160 50 MEDIUM gt 0 00 HIGH gt 60 80 July 2015 gt MEDIUM gt 226 00 HIGH gt 263 00 LOW gt 121
  • 如何将日期格式从 YYYY/MM/DD 更改为 DD/MM/YYYY

    我有一列日期 读作character值 是的 它们应该是相同的 str df date date chr 30 08 2017 30 08 2017 30 08 2017 30 08 2017 然后我将这些值转换为Date format s
  • 复制到剪贴板,无需 Flash

    我找到了许多复制到剪贴板的解决方案 但它们要么带有闪存 要么用于网站端 我正在寻找自动复制到剪贴板的方法 无需闪存 对于用户端 它用于用户脚本 当然还有跨浏览器 如果没有 Flash 这在大多数浏览器中都是不可能的 用户的剪贴板是与安全相关
  • htaccess:删除 .php 扩展名

    我有一个名为 Show php 的文件 我想删除这个的 php 扩展名 如果有人请求 Show php 将他重定向到没有 php 扩展名的页面 这是我的 htaccess 但它不会将用户重定向到没有扩展名的页面 RewriteCond RE
  • 将字符串中的所有字母加 1 [关闭]

    Closed 这个问题需要多问focused 目前不接受答案 当我输入 abc 我想得到 bcd 作为输出 所以我想要A to be B and B to be C等等直到Z这将是A 那么我该怎么做呢 我一点也不知道 您可以使用transl
  • 如何修改 Elastislide 使其无限循环

    我一直在寻找一种图像轮播 它可以一次显示多个图像 具有响应能力并且可以无限循环 Elastislide 似乎是最合适的 http tympanus net Development Elastislide index2 html 我能找到的唯
  • 未捕获的类型错误:无法调用未定义的方法“请求”

    在我的 JavaScript 代码中 我不断收到以下错误 Uncaught TypeError Cannot call method request of undefined 我的 JavaScript 如下 任何帮助将不胜感激 myJso
  • 如何以(线程)安全的方式跟踪 TPL 管道中的故障项

    我正在使用 TPL 管道设计和 Stephen Cleary 的管道设计尝试库简而言之 它包装了值 异常并将其沿着管道浮动 因此 即使是在处理方法中抛出异常的项目 最后当我await resultsBlock Completion have