MassTransit 将消息速率上限限制为 10

2024-01-10

我设置了一个与 RabbitMQ 配合使用的公共交通消费者服务,但我不知道如何提高消费者的速度 - 它似乎硬性限制为每秒接收 10 条消息。

我已经尝试过此处列出的步骤:https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ,但没有成功 - 将预取和并发消费者设置为 25 除了增加已确认的消息之外没有任何作用,但它不会增加下载消息的速率。

我的配置如下:

ServiceBusFactory.ConfigureDefaultSettings(x =>
    {
        x.SetConcurrentReceiverLimit(25);
        x.SetConcurrentConsumerLimit(25);
    });

_bus = ServiceBusFactory.New(
    sbc =>
        {
            sbc.UseRabbitMq(x => 
                x.ConfigureHost(
                    "rabbitmq://localhost/Dev/consume?prefetch=25",
                    y =>
                        {
                            y.SetUsername(config.Username);
                            y.SetPassword(config.Password);
                        }));
            sbc.UseLog4Net();
            sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25");
            sbc.Subscribe(x => RegisterConsumers(x, container));
            sbc.UseJsonSerializer();
            sbc.SetConcurrentConsumerLimit(25);
        });

我在两个地方设置并发消费者限制,因为我不确定是否需要在默认设置或总线配置中设置它,并且消费者是通过统一注册的 - 我省略了消费者订阅,因为所有订阅者都是接收。

我有点困惑是否还需要设置其他内容,或者是否需要更改设置配置的顺序。

非常感谢任何帮助。


在与这个问题度过了一个浪漫的夜晚并尝试了克里斯建议的不同方法之后,我发现有完后还有你必须做的事情才能让它正常工作。

具体来说,yes,您需要在消费者队列地址上设置预取:

sbc.UseRabbitMq(
                f =>
                    f.ConfigureHost(
                        new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
                        c =>
                        {
                        } )
                );

int pf = 20; // prefetch

// set consumer prefetch (required!)
sbc.ReceiveFrom( string.Format( "rabbitmq://guest:guest@localhost/masstransit_consumer?prefetch={0}", pf ) );

但这还不够。

密钥可以在代码中找到mtstress克里斯在他的答案下面的评论中提到了工具。事实证明该工具调用:

int _t, _ct;
ThreadPool.GetMinThreads( out _t, out _ct );
ThreadPool.SetMinThreads( pf, _ct );

将其添加到我的代码中可以解决该问题。我想知道为什么 MSMQ 传输不需要这样做......

更新#1

经过进一步调查,我发现了可能的罪魁祸首。它位于ServiceBusBuilderImpl.

有一种方法可以提高限制,即ConfigureThreadPool.

这里的问题是它调用CalculateRequiredThreads它应该返回所需的线程数。不幸的是后者返回一个negative在我的客户端 Windows 7 和 Windows Server 上都有价值。就这样ConfigureThreadPool实际上什么都不做,因为调用时负值会被忽略ThreadPool.SetMin/MaxThreads.

这个负值怎么办?看来CalculateRequiredThreads calls ThreadPool.GetMinThreads and ThreadPool.GetAvailableThreads并使用公式得出所需的线程数:

var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);

这里的问题是,在我的机器上,这实际上是:

40 (my limit) + 8 (workerThreads) - 1023 (availableThreads) 

这当然会返回

-975

结论是:上面来自大众交通内部的代码似乎是错误的。当我提前手动提高限制时,ConfigureMinThreads尊重它(因为它仅在高于读取值时设置限制)。

如果没有提前手动设置限制,则无法设置限制,因此代码会执行与默认线程池限制一样多的线程(在我的机器上似乎是 8)。

显然有人认为这个公式会产生

40 + 8 - 8

在默认场景下。为什么GetMinThreads and GetAvailableThreads返回此类不相关的值尚未确定......

更新#2

改变

    static int CalculateRequiredThreads( int consumerThreads )
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetMinThreads( out workerThreads, out completionPortThreads );
        int availableWorkerThreads;
        int availableCompletionPortThreads;
        ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
        var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
        return requiredThreads;
    }

to

    static int CalculateRequiredThreads( int consumerThreads )
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetMaxThreads( out workerThreads, out completionPortThreads );
        int availableWorkerThreads;
        int availableCompletionPortThreads;
        ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
        var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
        return requiredThreads;
    }

