使用 C# 读取数百万个小文件

2024-04-27

我有数百万个每天生成的日志文件,我需要读取所有这些文件并将其放在一起作为单个文件,以便在其他应用程序中对其进行一些处理。

我正在寻找最快的方法来做到这一点。目前我正在使用线程、任务和并行,如下所示:

Parallel.For(0, files.Length, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
    ReadFiles(files[i]);
});

void ReadFiles(string file)
{
    try
    {
        var txt = File.ReadAllText(file);
        filesTxt.Add(tmp);
    }
    catch { }
    GlobalCls.ThreadNo--;
}

or

foreach (var file in files)
{
    //Int64 index = i;
    //var file = files[index];
    while (Process.GetCurrentProcess().Threads.Count > 100)
    { 
        Thread.Sleep(100);
        Application.DoEvents();
    }
    new Thread(() => ReadFiles(file)).Start();
    GlobalCls.ThreadNo++;
    // Task.Run(() => ReadFiles(file));      
}

问题是,读取几千个文件后,读取速度越来越慢!

知道为什么吗?读取数百万个小文件的最快方法是什么?谢谢。


看起来您正在将所有文件的内容加载到内存中,然后再将它们写回单个文件。这可以解释为什么这个过程随着时间的推移变得更慢。

优化该过程的一种方法是将读取部分与写入部分分开,并并行进行。这称为生产者-消费者模式。它可以通过以下方式实现Parallel类,或线程,或任务,但我将演示基于强大的实现TPL 数据流库 https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library,特别适合这样的工作。

private static async Task MergeFiles(IEnumerable<string> sourceFilePaths,
    string targetFilePath, CancellationToken cancellationToken = default,
    IProgress<int> progress = null)
{
    var readerBlock = new TransformBlock<string, string>(async filePath =>
    {
        return File.ReadAllText(filePath); // Read the small file
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2, // Reading is parallelizable
        BoundedCapacity = 100, // No more than 100 file-paths buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    StreamWriter streamWriter = null;

    int filesProcessed = 0;
    var writerBlock = new ActionBlock<string>(text =>
    {
        streamWriter.Write(text); // Append to the target file
        filesProcessed++;
        if (filesProcessed % 10 == 0) progress?.Report(filesProcessed);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // We can't parallelize the writer
        BoundedCapacity = 100, // No more than 100 file-contents buffered
        CancellationToken = cancellationToken, // Cancel at any time
    });

    readerBlock.LinkTo(writerBlock,
        new DataflowLinkOptions() { PropagateCompletion = true });

    // This is a tricky part. We use BoundedCapacity, so we must propagate manually
    // a possible failure of the writer to the reader, otherwise a deadlock may occur.
    PropagateFailure(writerBlock, readerBlock);

    // Open the output stream
    using (streamWriter = new StreamWriter(targetFilePath))
    {
        // Feed the reader with the file paths
        foreach (var filePath in sourceFilePaths)
        {
            var accepted = await readerBlock.SendAsync(filePath,
                cancellationToken); // Cancel at any time
            if (!accepted) break; // This will happen if the reader fails
        }
        readerBlock.Complete();
        await writerBlock.Completion;
    }

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex)
        {
            if (block1.Completion.IsCanceled) return; // On cancellation do nothing
            block2.Fault(ex);
        }
    }
}

使用示例:

var cts = new CancellationTokenSource();
var progress = new Progress<int>(value =>
{
    // Safe to update the UI
    Console.WriteLine($"Files processed: {value:#,0}");
});
var sourceFilePaths = Directory.EnumerateFiles(@"C:\SourceFolder", "*.log",
    SearchOption.AllDirectories); // Include subdirectories
await MergeFiles(sourceFilePaths, @"C:\AllLogs.log", cts.Token, progress);

The BoundedCapacity https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblockoptions.boundedcapacity用于控制内存使用。

如果磁盘驱动器是SSD,您可以尝试使用MaxDegreeOfParallelism https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.executiondataflowblockoptions.maxdegreeofparallelism大于2。

为了获得最佳性能,您可以考虑写入与包含源文件的驱动器不同的磁盘驱动器。

