RabbitMQ - 通道关闭:连接错误(SpringXD 重复关闭rabbitmq 连接。)

2024-01-06

我度过了一个糟糕的夜晚,试图弄清楚 RabbitMQ 和 SpringXD 到底发生了什么,不幸的是没有成功。

问题: SpringXD反复关闭RabbitMQ连接, 或报告与通道缓存大小相关的警告。

SpringXD 日志的片段(在流初始化/自动装配期间):

 2016-05-03T07:42:43+0200 1.3.0.RELEASE WARN
 DeploymentsPathChildrenCache-0 listener.SimpleMessageListenerContainer
 - CachingConnectionFactory's channelCacheSize can not be less than the 
 number of concurrentConsumers so it was reset to match: 4

...

 2016-05-03T07:54:17+0200 1.3.0.RELEASE ERROR AMQP Connection
 192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

 2016-05-03T17:38:58+0200 1.3.0.RELEASE ERROR AMQP Connection
 192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error; protocol method:
 method<connection.close>(reply-code=504, reply-text=CHANNEL_ERROR - 
 second 'channel.open' seen, class-id=20, method-id=10)

RabbitMQ 日志片段:

 =WARNING REPORT==== 3-May-2016::08:08:09 === closing AMQP connection <0.22276.61> (192.168.120.125:59350 -> 192.168.120.125:5672): client
 unexpectedly closed TCP connection

 =ERROR REPORT==== 3-May-2016::08:08:11 === closing AMQP connection 0.15409.61> (192.168.120.125:58527 -> 192.168.120.125:5672):
 {writer,send_failed,{error,closed}}

状态阻塞错误很少见

 =ERROR REPORT==== 3-May-2016::17:38:58 === Error on AMQP connection <0.20542.25> (192.168.120.125:59421 -> 192.168.120.125:5672, vhost:
'/', user: 'xd', state: blocked), channel 7: operation channel.open
caused a connection exception channel_error: "second 'channel.open'
 seen"

我的设置(6 个节点)

- springxd 1.3.0 distributed (zookeeper)  
- RabbitMQ 3.6.0, Erlang R16B03-1 cluster


    ackMode:                   AUTO ## or NONE
    autoBindDLQ:               false
    backOffInitialInterval:    1000
    backOffMaxInterval:        10000
    backOffMultiplier:         2.0
    batchBufferLimit:          10000
    batchingEnabled:           false
    batchSize:                 200
    batchTimeout:              5000
    compress:                  false
    concurrency:               4
    deliveryMode:              NON_PERSISTENT ## or PERSISTENT
    durableSubscription:       false
    maxAttempts:               10
    maxConcurrency:            10
    prefix:                    xdbus.
    prefetch:                  1000
    replyHeaderPatterns:       STANDARD_REPLY_HEADERS,*
    republishToDLQ:            false
    requestHeaderPatterns:     STANDARD_REQUEST_HEADERS,*
    requeue:                   true
    transacted:                false
    txSize:                    1000

春天: 兔子MQ:

addresses:
priv1:5672,priv2:5672,priv3:5672,
priv4:5672,priv5:5672,priv6:5672

adminAddresses:  
http://priv1:15672, http://priv2:15672, http://priv3:15672, http://priv4:15672, http://priv5:15672,http://priv6:15672

nodes: 
rabbit@priv1,rabbit@priv2,rabbit@priv3,
rabbit@priv4,rabbit@priv5,rabbit@priv6

username: xd
password: xxxx
virtual_host: /
useSSL: false

ha-xdbus 政策:

 - ^xdbus\. all  
 - ha-mode: exactly
 - ha-params:   2 
 - queue-master-locator:    min-masters

兔子会议

[
 {rabbit, 
[
     {tcp_listeners, [5672]},
     {queue_master_locator, "min-masters"}
]
}
].

当 ackMode 为 NONE 时,会发生以下情况:

最终消费者的数量下降到零,并且我有一个僵尸流无法从该状态恢复,这反过来又导致不必要的排队。

当 ackMode 为 AUTO 时,会发生以下情况:

有些消息永远没有得到确认。

SpringXD 流和持久队列

