如何聚合来自异步生产者的数据并将其写入文件?

2023-12-31

我正在学习 C# 中的异步/等待模式。目前我正在尝试解决这样的问题:

  • 有一个生产者(硬件设备)每秒生成 1000 个数据包。我需要将这些数据记录到文件中。

  • 该设备只有一个ReadAsync()一次报告单个数据包的方法。

  • 我需要缓冲数据包并按照它们生成到文件的顺序将它们写入文件,每秒一次。

  • 如果在下一批数据包准备写入时写入过程未及时完成,则写入操作将失败。

到目前为止,我已经写了如下内容。它有效,但我不确定这是否是解决问题的最佳方法。有什么意见或建议吗?解决此类生产者/消费者问题(消费者需要聚合从生产者接收的数据)的最佳实践是什么?

static async Task TestLogger(Device device, int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log")))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);
    }
}

您可以使用以下想法,前提是刷新的标准是数据包的数量(最多 1000 个)。我没有测试它。它利用了斯蒂芬·克利里的AsyncProducerConsumerQueue<T> http://nitoasyncex.codeplex.com/wikipage?title=AsyncProducerConsumerQueue精选于这个问题 https://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont.

AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested(token);
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(), token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task previousFlush = Task.FromResult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested(token);
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!previousFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the previous flush if not ready
           throw new Exception("failed to flush on time.");
       }
       await previousFlush; // it's completed, observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
    }
}

如果您不想让记录器失败,而是希望取消刷新并继续进行下一批,则可以通过对此代码进行最小的更改来实现。

