如何恢复打乱的数据流管道的顺序?

2024-02-23

我有一个数据流管道,由多个处理异构文档(XLS、PDF 等)的块组成。每种类型的文档均由专门的人员处理TransformBlock。在管道的末端我有一个ActionBlock它接收所有处理后的文档,并将它们一一上传到网络服务器。我的问题是,我找不到一种方法来满足按照最初在管道中输入的顺序上传文档的要求。例如我不能使用EnsureOrdered https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblockoptions.ensureordered这个选项对我有利,因为此选项配置单个块的行为,而不是并行工作的多个块的行为。我的要求是:

  1. 按特定顺序将文档插入管道中。
  2. 根据每个文档的类型,对每个文档进行不同的处理。
  3. 特定类型的文件应按顺序处理。
  4. 不同类型的文档可以(并且应该)并行处理。
  5. 所有文件应在处理后尽快上传。
  6. 文档必须按顺序上传,并按照它们进入管道的顺序相同。

例如,要求文档#8必须在文档#7之后上传,即使它是在文档#7之前处理的。

第五个需求意味着我不能等待所有文档处理完毕,然后按索引排序,最后上传。上传必须与处理同时进行。

这是我正在尝试做的一个最小的例子。为简单起见,我不会向块提供以下实例IDocument接口,但带有简单的整数。每个整数的值代表它进入管道的顺序以及必须上传的顺序:

var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});

var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});

xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => uploader.Complete());

await uploader.Completion;

输出是:

Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10