Rabbit 模块被用作源或接收器,没有自定义自动接线。

典型的流定义如下:

食入:

event_generator | rabbit --mappedRequestHeaders=XDRoutingKey --routingKey='headers[''XDRoutingKey'']'

处理/接收器:

rabbit --queues='xdbus.INQUEUE-A' | ENRICHMENT-PROCESSOR-A | elastic-sink
rabbit --queues='xdbus.INQUEUE-B' | ENRICHMENT-PROCESSOR-B | elastic-sink

xdbus.INQUEUE-xxx 是从 Rabbit 管理 GUI 手动创建的。 (耐用的)

全局统计数据(来自 RabbitMQ 管理员)

  • 连接数:190
  • 频道:2263(频道缓存问题也许?)
  • 交易所:20
  • 队列:120
  • 消费者:1850

Finally:

如果有人能回答配置有什么问题(我很确定网络运行良好,因此不存在网络问题,也不存在与最大打开文件限制相关的问题),我将不胜感激。

消息速率从 2K/秒到最大 30k/秒不等,负载相对较小。

Thanks!

Ivan


我们看到了一些高速搅动通道时类似的不稳定性 https://stackoverflow.com/questions/35563064/processing-messages-through-namedchannels-with-prefetch-1/35584333#35584333.

解决方法是增加通道缓存大小以避免高搅动率;目前还不清楚不稳定的地方在哪里,但我不相信它是在 Spring AMQP 中。

然而,一个问题是 XD 没有公开channelCacheSize作为财产。

上面链接中的答案有一个解决方法,即通过替换总线配置 XML 来添加属性。增加缓存大小解决了该用户的问题。

我们有一个打开 JIRA 问题以公开该属性 https://jira.spring.io/browse/XD-3747但尚未实施。

我看到您最初将此作为该问题的“答案”发布。

有人可以更具体地解释一下rabbit-bus.xml应该安装在哪里以及为什么会发生这种情况。

正如它所说,您需要将其放在 xd config 目录下:

xd/config/META-INF/spring-xd/bus/rabbit-bus.xml.

EDIT

使用总线扩展机制的技术代替......

$ cat xd/config/META-INF/spring-xd/bus/ext/cf.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg ref="rabbitFactory" />
        <property name="addresses" value="${spring.rabbitmq.addresses}" />
        <property name="username" value="${spring.rabbitmq.username}" />
        <property name="password" value="${spring.rabbitmq.password}" />
        <property name="virtualHost" value="${spring.rabbitmq.virtual_host}" />
        <property name="channelCacheSize" value="${spring.rabbitmq.channelCacheSize:100}" />
    </bean>

</beans>

编辑:测试结果

预填充队列foo有 100 万条消息。

    concurrency:               10
    prefetch:                  1000
    txSize:                    1000

.

xd:>stream create foo --definition "rin:rabbit --concurrency=10 --maxConcurrency=10 --prefetch=1000 --txSize=1000 | t1:transform | t2:transform | rout:rabbit --routingKey='''bar'''" --deploy
Created and deployed new stream 'foo'

因此,通过此配置,我们最终有 40 个消费者。

我在公交车上从未见过超过 29 个发布频道,而水槽中有 10 个发布频道。

100 万条消息传输自foo to bar不到 5 分钟(通过xdbus.foo.0, xdbus.foo.1 and xdbus.foo.2) - 发布了 400 万条消息。

