在rabbitmq配置spring boot中在AMQP中配置多个Vhost

2024-04-08

我正在实现一个项目,我必须在rabbitmq中的不同虚拟主机之间发送消息。使用 SimpleRoutingConnectionFactory 但得到 java.lang.IllegalStateException: 无法确定查找键的目标 ConnectionFactory [null]。 任何知道如何实现以下内容的人都是我的配置类代码。

@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

@Autowired
ConnectionProperties connect;

// client1 exchanges
@Bean
public TopicExchange client1Exchange() {
    TopicExchange ex = new TopicExchange("ex_client1");
    ex.setAdminsThatShouldDeclare(client1());
    return ex;
}

// client2 exchange
@Bean
public TopicExchange client2Exchange() {
    TopicExchange ex = new TopicExchange("ex_client2");
    ex.setAdminsThatShouldDeclare(client2Admin());
    return ex;
}

@Bean
public Queue client1Queue() {
    Queue queue = new Queue("client1_queue");
    queue.setAdminsThatShouldDeclare(client1());
    return queue;
}

@Bean
public Binding client1Binding() {
    Binding binding = BindingBuilder.bind(client1Queue())
            .to(client1Exchange())
            .with("client1_key");
    binding.setAdminsThatShouldDeclare(client1());
    return binding;
}


@Bean
public Queue client2Queue() {
    Queue queue = new Queue("client2_queue");
    queue.setAdminsThatShouldDeclare(client2());
    return queue;
}

@Bean
public Binding client2Binding() {
    Binding binding = BindingBuilder.bind(client2Queue())
            .to(client2Exchange())
            .with("client2_key");
    binding.setAdminsThatShouldDeclare(client2());
    return binding;
}

@Bean
@Primary
public ConnectionFactory connectionFactory() {
    SimpleRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory();
    Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();
    targetConnectionFactories.put("client1", client1ConnectionFactory());
    targetConnectionFactories.put("client2", client2ConnectionFactory());
    connectionFactory.setTargetConnectionFactories(targetConnectionFactories);
    return connectionFactory;
}

@Bean
public ConnectionFactory client1ConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
    connectionFactory.setVirtualHost(connect.getRabbitMQClient1VHost());
    connectionFactory.setUsername(connect.getRabbitMQClient1User());
    connectionFactory.setPassword(connect.getRabbitMQClient1Pass());
    return connectionFactory;
}

@Bean
public ConnectionFactory client2ConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
    connectionFactory.setVirtualHost(connect.getRabbitMQClient2VHost());
    connectionFactory.setUsername(connect.getRabbitClient2User());
    connectionFactory.setPassword(connect.getRabbitClient2Pass());
    return connectionFactory;
}

// You can comment all methods below and remove interface's implementation to use the default serialization / deserialization
@Bean
public RabbitTemplate rabbitTemplate() {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
    return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
    return new MappingJackson2MessageConverter();
}

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(consumerJackson2MessageConverter());
    return factory;
}

@Bean
public TaskExecutor rabbitListenerExecutor() {
    int threads = Integer.valueOf(connect.getMinConsumers()) * 2; // threads = min consumers* no of queues
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threads);
    executor.setMaxPoolSize(threads);
    executor.setThreadNamePrefix("RabbitThreadListener");
    executor.afterPropertiesSet();
    return executor;
}

@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(Integer.valueOf(connect.getMinConsumers()));
    factory.setPrefetchCount(Integer.valueOf(connect.getPrefetchCount()));
    factory.setTaskExecutor(rabbitListenerExecutor());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

@Bean
public RabbitAdmin client1() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(client1ConnectionFactory());
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

@Bean
public RabbitAdmin client2() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(client2ConnectionFactory());
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

}

我得到这个堆栈跟踪

o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, 
processing can restart if the connection factory supports it
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119)
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:505)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1335)
    at java.lang.Thread.run(Thread.java:748)

