从教程开始学习Rabbitmq

2023-05-16

Rabbitmq 入门概念

首先来介绍下Rabbitmq的一些概念:

 

  • Producer:生产者,生产者负责发送信息(messages)
  • Queue:队列,队列是RabbitMQ中的信箱,唯一区别是信箱里的是信件,而队列中的是数据
  • Consuming:消费者,消费者负责接受消息。

这三个概念就足以支撑MQ的一个简单模式了。请看如下代码:

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

这里的Connection 对象负责和mq联络,因此我们需要告诉创建Connection对象的工厂mq的地址(Host)。

Cannel是大多数完成工作用到的API所在的位置。

 

queueDeclare 负责声明一个队列,队列的概念上面已经解释过了。

 

如果我们想发送消息,只需要调用basicPublish方法即可:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

 

工作队列模式:

Work Queue

当实现了简单的发送和接受后,我们来看工作队列方式:

我们的消费者可能不止一个,这种时候mq将消息分给消费者。

 

分的方式就有说法了,默认情况下,mq采用Round-robin方式分发消息。

比如有1 2 3 4 5 6 7 8条消息,两个消费者,那么1 3 5 7 会分给消费者1,而2 4 6 8分给消费者2.

这是一种很平均的方式,但也会有问题,比如1 3  5 7这几个任务每个都很耗时,而2 4 6 8不耗时,这就导致了消费者2处于一种很闲而消费者1很忙的情况。

 

好在有一个参数可以调节mq的工作方式,这个参数用于worker中,worker通过channel调用basicQos方法,

该方法可以指定我这个工作者每次取几个消息,例如设置为1,那么当我还有消息没有处理完的时候,就不能再往我这里发送消息。

 

int prefetchCount = 1;
channel.basicQos(prefetchCount);

 

mq怎么知道你是否处理完了?

boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

autoAck置为true是,表示消费者一收到消息就立马返回ack,mq就会认为你已经处理完了。  

如果你不喜欢这种方式,想要求当我真正处理完了,觉得OK了才真正的告诉mq我处理完了。这需要将autoAck置为false。

具体如下:

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

 

注意basicAck这个方法,这里就是自定义位置来回应mq,告诉mq我已经处理完了。

忘记回应ack是很危险的!

忘记回复ACK很可能造成内存泄漏,而且mq默认的工作方式就是autoAck = false的。意味着你不回复ack,mq就不会删除这个消息,而且mq的消息是没有超时限制的,所以会一直保存。

 