(在小提琴上尝试一下 https://dotnetfiddle.net/JGiTcI)

理想的顺序是#1、#2、#3、#4、#5、#6、#7、#8、#9、#10。

在将已处理文档发送到之前,如何恢复它们的顺序uploader block?

澄清:通过替换多个特定的管道来彻底改变管道的模式TransformBlocks 具有单个泛型TransformBlock,不是一个选项。理想的情况是拦截处理器和上传器之间的单个块,这将恢复文档的顺序。


uploader应该将文档添加到已完成文档的某种排序列表中,并检查添加的文档是否是下一个应该上传的文档。如果应该,则从排序列表中删除并上传所有文档,直到缺少一个。

还有一个同步问题。对该排序列表的访问必须跨线程同步。但您希望所有线程都执行某些操作,而不是等待其他线程完成其工作。所以,uploader应该像这样使用列表:

  • 在同步锁定内将新文档添加到列表中,然后释放锁定
  • In a loop
    • 再次输入相同的同步锁,
    • if upload_in_progress设置标志然后不执行任何操作并返回。
    • check if document on top of the list should be uploaded,
      • 如果没有则重置upload_in_progress标记,然后返回。
      • 否则从列表中删除该文档,
      • set upload_in_progress flag,
      • 释放锁,
      • 上传文档。

我希望我的想象是对的。正如您所看到的,要使其既安全又高效,这很棘手。在大多数情况下,肯定有一种方法可以只用一个锁来完成此操作,但它不会提高太多效率。这upload_in_progress标志在任务之间共享,就像列表本身一样。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何恢复打乱的数据流管道的顺序? 的相关文章

  • setContextProperty 和对象的 setProperty 之间的区别

    我现在真的很困惑 有什么区别 QQmlApplicationEngine engine engine rootContext setContextProperty myObject userData and object gt setPro
  • 如何使用 MVVM 更新 WPF 中编辑的数据? [复制]

    这个问题在这里已经有答案了 我正在为聊天应用程序构建 UI 设计 在尝试更新所选联系人的消息时遇到问题 选择现有联系人 选择编辑选项 然后编辑其属性 例如用户名和图像 后 唯一进行的更改是联系人的用户名和图像 我仍然想更改 MessageM
  • 使用 OpenGL 着色器进行数学计算 (C++)

    我有一个矩阵 例如 100x100 尺寸 我需要对每个元素进行计算 matrix i j tt 8 5例如 我有一个巨大的矩阵 我想使用 OpenGL 着色器来实现该算法 我想使用着色器 例如 uniform float val unifo
  • Qt 计算和比较密码哈希

    目前正在 Qt 中为测验程序构建面向 Web 的身份验证服务 据我了解 在数据库中存储用户密码时 必须对其进行隐藏 以防落入坏人之手 流行的方法似乎是添加的过程Salt https en wikipedia org wiki Salt cr
  • 是否有像 gccxml 这样的用于生成包装器的 C 标头解析器工具?

    我需要为一种新的编程语言编写一些 C 标头包装器 并且想要类似 gccxml 的东西 但不完全依赖 gcc 以及它在 Windows 系统上带来的问题 只需要读C而不是C 只要有完整的文档记录 任何格式的输出都可以 Linux Solari
  • 存储过程上的 OdbcCommand - 输出参数上出现“未提供参数”错误

    我正在尝试执行存储过程 通过 ODBC 驱动程序针对 SQL Server 2005 但收到以下错误 过程或函数 GetNodeID 需要参数 ID 但未提供该参数 ID 是我的过程的 OUTPUT 参数 在存储过程中指定了一个输入 mac
  • 我可以仅在少数情况下关闭模拟吗

    我有一个始终使用模拟的应用程序 但是 当用户以管理员身份登录时 一些操作需要他们写入服务器本身 现在 如果这些用户在实际服务器上没有权限 有些用户没有 则不会让他们写入 我想做的是关闭几个命令的模拟 有没有办法做这样的事情 using Ho
  • 操纵 setter 以避免 null

    通常我们有 public string code get set 如果最终有人将代码设置为 null 我需要避免空引用异常 我尝试这个想法 有什么帮助吗 public string code get set if code null cod
  • 无法加载文件或程序集“EntityFramework,版本=6.0.0.0”

    我究竟做错了什么 我该如何解决这个问题 我有一个包含多个项目的解决方案 它是一个 MVC NET 4 5 Web 应用程序 在调试模式下启动后调用其中一个项目时 出现此错误 导致此错误的项目具有以下参考 两个都是版本6 0 0 0 应用程序
  • 格式化货币

    在下面的示例中 逗号是小数点分隔符 我有这个 125456 89 我想要这个 125 456 89 其他示例 23456789 89 gt 23 456 789 89 Thanks 看看这个例子 double value 12345 678
  • 更改 IdentityServer4 实体框架表名称

    我正在尝试更改由 IdentityServer4 的 PersistedGrantDb 和 ConfigurationDb 创建的默认表名称 并让实体框架生成正确的 SQL 例如 而不是使用实体IdentityServer4 EntityF
  • C#中Enum中定义的value__是什么

    What value 可能在这里 value MSN ICQ YahooChat GoogleTalk 我运行的代码很简单 namespace EnumReflection enum Messengers MSN ICQ YahooChat
  • 在VisualStudio DTE中,如何获取ActiveDocument的内容?

    我正在 VisualStudio 中编写脚本 并尝试获取当前 ActiveDocument 的内容 这是我当前的解决方案 var visualStudio new API VisualStudio 2010 var vsDTE visual
  • 从事务范围调用 WCF 服务方法

    我有这样的代码 using TransactionScope scope TransactionScopeFactory CreateTransactionScope some methodes calls for which scope
  • ASP.NET Core Razor Page 多路径路由

    我正在使用 ASP NET Core 2 0 Razor Pages 不是 MVC 构建系统 但在为页面添加多个路由时遇到问题 例如 所有页面都应该能够通过 abc com language 访问segment shop mypage 或
  • 如何获取 QIcon 的文件/资源​​路径

    假设我做了这样的事情 QIcon myIcon resources icon ico 我稍后如何确定该图标的路径 例如 QString path myIcon getPath 问题是 没有getPath 会员 我找不到类似的东西 但肯定有办
  • 在 C++ 和 Windows 中使用 XmlRpc

    我需要在 Windows 平台上使用 C 中的 XmlRpc 尽管我的朋友向我保证 XmlRpc 是一种 广泛可用的标准技术 但可用的库并不多 事实上 我只找到一个库可以在 Windows 上执行此操作 另外一个库声称 您必须做很多工作才能
  • C# 粘贴到文本框时检查剪贴板中的字符

    有没有一些方法可以在粘贴到文本框 C 之前仅检查剪贴板中的字符 Ctrl V 和右键单击 gt 粘贴 但不使用 MaskedTextbox 在文本框文本更改中添加规则以仅接受数字 例如 private string value privat
  • c# 模拟 IFormFile CopyToAsync() 方法

    我正在对一个异步函数进行单元测试 该函数将 IFormFile 列表转换为我自己的任意数据库文件类列表 将文件数据转换为字节数组的方法是 internal async Task
  • FindAsync 很慢,但是延迟加载很快

    在我的代码中 我曾经使用加载相关实体await FindAsync 希望我能更好地遵守 C 异步指南 var activeTemplate await exec DbContext FormTemplates FindAsync exec

随机推荐