如何并行处理MSMQ消息

2024-02-16

我正在编写一个 Windows 服务来使用 MSMQ 消息。该服务将存在高活动期(80k 消息快速传入)和长时间不活动期(可能数天没有新消息)。

处理消息非常受网络限制,因此我从并行性中获得了很大的好处。但在不活动期间,我不想占用一堆线程来等待不会很快到来的消息。

MSMQ 接口似乎非常关注同步工作流程 - 获取一条消息,处理它,获取另一条消息,等等。我应该如何构建我的代码,以便我可以在高活动期间利用并行性,而不是捆绑一堆消息无活动期间的线程数量?使用 TPL 的奖励积分。伪代码将不胜感激。


多年来我已经分配了 MSMQ(包括移动实现),您对“同步工作流程”的描述是正确的。这并不是说您不能获取各种消息信封并通过 TPL 在不同的内核上处理它们……限制因素是读取/写入队列……本质上是串行操作。例如,您无法一次发送 8 条消息(具有 8 核的计算机)。

我有类似的需求(不使用 System.Messaging 命名空间),并在我读过的 Campbell 和 Johnson 所著的《Parallel Programming with Microsoft.NET》一书中的一些帮助下解决了这个问题。

查看他们的“并行任务”章节,特别是使用全局队列与每线程本地队列配合进行工作处理(即 TPL)的部分,该队列使用“工作窃取”算法来执行负载平衡。我的解决方案部分是根据他们的例子进行建模的。我的系统的最终版本在性能上有巨大的差异(从每秒 23 条消息到超过 200 条)。

根据系统从 0 到 80,000 所需的时间,您需要采用相同的设计并将其分布在多个服务器(每个服务器具有多个处理器和多个内核)。理论上,我的设置需要不到 7 分钟的时间来完成所有 80K,因此通过添加第二台计算机,可以将时间减少到大约 3 分 20 秒,等等。诀窍是窃取工作的逻辑。

深思……

