如何正确并行化严重依赖 I/O 的作业

2024-06-28

我正在构建一个必须处理大量数据的控制台应用程序。

基本上,应用程序从数据库获取引用。对于每个引用,解析文件的内容并进行一些更改。这些文件是 HTML 文件,该过程正在使用 RegEx 替换进行繁重的工作(查找引用并将其转换为链接)。然后结果存储在文件系统上并发送到外部系统。

如果我按顺序恢复该过程:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
    SendToWs(ref, convertedHtml);
}

我的程序运行正常,但速度很慢。这就是为什么我想并行化这个过程。

到目前为止,我做了一个简单的并行化,添加了 AsParallel :

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); 
    SendToWs(ref, convertedHtml);
});

这个简单的改变减少了过程的持续时间(减少了 25% 的时间)。然而,我对并行化的理解是,如果对依赖 I/O 的资源进行并行化,则不会有太多好处(或更糟糕的是,好处更少),因为 I/O 不会神奇地加倍。

这就是为什么我认为我应该改变我的方法,而不是并行化整个过程,而是创建依赖的链式排队任务。

即,我应该创建一个类似的流程:

队列读取文件。完成后,队列 ParseHtml。完成后,Queue 既发送到 WS,又写入本地。完成后,记录结果。

然而,我不知道如何实现这样的想法。

我觉得它会以一组消费者/生产者队列结束,但我没有找到正确的样本。

而且,我不确定是否会有好处。

谢谢你的建议

[Edit]事实上,我是使用 c# 4.5 的完美人选……只要它是 rtm 就好了:)

[Edit 2]另一件让我认为并行化不正确的事情是,在资源监视器中,我看到 CPU、网络 I/O 和磁盘 I/O 的图表不稳定。当一个为高时,其他为低到中等


您没有在任何代码中利用任何异步 I/O API。你所做的一切都受 CPU 限制,所有 I/O 操作都会阻塞地浪费 CPU 资源。AsParallel适用于计算密集型任务,如果您想利用异步 I/O,则需要在 BeginXXX/EndXXX您正在使用的基于 I/O 的类上的方法,并在可用时利用这些方法。

初学者请阅读这篇文章:TPL TaskFactory.FromAsync 与具有阻塞方法的任务 https://stackoverflow.com/questions/5018897/tpl-taskfactory-fromasync-vs-tasks-with-blocking-methods/5073816#5073816

接下来,你不想使用AsParallel无论如何在这种情况下。AsParallel启用流式传输,这将导致立即为每个项目安排一个新任务,但您在这里不需要/不希望这样做。通过使用分区工作,您会得到更好的服务Parallel::ForEach.