TPL 数据流库可用作一套 https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/适用于 .NET Framework,并且内置于 .NET Core。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 C# 读取数百万个小文件 的相关文章

  • 起订量要求?违背了目的?

    是否需要虚拟化您想要模拟的所有属性访问器就违背了模拟的目的 我的意思是 如果我必须修改我的对象并虚拟化我想要模拟的每个访问器 我难道不能继承我的类并自己模拟它吗 你的问题非常有效 但如果你仔细想想 没有其他方法可以模拟课程 如果你采用一个接
  • 为什么 VB.NET 和 C# 中针对值检查 null 存在差异?

    In VB NET http en wikipedia org wiki Visual Basic NET有时候是这样的 Dim x As System Nullable Of Decimal Nothing Dim y As System
  • 将图像文件从网址复制到本地文件夹?

    我有该图像的网址 例如 http testsite com web abc jpg http testsite com web abc jpg 我想将该 URL 复制到 c images 中的本地文件夹中 而且当我将该文件复制到文件夹中时
  • 使用API​​隐藏程序标题栏

    它可以使用 c 和 windows api 删除窗口控制台标题栏 如果是的话如何 请 这个简单的应用程序隐藏并显示其所在控制台的标题栏 它会立即将控制台标题更改为 guid 以查找窗口句柄 然后 它使用 ToggleTitleBar 使用找
  • Java - 同步方法导致程序大幅减慢

    我正在尝试了解线程和同步 我做了这个测试程序 public class Test static List
  • Qt 计算和比较密码哈希

    目前正在 Qt 中为测验程序构建面向 Web 的身份验证服务 据我了解 在数据库中存储用户密码时 必须对其进行隐藏 以防落入坏人之手 流行的方法似乎是添加的过程Salt https en wikipedia org wiki Salt cr
  • 如何在 Asp.net Gridview 列中添加复选框单击事件

    我在 asp 中有一个 gridview 其中我添加了第一列作为复选框列 现在我想选择此列并获取该行的 id 值 但我不知道该怎么做 这是我的 Aspx 代码
  • Paradox 表 - Oledb 异常:外部表不是预期的格式

    我正在使用 Oledb 从 Paradox 表中读取一些数据 我遇到的问题是 当我将代码复制到控制台应用程序时 代码可以工作 但在 WinForms 中却不行 两者都以 x86 进行调试 我实际上只是复制代码 在 WinForms 应用程序
  • 矩阵向量变换

    我正在编写一个代码来制作软件蒙皮器 骨骼 皮肤动画 并且我正处于 优化 阶段 蒙皮器工作得很好 并且在 Core 上 1 09 毫秒内对 4900 个三角形网格与 22 个骨骼进行蒙皮Duo 2 Ghz 笔记本 我需要知道的是 1 有人可以
  • 操纵 setter 以避免 null

    通常我们有 public string code get set 如果最终有人将代码设置为 null 我需要避免空引用异常 我尝试这个想法 有什么帮助吗 public string code get set if code null cod
  • 编译器错误“错误:在文件范围内可变地修改了‘字符串’”

    考虑 include
  • 允许使用什么类型的内容作为 C 预处理器宏的参数?

    老实说 我很了解 C 编程语言的语法 但对 C 预处理器的语法几乎一无所知 尽管我有时在编程实践中使用它 所以问题来了 假设我们有一个简单的宏 它扩展为空 define macro param 可以放入宏调用构造中的语法有哪些限制 调用宏时
  • 从事务范围调用 WCF 服务方法

    我有这样的代码 using TransactionScope scope TransactionScopeFactory CreateTransactionScope some methodes calls for which scope
  • 错误左值需要作为赋值C++的左操作数

    整个程序基本上只允许用户移动光标 如果用户位于给定的坐标范围 2 2 内 则允许用户键入输入 我刚刚提供了一些我认为足以解决问题的代码 我不知道是什么导致了这个问题 你能解释一下为什么会发生吗 void goToXY int int 创建一
  • 设计 Javascript 前端 <-> C++ 后端通信

    在我最近的将来 我将不得不制作一个具有 C 后端和 Web 前端的系统 要求 目前 我对此了解不多 我认为前端将触发数据传输 而不是后端 所以不需要类似 Comet 的东西 由于在该领域的经验可能很少 我非常感谢您对我所做的设计决策的评论
  • 不兼容的类型 - 是因为数组已经是指针吗?

    在下面的代码中 我创建一个基于书籍结构的对象 并让它保存多个 书籍 我设置的是一个数组 即定义 启动的对象 然而 每当我去测试我对指针的了解 实践有帮助 并尝试创建一个指向创建的对象的指针时 它都会给我错误 C Users Justin D
  • “int i=1,2,3”和“int i=(1,2,3)”之间的区别 - 使用逗号运算符的变量声明[重复]

    这个问题在这里已经有答案了 int i 1 2 3 int i 1 2 3 int i i 1 2 3 这些说法有什么区别 我无法找出任何具体原因 Statement 1 Result Compile error 运算符的优先级高于 运算符
  • 纯虚函数可能没有内联定义。为什么?

    纯虚函数是那些虚函数并且具有纯说明符 0 第 10 4 条第 2 款C 03 的内容告诉我们什么是抽象类 顺便说一句 如下 注意 函数声明不能 同时提供纯说明符和定义 尾注 示例 struct C virtual void f 0 ill
  • 如何将对象转换为传递给函数的类型?

    这不会编译 但我想做的只是将对象转换为传递给函数的 t public void My Func Object input Type t t object ab TypeDescriptor GetConverter t ConvertFro
  • C++ [Windows] 可执行文件所在文件夹的路径[重复]

    这个问题在这里已经有答案了 我需要访问一些文件fstream在我的 Windows 上的 C 应用程序中 这些文件都位于我的exe文件所在文件夹的子文件夹中 获取当前可执行文件的文件夹路径的最简单且更重要的 最安全的方法是什么 Use 获取

随机推荐