解决问题。此处均返回 1023,并且公式的输出是正确的预期线程数。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MassTransit 将消息速率上限限制为 10 的相关文章

  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 当我为rabbitmq-management创建用户时,发生了错误

    当我为rabbitmq创建用户时 root localhost rabbitmqctl add user admin admin 发生错误 消息 Creating user admin Error undef crypto hash sha
  • 如何在 Node js 中保持分叉的子进程处于活动状态

    我想创建一个像带有node的foreverjs一样运行的rabbitmq cli 它可以生成 child process 并使其在后台运行 并且可以随时与 child process 进行通信 我面临的问题是 当主 cli 程序退出时 ch
  • 何时使用 RabbitMQ 铲子以及何时使用 Federation 插件?

    对于我工作的公司 我们希望使用 RabbitMQ 作为我们的主要消息总线 我们的想法是 每个应用程序都使用自己的虚拟主机进行内部通信 并且通过 shovel 或联合插件 我们可以在多个虚拟主机 甚至可能是多台机器 非集群 之间共享某些类型的
  • 在 RabbitMQ 监听器中隐藏运行时异常

    在某些故意发生的情况下 我使用了一些异常来拒绝消息 但在控制台中显示了乍一看似乎不太正常的异常 如何在登录控制台 文件时隐藏该特定异常 我正在使用 spring boot 和默认记录器 public static class Undispa
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co
  • 为什么需要消息队列来与 Web 套接字聊天?

    我在互联网上看到了很多使用 Web 套接字和 RabbitMQ 进行聊天的示例 https github com videlalvaro rabbitmq chat https github com videlalvaro rabbitmq
  • 死信交换 RabbitMQ 丢弃消息

    我正在尝试在 RabbitMQ 中实现 dlx 队列 场景很简单 我有 2 个队列 1 活着 2 死亡 x dead letter exchange 立即 x message ttl 5000 以及 立即 交换 这必然是 1 活着 我尝试运
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • 使用 RabbitMq 锁定和批量获取消息

    我正在尝试以一种更非常规的方式使用 RabbitMq 尽管此时我可以根据需要选择任何其他消息队列实现 消费者不会将 Rabbit 推送消息留给我的消费者 而是连接到一个队列并获取一批 N 条消息 在此期间它会消费一些消息 并可能拒绝一些消息
  • 在 docker-compose 文件中提供rabbitmq.conf会给出“sed:无法重命名/etc/rabbitmq/sedMaHqMa:设备或资源繁忙”

    我的 docker compose 看起来像这样 version 3 2 services mq hostname HOST NAME ports 5671 5671 5672 5672 15671 15671 15672 15672 en
  • 在 Windows 10 和 PHP 7.3 中安装 AMQP

    我想在 Windows 10 中使用 PHP 7 3 安装 AMQP 以便在 symfony 4 中使用 Windows 不使用任何 apache iis nginx 并直接由 symfony 运行 一切还好 直到 我决定在项目中使用rab
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • 基于多线程的 RabbitMQ 消费者

    我们有一个 Windows 服务 它监听单个 RabbitMQ 队列并处理消息 我们希望扩展相同的 Windows 服务 以便它可以监听 RabbitMQ 的多个队列并处理消息 不确定使用多线程是否可以实现这一点 因为每个线程都必须侦听 阻
  • 生产者/消费者的不同语言

    我想知道是否可以通过 AMQP 和 RabbitMQ 对生产者和消费者使用不同的语言 例如 Java 代表生产者 python php 代表消费者 或者反之亦然 是的 AMQP 与语言无关 这意味着只要您有可以连接到 AMQP 的客户端sa
  • 服务器在 pika.exceptions.StreamLostError: Stream 连接丢失后关闭

    我的队列中有一些图像 我将每个图像传递到我的 Flask 服务器 在其中完成图像处理 并在我的rabbitmq 服务器中收到响应 收到响应后 我收到此错误 pika exceptions StreamLostError 流连接丢失 104
  • rabbitmq 的 REST API

    有没有办法从 ajax 向 RabbitMQ 发送数据 我的应用程序由数千个 Web 客户端 用 js 编写 和 WCF REST 服务组成 现在我试图弄清楚如何为我的应用程序创建可扩展点 这个想法是有一个rabbitmq实例 它从放置在一
  • 如何在公共交通中记录失败的消息?

    我正在寻找一个好的解决方案来在超出重试限制后立即记录失败消息 而无需处理错误队列 到目前为止我发现了什么 我可以继承InMemory入站消息跟踪器并覆盖是否超出重试限制 但此时除了 id 之外 没有关于消息本身的信息 我可以实施IInbou
  • Celery 广播 vs RabbitMQ 扇出

    我最近一直在使用 Celery 但我不喜欢它 它的配置很混乱 过于复杂并且文档记录很少 我想用 Celery 从单个生产者向多个消费者发送广播消息 让我困惑的是 Celery 术语和底层传输 RabbitMQ 术语之间的差异 在 Rabbi