让我们看看如何利用这些知识在您的特定情况下实现最大并发性:

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{ 
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead
                                             sourceFileStream.EndRead
                                             fileDataBuffer,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int soureFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing
        Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to read the data from the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  fileDataBuffer.Length,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

现在,这里有一些注意事项:

  1. 这是示例代码,因此我使用 1MB 缓冲区来读/写文件。这对于 HTML 文件来说是过多的并且浪费系统资源。您可以降低它以满足您的最大需求,也可以将链式读/写实现到 StringBuilder 中,这是我留给您的练习,因为我要编写大约 500 行以上的代码来执行异步链式读/写。 :P
  2. 您会注意到,在读/写任务的延续中,我有TaskContinuationOptions.AttachedToParent。这非常重要,因为它将阻止工作线程Parallel::ForEach从完成开始工作,直到所有底层异步调用完成。如果不是这样,您将同时启动所有 5000 个项目的工作,这将导致数千个计划任务污染 TPL 子系统,并且根本无法正常扩展。
  3. 我在这里调用 SendToWs 并发将文件写入文件共享。我不知道 SendToWs 实现的基础是什么,但它听起来也像是进行异步的一个很好的候选者。现在假设它是纯粹的计算工作,因此在执行时会消耗 CPU 线程。我将其作为练习留给您,以了解如何最好地利用我向您展示的内容来提高吞吐量。
  4. 这都是自由输入的形式,我的大脑是这里唯一的编译器,SO 的语法高亮是我用来确保语法良好的全部。所以,请原谅任何语法错误,如果我搞砸了任何事情,以至于你无法理解它,请告诉我,我会跟进。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何正确并行化严重依赖 I/O 的作业 的相关文章

随机推荐

  • 核心数据:是否可以在分组中使用自定义函数

    在 Objective C 中制作 NSFetchRequest 时 是否可以按组使用自定义函数 strftime sql语句在sqlite中完全有效 select date count from note group by strftim
  • 捕获 javax.net.debug 到文件

    我需要将创建的 javax net debug all 输出保存到文件中 我正在使用 log4j 并尝试创建一个日志代理 如下面的代码示例所示 但是 它没有获取信息 我不确定 javax net debug 被打印到哪里 我尝试以这种方式捕
  • docker-compose pull 结果为 x509:证书由未知机构签名

    尝试从 dockerhub 提取 elastcisearch 图像时遇到以下错误 docker compose pull Pulling elasticsearch elasticsearch 2 2 0 Pulling repositor
  • 如何正确解决“弱接收器在 ARC 模式下可能会意外地为空”的问题

    我在 xcode 中打开了一个新标志 并收到警告 弱接收器在 ARC 模式下可能不可预测地为空 这让我很困惑 因为它当然可能为零 我一周前问过这个问题 但没有收到任何答复 但格雷格 帕克在邮件列表上回答了它 所以我重新发布答案 我们添加此警
  • 我删除了 Xcode 中的本地化,然后无法添加任何本地化

    我错误地从项目信息中删除了项目的本地化信息 故事板和相关语言都被删除 我从以前的文件中取回了故事板 但是当我想添加新的本地化时 会弹出一个窗口并要求我 选择文件和参考语言来创建英语本地化 并且根本没有资源文件 如何添加回本地化内容 添加回本
  • 在 Excel VBA 中,如何保存/恢复用户定义的过滤器?

    如何使用 VBA 保存并重新应用当前过滤器 在 Excel 2007 VBA 中 我试图 保存用户在当前工作表上拥有的任何过滤器 清除过滤器 做东西 重新应用保存的过滤器 看一下捕获自动筛选状态 http www mrexcel com f
  • wikidata 获取带有项目标签和值的所有属性

    我的问题是如何从 wikidata 最好是通过 SPARQL 提取在网页上呈现的所有属性及其各自的标签 以Google https www wikidata org wiki Q95维基数据上的条目 对于属性 P414 证券交易所 或 P1
  • 将 VB 转换为 C# - My.Application.Info.DirectoryPath

    以下 VB VB NET VisualBasic 语句的最佳 C csharp 等效项是什么 My Application Info DirectoryPath My Computer Clipboard My Computer Audio
  • 使用图像映射生成器的一个好的替代方案是什么? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我有一张大图像 我想让图像的某些部分可单击 我还想指定可点击区域的形状 方形 圆形 自定义 在不依赖 Javascript 的情况下 如何
  • Android 上原始文件夹和 SD 卡之间的 MediaPlayer 问题

    我正在制作一个用于使用搜索栏播放歌曲的应用程序 如果我从原始文件夹播放 它可以工作 但如果我从 SD 卡播放歌曲 它会显示空指针异常 private MediaPlayer mediaPlayer mediaPlayer MediaPlay
  • 为什么要为 RESTful API 创建单独的应用程序?

    Yii 2 的指南中说 虽然不是必需的 但建议您开发 RESTful API 作为一个单独的应用程序 与您的 Web 前端不同 后端更方便维护 Source RESTful Web 服务 快速入门 http www yiiframework
  • 执行源代码时忽略导入错误

    我有一个应用程序 它读取 python 中的测试脚本并将其通过网络发送以在远程 python 实例上执行 由于控制程序不需要运行这些脚本 我不想将测试脚本使用的所有模块安装在控制器的 python 环境中 然而 控制器确实需要来自测试脚本的
  • C# 中带有复选框的 TreeView

    我在 C 中有一个带有复选框的树视图 我希望当用户检查一个节点时 自动检查以下级别上的所有节点 有谁知道如何做到这一点 而无需在每次用户检查某个节点时在所有树上运行递归功能 Thanks 该函数返回树视图 public TreeView G
  • SQL 用逗号替换点

    我有以下代码 SELECT cast Listenpreis 1 19 as decimal 29 2 as Listenpreis FROM SL M03KNE dbo ARKALK 我得到这个值 5 59 我尝试将点替换为 komma
  • Qt QPushButton 样式表悬停

    我有以下按钮样式表 QPushButton hover background qlineargradient x1 0 y1 0 x2 0 y2 1 stop 0 0 ffd9aa stop 0 5 ffbb6e stop 0 55 fea
  • 在 PHP 中你使用复数还是单数来命名你的数组? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 当我命名数组类型变量时 我经常遇到一个困境 我使用复数还是单数命名我的数组 例如 假设我有一个名称数组 在 PHP 中我会说 names arr
  • Kubernetes 在 AWS-EBS 上创建 PersistentVolumeClaim 失败

    我使用以下命令设置了一个包含四个 EC2 实例的 Kubernetes 集群kubeadm Kubernetes 集群工作正常 但当我尝试创建PersistentVolumeClaim 首先我创建了一个StorageClass使用以下 YA
  • WPF 数据触发器和故事板

    当视图模型 演示模型繁忙时 我试图触发进度动画 我有一个 IsBusy 属性 并将 ViewModel 设置为 UserControl 的 DataContext 当 IsBusy 属性为 true 时 触发 progressAnimati
  • 暂停 Web Audio API 声音播放

    如何为我的音频创建暂停功能 我的下面的脚本中已经有一个播放函数 http pastebin com uRUQsgbh http pastebin com uRUQsgbh function loadSound url var request
  • 如何正确并行化严重依赖 I/O 的作业

    我正在构建一个必须处理大量数据的控制台应用程序 基本上 应用程序从数据库获取引用 对于每个引用 解析文件的内容并进行一些更改 这些文件是 HTML 文件 该过程正在使用 RegEx 替换进行繁重的工作 查找引用并将其转换为链接 然后结果存储