The RoutingConnectionFactory一般用于发布消息。

在侦听器容器中使用路由工厂时,必须配置查找键以匹配容器中配置的队列名称。

From 文档 https://docs.spring.io/spring-amqp//reference/html/_reference.html#routing-connection-factory:

同样从版本 1.4 开始,您可以在侦听器容器中配置路由连接工厂。在这种情况下,队列名称列表将用作查找键。例如,如果您将容器配置为setQueueNames("foo", "bar"),查找键将是"[foo,bar]"(没有空间)。

所以;如果一个RabbitListener监听队列foo路由查找键必须是[foo]。 (您可以使用不同的密钥多次添加相同的 CF)。

或者您可以简单地创建多个容器工厂,每个容器工厂都有一个具体的 CF 而不是路由 CF。

EDIT

假设你有

@RabbitListener(queues = "myQueue", connectionFactory = "myRabbitListenerContainerFactory")
public void listen(...) {
    ...
}

If myQueue is in client1的虚拟主机,那么您需要在路由器 CF 映射中添加一个条目,因此...

targetConnectionFactories.put("[myQueue]", client1ConnectionFactory());

...因为为侦听器生成的侦听器容器将在其查找键中使用队列名称。

或者,创建2个集装箱工厂;每个都直接与 client1 和 client2 CF 连接,而不是与路由 CF...