回复 @l3arnon 评论:

  1. 数据包不是字节,而是字节[]。 2. 你没有使用OP的ToHexString。 3. AsyncProducerConsumerQueue 的健壮性要差得多 比 .Net 的 TPL 数据流进行了测试。 4.等待 previousFlush 错误 就在抛出异常之后,这使得该行变得多余。 等等。简而言之:我认为可能的附加值并不能证明这一点 非常复杂的解决方案。
  1. “数据包不是字节,它是字节[]” - 数据包is一个字节,从OP的代码中可以明显看出:buffer[i] = await device.ReadAsync()。然后,一批数据包byte[].
  2. “你还没有使用OP的ToHexString。” - 目标是展示如何使用Stream.WriteAsync which natively接受取消令牌,而不是WriteLineAsync这不允许取消。使用起来很简单ToHexString with Stream.WriteAsync并仍然利用取消支持:

    var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
        Environment.NewLine);
    _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
    
  3. “AsyncProducerConsumerQueue 远不如 .Net 的 TPL 数据流健壮且经过测试” - 我不认为这是一个确定的事实。但是,如果OP担心的话,他可以使用常规的BlockingCollection,这不会阻塞生产者线程。在等待下一批时阻塞消费者线程是可以的,因为写入是并行完成的。与此相反,您的 TPL Dataflow 版本带有一个多余的CPU 和锁密集型操作:将数据从生产者管道移动到写入者管道logAction.Post(packet),逐字节。我的代码没有这样做。

  4. “在抛出异常后,您会等待 previousFlush 错误,这使得该行变得多余。” - 这条线不是多余的。也许,你忽略了这一点:previousFlush.IsCompleted can be true when previousFlush.IsFaulted or previousFlush.IsCancelled也是true. So, await previousFlush在那里观察任何错误是相关的完全的任务(例如,写入失败),否则将会丢失。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何聚合来自异步生产者的数据并将其写入文件? 的相关文章

  • 并行运行多个任务

    我有一个代理列表 每个代理都会访问不同的站点并从站点中提取所需的数据 目前它一次只做一个 但我希望同时运行 10 20 个任务 这样它就可以一次性从 20 个站点下载 而不是只下载一个 这是我目前正在做的事情 private async T
  • 如何在 C++ 中为指针“this”赋值

    在函数中 如何分配this一个新的价值 您可以分配对象this点于 this XY 但你不能分配直接值this this XY Error Expression is not assignable
  • Visual Studio 2013 调试器显示 std::string 的奇怪值

    我有一个大型的 cmake 生成的解决方案 其中包含许多项目 由于某种原因 我无法查看字符串的内容 因为根据调试器 Bx Buf含有一些垃圾 text c str 正确返回 Hello 该问题不仅仅发生在本地字符串上 返回的函数std st
  • 在 C# 中解析 JS Date.toIsoString

    我需要将 JS 日期存储为 ISO 8601 日期 我目前正在从格式为 2019 06 22T00 00 00 000Z 的表单中获取日期 正如 JS 的 toIsoString 方法所期望的那样 当这个日期传递到我的 API 控制器时 我
  • 维护 VS Test Project 中单元测试方法之间的上下文

    我想按顺序运行以下单元测试 使用随机数字的名称 密码等创建新客户 检索刚刚创建的客户并断言其属性包含相同的随机数 对同一用户调用 ForgotPassword 函数 并使用相同的随机数作为用户名 清楚地看到 我需要生成一次随机数 并在 3
  • 用于 C++ 中图像分析的 OpenCV 二进制图像掩模

    我正在尝试分析一些图像 这些图像的外部周围有很多噪声 但内部有一个清晰的圆形中心 中心是我感兴趣的部分 但外部噪声正在影响我对图像的二进制阈值处理 为了忽略噪音 我尝试设置一个已知中心位置和半径的圆形蒙版 从而使该圆之外的所有像素都更改为黑
  • 注入包含接口的所有已注册实现的 Enumerable

    给出以下接口 public interface IMyProcessor void Process 我希望能够注册多个实现 并让我的 DI 容器将它们的可枚举注入到这样的类中 public class MyProcessorLibrary
  • 托管 ODP.NET 驱动程序未显示在“数据源”对话框中

    在我的计算机上安装托管 ODP NET 后 ODP NET 托管驱动程序没有出现在里面选择数据源Visual Studio 2013 Professional 中的对话框 它应该是这样的 这就是它在我的机器上的实际外观 我已按照 轻松驱动
  • 当我尝试传递临时地址作为参数时,它是一个 UB 吗?

    对于以下 C 代码 include
  • 如何从 Powerpoint 2010 导出电影?

    如何使用 MS Office PIA 主互操作程序集 或其他方式以编程方式将嵌入视频从 powerpoint 2010 导出到外部文件 在演示文稿中嵌入视频是 Powerpoint 2010 中的一项新功能 我找不到解决方案 PPTX 文件
  • 为什么连续抛出 2 个异常不会生成无法访问的代码警告?

    为什么以下代码行不会创建编译器警告 void Main throw new Exception throw new Exception 据我所知 编译器应该通知您无法到达第二个抛出异常 这显然是一个编译器错误 它是在 C 3 0 中引入的
  • 使用任一默认捕获模式时,这是通过复制捕获还是 (*this) 通过引用捕获?是一样的吗?

    当我看到以下工作时我有点困惑 struct A void g void f g 但后来我发现this https stackoverflow com a 16323119 5825294答案非常详细地解释了它是如何工作的 本质上 它归结为t
  • MINIX内部碎片2

    我正在用 C 语言编写一些软件 它递归地列出给定目录中的所有文件 现在我需要计算出内部碎片 我花了很长时间研究这个问题 发现 ext2 上的内部碎片只发生在最后一个块中 我知道理论上你应该能够从索引节点号获得第一个和最后一个块地址 但我不知
  • MPI - 发送和接收列

    我需要从一个进程发送矩阵列并从另一个进程接收它 我尝试运行以下程序 但得到了一个奇怪的结果 至少我这么认为 仅复制矩阵的第一个元素 某些矩阵元素会发生意外变化 include
  • C++网络序列化[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找一种将 C 数据包序列化为网络流的解决方案 我在这里看到很多帖子提到人们 ACE 谷歌协议缓
  • 具有多个父项的 Qt 树模型

    我想构建一棵树 其中一个元素可以引用另一个元素 我想要构建的树是 像这样的东西 A B C D E F P this is a pointer to C D first child of C E second child of C I fo
  • 在一个解决方案中调用不同项目的方法

    1 个解决方案中有 3 个项目 我对第一个项目中的主文件进行的主要操作 但是我需要调用第三个项目中的方法并使用类 例如 第三个项目有 public DataClasses1DataContext base global WindowsFor
  • 使用未命名命名空间而不是静态命名空间

    我可以假设在未命名命名空间中声明的对象相当于static namespace int x 1 static int x 2 FWIK 在这两种情况下 x将具有静态存储期限和内部链接 声明为的对象的所有规则也是如此static适用于未命名名称
  • 如何配置 qt Creator 以显示 C++ 代码而不是反汇编程序?

    昨天我做了很多事情 比如更新 GCC Clang 和重新安装 Qt Creator 今天 在逐步调试我的代码时 调试器显示的是反汇编代码 而不是我编写的 C 代码 紧迫F10 or F11 调试器正在进入汇编代码而不是 cpp nor h我
  • Adobe Illustrator 中的折线简化如何工作?

    我正在开发一个记录笔划的应用程序 您可以使用定点设备来绘制笔划 在上图中 我绘制了一个笔划 其中包含 453 个数据点 我的目标是大幅减少数据点的数量 同时仍然保持原始笔画的形状 对于那些感兴趣的人 上图笔画的坐标可以作为GitHub 上的

随机推荐

  • Keras 不使用 Theano

    1 我创建一个虚拟环境 mkvirtualenv kerasTH 2 我使用安装keraspip install keras 3 这就是输出pip list Package Version h5py 2 10 0 joblib 0 16 0
  • 如何在 Andengine Base 游戏活动中使用比率分辨率策略时覆盖 onSetContentView

    我正在开发 学习构建 一款游戏andengine GLES2 我在用基础游戏活动 并且我覆盖了setContent查看放置我的 admob 广告的视图 除了解决政策之外 一切都正常 比率决议政策是我正在使用的那个CAMERA WIDTH 8
  • gdb:退出程序而不退出gdb

    我正在使用 gdb 调试程序 首先 我load我的可执行文件 然后我continue运行程序 我有时想中断程序的执行 所以我这样做Ctrl C 我的问题是这关闭了both我的程序和gdb 如何在不退出 gdb 的情况下退出程序 您是否尝试过
  • rufus 调度程序未在生产中运行

    我有一个在 nginx 和乘客下运行的 Rails 服务器 我的sheduler rb看起来像这样 require rufus scheduler my awesome job Rufus Scheduler new my awesome
  • 有没有办法在结果查询中显示“是”或“否”而不是 0 或 1? [复制]

    这个问题在这里已经有答案了 我正在尝试创建一个查询 当我选择一行且一列为 0 时 它显示 否 如果为 1 则显示 是 这是一个示例表 SELECT FROM NUMBERS RESULT PRODUCT HAS APPLES 0 GRAPE
  • 当2的幂时如何将除法变成按位移位?

    我有以下需要经常做的划分 int index pos 64 在 CPU 级别 除法可能会很昂贵 我希望有一种方法可以通过按位移位来做到这一点 我还想了解如何从除法到移位 换句话说 我不想只记住按位表达式 int index pos gt g
  • 重命名后项目崩溃

    如果我在 XCode 中的导航器视图中更改 iOS 项目的名称 Xcode 4 中的 Project gt Rename 相当于什么 https stackoverflow com questions 6077876 what is the
  • 如何使用 jQuery animate() 方法使 div 左右移动?

    请看一下这个 http jsfiddle net tmPfV http jsfiddle net tmPfV 如果您单击右侧 则该框将向右移动 如果您单击左侧 则该框将向左移动 但是 如果您再次单击右键 则什么也没有 我怎样才能让它左右移动
  • 尝试解码数据(将 Abs 导出到 MySQL)

    我有数据库表 DROP TABLE translation en lt CREATE TABLE translation en lt id INTEGER lt translation WIDEMEMO BLOBBlockSize 1024
  • 如何使用 Log4j 和 Storm Framework 将日志写入文件?

    我在 Storm 中使用 log4j 记录到文件时遇到了一些问题 在提交我的拓扑之前 即在我的主要方法中 我编写了一些日志语句并使用以下方法配置了记录器 PropertyConfigurator configure myLog4jPrope
  • 组装键盘IO口

    我看过以下内容topic https stackoverflow com questions 219120 x86 assembly protected mode keyboard access 我有兴趣通过 IN OUT 指令联系键盘并设
  • Spring Web Flow - 如何使用对话范围中已有的值设置单元测试?

    我正在开发一个使用 Spring Web Flow 2 0 的项目 我正在尝试对从决策状态开始的流程进行单元测试 决策状态检查位于conversationScope 我不知道如何将值插入到conversationScope用于单元测试 我努
  • 根据系统动态判断整数类型(c++)

    我正在编写一个程序 以每 32 位 即一次 4 个字节 为单位将数据存储到文件中 我在64位Windows系统中编写代码 但我使用的编译器是32位 mingw32 在当前系统中 int和long的大小是相同的 都是32位 4字节 我目前正在
  • 使用 AWK 将多个文件中的列添加到 csv 表

    我希望通过使用 AWK 从多个文件中获取值来构建 csv 表 我让它处理两个文件 但我无法扩展它 我目前正在获取第二个文件的输出 并附加第三个文件 依此类推 以下是示例文件 file1 file2 file3 file4 100 45 1
  • 无法安装包收缩[关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 我跑了 pip install contractions in jupyter notebook并且无法安装库收缩并显示
  • 模式同义词签名:必需与提供的约束

    我想我明白了 不寻常形式 的约束 https downloads haskell org 7Eghc 8 10 5 docs html users guide glasgow exts html typing of pattern syno
  • Reactjs - 输入默认值已设置但未显示

    注意到一些奇怪的现象 即为输入设置了 defaultValue 但有时刷新页面时它不可见 我尝试过 console log 然后组件在加载数据时重新渲染多次 在最后一次重新渲染时 组件包含所需的值 如屏幕截图所示 但未显示 知道为什么吗 谢
  • 关闭 GDB 中设置断点的确认[重复]

    这个问题在这里已经有答案了 在共享库上设置断点 gdb b file c 278 No symbol table is loaded Use the file command Make breakpoint pending on futur
  • 文本编辑器告诉光标位置的索引

    我需要一个文本编辑器来告诉我光标的位置 这样我就可以确定要加载到字符串中的文本范围 不幸的是 我尝试过的文本编辑器 TextWrangler Aquamacs EditPad 只告诉我光标所在的行号以及该行上的字符索引 我需要从文件开头到该
  • 如何聚合来自异步生产者的数据并将其写入文件?

    我正在学习 C 中的异步 等待模式 目前我正在尝试解决这样的问题 有一个生产者 硬件设备 每秒生成 1000 个数据包 我需要将这些数据记录到文件中 该设备只有一个ReadAsync 一次报告单个数据包的方法 我需要缓冲数据包并按照它们生成