后面会继续介绍发布订阅模式~

 

 

 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从教程开始学习Rabbitmq 的相关文章

  • Spring RabbitMQ 教程导致连接被拒绝错误

    我是一名经验丰富的 Java 程序员 并且是第一次尝试 Spring Rabbit MQ 我遵循了 messages rabbitMQ 教程exactly使用 Maven http spring io guides gs messaging
  • 每次发送消息时是否需要重建RabbitMQ连接

    我有一个 Spring 3 应用程序 它通过非 RabbitMQ 接收器接收消息 处理它们并通过 RabbitMQ 转发 每次发送消息时都会建立一个新的 RabbitMQ 连接 这似乎有点浪费 我只是想知道这是否真的有必要 或者是否有原因导
  • RPC 模型中的correlationId 和临时队列 - AMQP

    我正在读书RPC模型 http www rabbitmq com tutorials tutorial six java html在 AMQP 中使用 RabbitMQ 本教程创建了一个临时队列 并且还correlationId 临时队列是
  • 如何优雅地结束 spring @Schedule 任务?

    我正在尝试让 Spring Boot 服务优雅地结束 它有一个方法 Scheduled注解 该服务使用 spring data 作为数据库 使用 spring cloud stream 作为 RabbitMQ 在计划的方法结束之前 数据库和
  • Camel 中的无限循环 - Rabbitmq

    我有一个小型服务器路由 它从queue in 获取消息并放入queue out 当我输入一条消息时queue in 服务器无限循环运行 我想知道我在配置方面缺少什么 这看起来是一条非常简单的路线 小服务器路由
  • Rails:如何从服务或队列中监听/拉取?

    大多数 Rails 应用程序的工作方式都是等待来自客户端的请求 然后发挥其作用 但是 如果我想将 Rails 应用程序用作微服务架构的一部分 例如 并进行一些异步通信 服务 A 将事件发送到 Kafka 或 RabbitMQ 队列 而服务
  • 如何在 celery 内为每个用户生成队列?

    因此 我尝试将 Web 请求中的阻塞内容移至后台任务并利用队列 我对消息传递和发布 订阅也很陌生 用户将数据推送到那里并进行处理 稍后用户会收到相关通知 我为此做了一个 celery 设置 发现它不能满足我为每个用户分配自己的任务的专用队列
  • 我应该在 Django 项目中使用 Celery 还是 Carrot?

    我有点困惑我应该使用哪一个 我认为两者都可以 但其中一个比另一个更好或更合适吗 http github com ask carrot tree master http github com ask carrot tree master ht
  • 使用Camel的spring-rabbitmq组件时如何自动声明交换?

    我正在尝试从 Camel 3 x 迁移到 Camel 4 x 版本 因此我需要从rabbitmq替换组件spring rabbitmq With rabbitmq我正在使用的组件declare https camel apache org
  • 谁能告诉我 python 中的 pika 和 kombu 消息传递库有什么区别?

    我想在我的应用程序中使用消息传递库与rabbitmq交互 谁能解释一下 pika 和 kombu 库之间的区别吗 Kombu 和 pika 是两个不同的 Python 库 它们从根本上服务于相同的目的 向消息代理发布消息和使用消息代理发送消
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • rabbitmq-erlang-client,使用 rebar 友好的 pkg,在开发环境上工作在 rebar 版本上失败

    我成功地将rabbitmq erlang client的rebar友好包用于一个简单的Hello World rebarized和OTP 兼容 应用程序 并且在开发环境中工作正常 我能够启动 erl 控制台并执行我的操作applicatio
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • RabbitMQ - 升级到新版本并收到很多“PRECONDITION_FAILED Unknown Delivery Tag 1”

    刚刚升级到新版本的 RabbitMQ 2 3 1 现在出现以下错误 PRECONDITION FAILED unknown delivery tag 1 随后通道关闭 这适用于较旧的 RabbitMQ 无需客户端更改 在应用程序行为方面 当
  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • MongoDB 架构设计 - 实时聊天

    我正在启动一个项目 我认为该项目特别适合 MongoDB 因为它提供的速度和可扩展性 我目前感兴趣的模块是与实时聊天有关的 如果我要在传统的 RDBMS 中执行此操作 我会将其分为 频道 一个频道有很多用户 用户 一个用户有一个频道但有多条
  • 如何在多租户系统中的 RabbitMQ 中使队列私有/安全?

    我已阅读开始使用 http www rabbitmq com getstarted htmlRabbitMQ 提供的指南 甚至还贡献了第六个示例暴风雨 amqp https github com paolo losi stormed amq
  • Celery 与rabbitmq 创建结果多个队列

    我已经用 RabbitMQ 安装了 Celery 问题是 对于返回的每个结果 Celery 都会在 Rabbit 中创建队列 并在交换 celeryresults 中使用任务 ID 我仍然想得到结果 但在一个队列上 我的芹菜配置 from
  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • AMQP如何克服直接使用TCP的困难?

    AMQP如何克服直接使用TCP发送消息时的困难 或者更具体地说 在发布 订阅场景中 在 AMQP 中 有一个代理 该代理接收消息 然后完成将消息路由到交换器和队列的困难部分 您还可以设置持久队列 即使客户端断开连接 也可以为客户端保存消息