@Bean
public SimpleRabbitListenerContainerFactory client1ListenerContainerFactory() {

@Bean
public SimpleRabbitListenerContainerFactory client2ListenerContainerFactory() {

and

@RabbitListener(queues = "myQueue", connectionFactory = "client1ListenerContainerFactory")
public void listen(...) {
    ...
}

即根本不要对侦听器使用路由 CF - 容器只有一个连接。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在rabbitmq配置spring boot中在AMQP中配置多个Vhost 的相关文章

随机推荐

  • 创建/渲染 scene2d 舞台后重置视口

    在我的游戏中 我正在绘制一个 scene2dStage使用自定义世界坐标系 然后 我想绘制一个调试 UI 上面包含一些文本 例如 FPS 但只需使用屏幕坐标 即文本位于屏幕右上角的位置 我的主要渲染方法如下所示 Override publi
  • 为什么在 Firefox 开发工具中传输的字节大于 size 字节?

    我正在衡量网站的性能 当查看 firefox developer tools 时 我注意到一个奇怪的行为 有一个特定的JS文件 传输的大小为2 831 54 KB 但实际大小为1280kb 根据 Mozilla 的说法 https deve
  • 设置 HTML 文本框的最大字数

    我想设置一个最大数量用户可以在文本框中输入的单词数 not字符数但是words 这个有可能 我做了一些挖掘 发现如何获取用户使用正则表达式输入的单词数 但我不确定如何阻止用户在达到最大值后输入更多字母 var jobValue docume
  • 使用 JSP 时如何使用 Struts 2 ModelDriven 接口访问 POJO 中的属性?

    我有一个实现的动作类ModelDriven界面 这ModelDriven是一个常规 POJO 问题在于它的属性之一是另一个对象 想象一下我的模型驱动is a object calledPersonand my person has an a
  • 如何声明一个返回函数指针的函数?

    想象一个带有参数 double 和 int 的函数 myFunctionA myFunctionA double int 这个函数应该返回一个函数指针 char myPointer 如何在 C 中声明这个函数 typedef是你的朋友 ty
  • jQuery DataTables:如何按自定义参数值而不是单元格内容排序?

    我有一个非常常见的用例 我在价格列中显示格式化的价格 例如 20 000 00 因此 当我尝试对它进行排序时 它会将其视为字符串并且排序效果不佳 10 00 20 000 00 5 000 00 我可以使它按数据参数值 非格式化浮点数 排序
  • mail() 超时问题

    当我通过浏览器执行电子邮件脚本时 会返回超时致命错误 除非我大幅增加执行时间 否则它将正常运行 而不是我正在寻找的解决方案 电子邮件已发送 但需要很长时间 平均 5 分钟 才能到达 我的收件箱 考虑到通过命令行它可以完美地工作 我认为 ph
  • 反应本机视频类型错误:未定义不是对象

    所以我尝试使用 npm 中的 React native video 包来播放 YouTube 视频 export default class App extends Component render return
  • 如何让 NSTextField 在自动布局中随着文本增长?

    Lion 中的自动布局应该可以非常简单地让文本字段 以及标签 随其包含的文本一起增长 文本字段在 Interface Builder 中设置为换行 有什么简单而可靠的方法可以做到这一点 方法intrinsicContentSize in N
  • 如何在 ASP.NET Core 5 中使用 IdentityUser 和 IdentityRole 之间的隐式多对多

    我有这两个实体 public class ApplicationUser IdentityUser
  • AF网络。检查所有操作队列的下载进度

    我有一个使用 AFNetworking 制作的 iOS 应用程序 我有一个单例自定义 httpclient 子类 我用它来使用我的 api 并从服务器下载二进制文件 我需要下载大约 100 个文件 迭代我的 url 数组并为每个 url 创
  • 将多个散景 HTML 图嵌入到 Flask 中

    我在 bokeh 网站和 stack Overflow 上搜索了过去 3 个小时 但没有一个是我真正想要的 我已经生成了我的图 并将它们放在 html 文件中 我想要做的就是将绘图嵌入到仪表板中 形成下图白色区域中的多网格状结构 然而 仅添
  • 如何启用或禁用键盘返回键[重复]

    这个问题在这里已经有答案了 当我们在文本字段中输入字符时 它会启用返回键 但是在我们的需求中 当长度大于5时 我们需要启用返回键 但是现在只要我们输入字符 它就会启用返回键 我们是否需要自定义键盘或者是否有其他可用的解决方案 提前致谢 好吧
  • 如何编写正则表达式从字符串中提取数值?

    我有一根绳子str 9 0 hr or str 9 0 hr 在这种情况下我只想要整数值9 0 语言是 Ruby 1 9 2 I d use number str d d 或者 如果小于 1 0 的值需要前导 0 number str d
  • 光栅化 matplotlib 轴内容(但不包括框架、标签)

    在一篇文章中 我正在生成变形有限元网格图 并使用 matplotlib 的 polycollection 对其进行可视化 图像保存为 pdf 高密度网格会出现问题 这种简单的方法会导致文件太大且渲染过于密集而无法实用 对于这些网格 将每个元
  • 在 R 中使用具有不均匀长度变量的熔化/铸造

    我正在处理一个想要旋转的大型数据框 以便列中的变量成为顶部的行 我发现 reshape 包在这种情况下非常有用 除了强制转换函数默认为 fun aggregate length 之外 大概这是因为我是按 案例 执行这些操作 并且测量的变量数
  • 当返回类型为 Option[Error] 时处理快速失败

    我已经发布了很多关于 Scala 中的故障处理的问题 我真的感谢大家的回答 我理解在处理 Either 和 Scalaz 或 a 时的选择 以便理解 我还有另一个 最后一个 问题 当操作处理外部非功能世界 例如数据库 时 如何执行快速失败的
  • 在 SQL 数据库中保持 RSS 提要唯一的最佳实践

    我正在开发一个项目 该项目显示来自不同站点的 RSS 提要 我将它们保存在数据库中 我的程序每 3 小时获取一次并将它们插入到 SQL 数据库中 我希望提供者有独特的记录 不要显示重复的内容 但问题是一些提供商不提供 GUID 字段 而其他
  • 全局 $sce.trustAsResourceUrl()

    我怎样才能做这样的事情 sce trustAsResourceUrl URL HERE 在全球范围内 就像在主应用程序中一样config or run 函数 以便任何 iFrame img src 等具有URL HERE将工作 文档对此的解
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co