随机推荐

  • AntDB内存管理之内存上下文之如何使用内存上下文

    5 如何使用内存上下文 使用内存上下文之前 我们需要先对其进行创建 AntDB启动时已经创建并初始化好了部分内存上下文 例如 TopMemoryContext 这个TopMemoryContext是所有内存上下文的父节点或者祖先节点 一般我
  • 开发人员指南从以太坊迁移到 Solana

    这篇文章是关于什么的 以太坊是近期最重要的创新之一 历史上第一次 我们有了一个为社会协调而建立的去中心化全球平台 它有可能彻底改变许多行业 尽管重要 但以太坊的运行环境 以太坊虚拟机 EVM 目前的状态并不是为消费级应用而构建的 它是一个单
  • 这些专利知识你知道吗?

    专利作为一种重要的知识产权保护形式 专利不仅成为了企业核心竞争力的重要组成部分 也成为了国家创新发展的重要支撑 专利是指国家专利主管机关授予发明创造申请人的一种专有权 这种专有权具有独占性 排他性和法律强制性 能够为持有者带来经济利益和竞争
  • 如何给 unplugin-vue-components/vite 写一个简单的 resolver

    大部分工作 unplugin vue components 都已经处理好了 我们只需要接收组件名来判断是否是自己的组件 然后处理对应的导入逻辑 一共 3 个字段 as 重命名类似 import componentNameReName fro
  • The Planets:Venus

    靶场下载 The Planets Venus VulnHub 信息收集 arp scan l Interface eth0 type EN10MB MAC 00 0c 29 43 7c b1 IPv4 192 168 1 60 Starti
  • 详解Nacos和Eureka的区别

    在微服务架构中 服务发现是一个重要的环节 它能够帮助微服务实例进行相互通信 Nacos和Eureka是两种广泛使用的开源服务发现组件 它们在功能和实现上存在一些差异 本文将详细解析Nacos和Eureka在服务发现方面的主要区别 Nacos
  • MQ发送消息和监听消息

    private static List routingKey routingKey的名字 与业务关联 1 发送 rabbitTemplate convertAndSend routingKey 发送的内容 可以是业务代码定义好的实体类 2
  • 如何解读服务器的配置和架构?

    在当今数字化时代 服务器作为企业或组织的重要基础设施 其配置和架构对于保障业务的稳定运行至关重要 如何解读服务器的配置和架构 成为了一个备受关注的话题 本文将围绕服务器配置和架构的解读进行深入探讨 帮助读者更好地理解服务器的性能 扩展性和安
  • 从不同维度的调研数据,看企业数字化转型

    数字化转型逐渐成为企业增长和价值创造的新引擎 然而 在复杂的背景下 企业数字化转型也面临着前所未有的挑战和机遇 未来 我们还能做些什么 怎么做 这成为了各企业高管当前亟需厘清的问题 企业做数字化转型的原因 总体来看 大部分受访企业做数字化转
  • Java中SpringBoot组件集成接入【slf4j日志文档】

    Java中SpringBoot组件集成接入 slf4j日志文档 1 slf4j简介 2 maven依赖 3 配置 4 使用 5 展示 6 参考文章 1 slf4j简介 SLF4J Simple Logg
  • iceberg集成hive,insert失败问题排查与解决

    背景 创建iceberg表成功 CREATE TABLE iceberg test1 i int STORED BY org apache iceberg mr hive HiveIcebergStorageHandler insert数据
  • ❤ Vue3 使用

    Vue3 使用 Vue3之toRefs的使用 作用 toRefs 可以将一个响应式的对象 转换成普通对象 但是转换后的普通对象的每一个属性值都是响应式的 这样我们可以使用es6的对象解构或者三点运算符等操作 代码
  • 自定义编写zabbix_agent脚本

    vi usr lib systemd system zabbix agent servicce Unit Description Zabbix Agent After syslog target After network target S
  • 阿里架构专家力荐:架构修炼宝典,从基础到精通,让您轻松驾驭技术世界

    前言 作为程序员 确定发展方向和路线至关重要 而架构师则是许多人的追求之一 成为架构师并非易事 需要深厚的技术功底 当然 大厂架构师更具吸引力 但进入大厂并担任这一职位需要学习众多技术 或许你现在对此感到迷茫 但市面上已有多条现成的架构技术
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们
  • R - 计算相似数据集之间的差异(相似性度量)

    我看到很多涉及这个主题的问题 但尚未找到答案 如果我错过了确实回答此问题的问题 请标记此问题并向我们指出该问题 场景 我们有一个基准数据集 我们有插补方法 我们系统地从基准中删除值并使用两种不同的插补方法 因此 我们有一个基准 impula
  • JSDoc - 函数参数应该是对象的属性

    我有一个对象定义了一组操作 如下所示 var actions close action close renderPreview action renderpreview switchToMobile action switchmobile
  • 使用 numpy.polynomial.legendre 时,如何获得将输入转换为勒让德多项式参数的函数?

    import packages we need later import matplotlib pyplot as plt import numpy as np 我在做什么 受此启发question https stackoverflow
  • 为什么我的 js 语音识别不起作用?

    我一直在关注this https javascript plainenglish io how to use speech recognition and speech synthesis in javascript 9bcb213f6dd
  • MassTransit 将消息速率上限限制为 10

    我设置了一个与 RabbitMQ 配合使用的公共交通消费者服务 但我不知道如何提高消费者的速度 它似乎硬性限制为每秒接收 10 条消息 我已经尝试过此处列出的步骤 https groups google com forum msg mass