没有错误 - 但我的笔记本电脑需要冷却:D

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ - 通道关闭:连接错误(SpringXD 重复关闭rabbitmq 连接。) 的相关文章

  • spring中rabbitmq监听器的异常处理

    使用spring 我是rabbitmq的新手 我想知道我错在哪里 我编写了一个rabbitmq连接工厂和一个包含侦听器的侦听器容器 我还为侦听器容器提供了错误处理程序 但它似乎不起作用 我的春豆
  • 在 Docker 中使用 RabbitMQ 删除容器

    我尝试使用RabbitMQ启动Docker容器 结果下载了镜像 但容器没有启动 我在日志中收到以下消息 error RABBITMQ DEFAULT PASS is set but deprecated error RABBITMQ DEF
  • 没有连接的 AMQP/RabbitMQ 通道什么时候会死亡?

    我有一个简单的 RabbitMQ 测试程序 随机将消息排队 另一个读取它们 所有这些都使用 Spring AMQP 如果消费者死亡 例如 在没有机会关闭其连接或通道的情况下终止进程 则它尚未确认的任何消息似乎将永远保持未确认状态 我看过很多
  • 与 RabbitMQ 相比,Amazon SQS 的性能较慢

    我想在我的 Web 应用程序中集成消息队列中间层 我测试了 Rabbitmq 和 Amazon SQS 但发现 Amazon SQS 速度很慢 我在 Amazon SQS 中每秒收到 80 个请求 而在 Rabbitmq 中每秒收到 200
  • 如何在 celery task.apply_async 中使用优先级

    我有一个testcelery 中的队列 我为它定义了一个任务 celery app task queue test ignore result True def priority test priority print priority 它
  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 如何使用自动装配的 Spring Boot 监听多个队列?

    我是 Spring Boot 的新手 正在尝试它 目前我已经构建了一些应用程序 我希望能够通过队列相互通信 我目前有一个侦听器对象 可以从特定队列接收消息 Configuration public class Listener final
  • 过期的消息不会从 RabbitMQ 中删除

    我通过生产者向 RabbitMQ 发送一条普通消息 然后发送第二条消息expiration属性分配给一个值 然后使用rabbitmqctl list queues命令我监视消息的状态 我发现如果我先发送一条普通消息 然后发送一条消息expi
  • Akka 的语言和产品替代品是什么?

    现在我正在看游戏框架 https www playframework com 并且非常喜欢它 Play 中提供的功能中最受宣传的部分之一是Akka http akka io 为了更好地理解 Akka 以及如何正确使用它 您能告诉我其他语言或
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 在 RabbitMQ 监听器中隐藏运行时异常

    在某些故意发生的情况下 我使用了一些异常来拒绝消息 但在控制台中显示了乍一看似乎不太正常的异常 如何在登录控制台 文件时隐藏该特定异常 我正在使用 spring boot 和默认记录器 public static class Undispa
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • 为什么需要消息队列来与 Web 套接字聊天?

    我在互联网上看到了很多使用 Web 套接字和 RabbitMQ 进行聊天的示例 https github com videlalvaro rabbitmq chat https github com videlalvaro rabbitmq
  • MongoDB 架构设计 - 实时聊天

    我正在启动一个项目 我认为该项目特别适合 MongoDB 因为它提供的速度和可扩展性 我目前感兴趣的模块是与实时聊天有关的 如果我要在传统的 RDBMS 中执行此操作 我会将其分为 频道 一个频道有很多用户 用户 一个用户有一个频道但有多条
  • 使用 Celery(RabbitMQ、Django)检索队列长度

    我在 django 项目中使用 Celery 我的代理是 RabbitMQ 我想检索队列的长度 我浏览了 Celery 的代码 但没有找到执行此操作的工具 我在 stackoverflow 上发现了这个问题 从客户端检查 RabbitMQ
  • Celery 与rabbitmq 创建结果多个队列

    我已经用 RabbitMQ 安装了 Celery 问题是 对于返回的每个结果 Celery 都会在 Rabbit 中创建队列 并在交换 celeryresults 中使用任务 ID 我仍然想得到结果 但在一个队列上 我的芹菜配置 from
  • AMQP如何克服直接使用TCP的困难?

    AMQP如何克服直接使用TCP发送消息时的困难 或者更具体地说 在发布 订阅场景中 在 AMQP 中 有一个代理 该代理接收消息 然后完成将消息路由到交换器和队列的困难部分 您还可以设置持久队列 即使客户端断开连接 也可以为客户端保存消息
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • 将 sensu-client 连接到服务器时 AMQP 连接的 bad_header

    我已经安装了 sensu 和厨师社区食谱 但是 sensu客户端无法连接到服务器 导致rabbitmq连接错误 尝试连接时消息超时 这是详细的客户端日志 来自 sensu client log 的日志 timestamp 2014 07 0

随机推荐