随机推荐

  • apache服务器使用时网页乱码问题

    查看原文 xff1a http www hellonet8 com 440 html 在apache的配置文件httpd conf中 xff0c 或许我们会用到 AddDefaultCharset UTF 8 来设置所有主机或者某虚拟主机的
  • 将群晖NAS变为本地盘

    本文介绍一个工具 xff0c 可以在 Windows 系统下将群晖NAS的目录变为本地盘 xff0c 好处是在外部访问的时候 xff0c 能够大大改善体验 可以用本地的应用程序直接打开 xff0c 速度依赖网络带宽 xff0c 正常情况下
  • Deepin | 修改网卡名 | 配置IP地址 | DHCP

    文章目录 修改网卡名解决方案 xff1a 配置IP地址解决方案 xff1a DHCP自动获取 修改网卡名 解决方案 xff1a 禁用该可预测命名规则 span class token function sudo span vim etc d
  • windows商店直接安装ubuntu子系统

    文章目录 安装报错WslRegisterDistribution failed with error 0x8007019eWSL安装Linux报错WslRegisterDistribution failed with error 0x803
  • 基于VTK的任意平面切割

    这位 小兵 太传奇了 xff0c 先收藏 xff0c 再学习 xff0c http www cnblogs com dawnWind archive 2013 02 17 3D 06 html 先贴码 以后再 切割介绍 对于一个模型的切割需
  • 百度OCR接口使用

    最近在研究ocr识别 也对比了一些的方法 现在来介绍一下 调用百度提供的ocr接口 小量调用的话 是不收费的 1 首先 你要有一个百度账号 如果已经有的话 登录进去会进入到这样一个界面 点击 34 创建应用 34 创建成功后 返回应用列表
  • Python 元组(tuple)剖析详解

    目录 概念元组的常见操作 概念 元组 span class token punctuation span 是一个有序 span class token punctuation span 可重复的集合 特点 span class token
  • Dom型XSS跨站脚本攻击和防御

    在前面的文章中 xff0c 我们讲了持久型XSS和反射型XSS 我个人觉得这些命名真的很贴切 xff0c 反应了概念的本质 无论是持久型XSS还是反射型XSS 恶意的js脚本内容都需要由服务端返回给用户 xff0c 今天我们要说的Dom型X
  • 编译错误导致浪费10多分钟, 编译错误的提示:xxx does not name a type xxx

    最近 xff0c 我在google protobuf 协议文件xxx pb增加了结构体 类 请求字段 xff0c 生成xxx h和xxx cpp文件 xff0c 然后放到对应目录进行编译 xff0c 奇葩的是 xff0c 编译出错 xff0
  • 树莓派上安装php

    简单东西 xff1a sudo apt get install php 然后 xff1a pi 64 raspberrypi taoge cat test php lt php aa 61 60 echo 39 hello 39 39 xx
  • 结构体和数组

    结构体中可以有数组类型的成员 xff0c 数组的元素也可以是结构体 数组和结构体的初始化是一样的 xff0c 都是把各个元素放在一个大括号里 xff0c 各个成员用逗号分隔 结构体数组使用示例 include lt stdio h gt i
  • iOS聊天室 简单的对话聊天界面(cell自适应高度)

    文章目录 难点思路需要用到的方法的大致解析 xff08 只是简单的介绍 xff0c 如果想要仔细理解推荐再去看看别的博客 xff09 GitHub地址代码效果图 难点 因为聊天长度不一样 xff0c 需要设置自适应高度发送信息后 xff0c
  • navicat链接centos7数据库失败Authentication plugin ‘caching_sha2_password‘ cannot be loaded: dlopen(../Frame

    重新配置云上数据库 mysql u root p use mysql select user host plugin authentication string from user G ALTER USER root 64 IDENTIFI
  • C语言中以字符串形式输出枚举变量

    1 枚举应用说明 每个枚举常量对应一个整形数字 xff0c 很多时候可以像整形一样使用 xff1b 但是如果要求打印枚举变量名的字符串 xff0c 办法也有很多 xff0c 查看网上方法几乎都需要转换 xff0c 要么用数组 xff0c 下
  • 定时器初值的计算方法

    定时器初值的计算方法 1 xff1a 定义 用户时间 xff1a Tuser 寄存器位数 xff1a Rn xff08 n 为 8 16 32分别代表 0xFF 0xFFFF 0xFFFFFFFF xff09 初始值 xff1a TCONH
  • iOS开发之NSAttributedString使用

    本文介绍了NSAttributedString和NSMutableAttributedString的简单用法 一 NSAttributedString介绍 摘自NSAttributedString h文件 span class hljs c
  • 阿里云添加安全组规则

    阿里云添加安全组规则 可以 参考 阿里云官方文档 xff1a https help aliyun com document detail 25471 html spm 61 5176 11065259 1996646101 searchcl
  • latex章节引用

    方法 xff1a 在章节后面直接添加 label 即可 xff0c 引用的时候使用 ref 可以是任意字符 参考 xff1a LaTeX 引用章节 公式和图表 Xovee CSDN博客 latex引用图表
  • Python之pip命令指定安装源和版本

    背景 用pip安装依赖包时默认访问 https pypi Python org simple 该路径经常出现不稳定以及访问速度非常慢的情况 xff0c 国内厂商提供的一些pipy镜像可以加快下载速度 xff0c 目前可用的有 xff1a 清
  • 从教程开始学习Rabbitmq

    Rabbitmq 入门概念 首先来介绍下Rabbitmq的一些概念 xff1a Producer xff1a 生产者 xff0c 生产者负责发送信息 xff08 messages xff09 Queue xff1a 队列 xff0c 队列是