快速编辑:顺便说一句,计算机是一台 Dell T7500 工作站,配备双四核 Xeons @ 3GHz、24 GB RAM、Windows 7 Ultimate 64 位版本。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何并行处理MSMQ消息 的相关文章

  • 带有 return 语句的 Julia @parallel for 循环

    如何在满足条件时立即返回所有工作人员的函数中编写并行 for 循环 IE 像这样的东西 function test n sync parallel for i in 1 1000 statement if condition return
  • 将 R 包函数导出到 R 包内的并行集群

    有一些功能 比如function1 在我正在开发的 R 包中 它依赖于辅助函数 例如h function1 and h function2 在我的包裹里 我正在并行化重复调用function1在我的包中的另一个函数中 目前 在我的包中我正在
  • nUnit Assert.That委托并发问题

    我的代码中遇到了一些暂时的死锁 无法解决它 简单的代码 我无法创建一个简单的调用链来重现代码InvokeChangeEvent Test public async void Test sut InvokeChangeEvent foo fi
  • 消息在事务处理时未到达 MSMQ

    我在本地计算机中创建了一个私有 MSMQ 我使用以下 C 代码将消息发送到队列 当我将队列更改为事务性队列时 消息未到达 MSMQ 但是 Send 方法中没有抛出异常 我需要做出什么改变才能使其发挥作用 using System using
  • 使用 Scoop 编程 DEAP

    我在 python 中使用 DEAP 库来解决多目标优化问题 我想使用多个处理器来完成这项任务 但是 我遇到了一些麻烦 为了提供一些背景信息 我将 networkx 与 DEAP 结合使用 我还定义了适应度函数 交叉和变异函数 由于某些原因
  • MessageQueueException (0x80004005):对消息队列系统的访问被拒绝

    我有一个现有的应用程序 可以在 Windows 2003 服务器上正常运行 我已将其移至 Windows 2008r2 当应用程序尝试访问队列时 出现以下错误 我的应用程序池的 Indentity 用户可以完全控制我的消息队列 有谁对如何解
  • 所有任务完成后继续任务

    在某些类中 我想使用 Task 异步加载 2 个集合并停止 busyindicator 我尝试这样的事情 var uiScheduler TaskScheduler FromCurrentSynchronizationContext Wai
  • OpenMP 线程映射到物理内核

    于是我在网上查了一段时间没有结果 我是 OpenMP 的新手 所以不确定这里的术语 但是有没有办法从 OMPThread 由 omp get thread num 给出 和线程将运行的物理核心找出特定机器的映射 我还对 OMP 分配线程的精
  • msmq - 触发器 - 独立可执行调用不起作用

    过去几天我一直在尝试使用 msmq 触发器来调用 exe 文件 它永远不会被调用 这些是我遵循的步骤 创建了一个提及队列路径并检查了查看的触发器 启用 选中 创建了一个不带任何条件的规则 以便每当获得新消息时都会触发触发器 并选择提到独立可
  • 如何向 addrange select 语句添加异步“await”?

    我有一个这样的函数 public async Task
  • 以编程方式更新 ClickOnce 应用程序的部署清单会导致缺少 4.0 中所需的 <兼容框架> 元素

    我正在致力于自动化 NET 4 0 ClickOnce WPF 应用程序的安装程序 该应用程序需要在应用程序配置文件 我经历了寻找必须遵循的具体步骤的棘手过程Mage exe http msdn microsoft com en us li
  • 我可以在 R 中并行读取 1 个大 CSV 文件吗? [复制]

    这个问题在这里已经有答案了 我有一个很大的 csv 文件 需要很长时间才能阅读 我可以使用 parallel 或相关的包在 R 中并行读取此内容吗 我尝试过使用 mclapply 但它不起作用 根据OP的评论 fread来自data tab
  • wait task.delay 有助于加快 UI 刷新速度,但是如何实现呢?

    我有一个视图模型 它正在获取一行记录并显示在 Windows Phone UI 上 这个获取数据的视图模型方法正在执行大量任务 所有任务都标记为等待操作 如下所示 async Task GetData var dataCollection
  • 处理异步并行任务的多个异常

    Problem 多个任务并行运行 所有任务 没有任务或其中任何任务都可能抛出异常 当所有任务完成后 必须报告所有可能发生的异常 通过日志 电子邮件 控制台输出 等等 预期行为 我可以通过 linq 使用异步 lambda 构建所有任务 然后
  • 从delphi应用程序调用.net4.0 com服务器后出现错误异常

    我们正在将代码库从 BDS2006 迁移到 Rad Studio XE 我们发现了一些非常奇怪的行为 如果我们在从 Net4 0 中实现的 COM 服务器创建一些对象后进行无效的浮点运算 即除以零 我们不会没有得到正常异常 即 EDivis
  • 为什么 mex 文件中的 OpenMP 仅产生 1 个线程?

    我是 OpenMP 新手 我有以下代码 使用配置了 MSVS2010 的 Matlab mex 可以正常编译 计算机有 8 个可用处理器 我也使用 matlabpool 检查过 include mex h include
  • 在 C# 中按元素相乘数组具有意想不到的性能

    我想找到按元素相乘两个数组的最佳方法 这是更广泛项目的一部分 其中性能而不是唯一的考虑因素 我今天开始用 C Linqpad 编写一些函数 因此它还没有以任何方式进行优化 下面代码的输出如下 Environment ProcessorCou
  • 如何设置消息队列的所有者?

    System Messaging MessageQueue 类不提供设置队列所有权的方法 如何以编程方式设置 MSMQ 消息队列的所有者 简短的答案是 p invoke 对 windows api 函数的调用MQSetQueueSecuri
  • 是否可以在 OpenCL 中并行运行求和计算?

    我是 OpenCL 的新手 不过 我了解 C C 基础知识和 OOP 我的问题如下 是否可以以某种方式并行运行求和计算任务 理论上可能吗 下面我将描述我尝试做的事情 任务例如是 double values new double 1000 l
  • 安装 Framework 4.5 时以 .NET Framework 4 为目标

    我的计算机上安装了 VS2010 和 VS2012 并安装了 NET Framework 4 0 然后将其升级到 NET Framework 4 5 但是 我仍在开发需要在 NET Framework 4 0 上运行的应用程序 该项目表示

随机推荐