rabbitmq 客户端golang实战

2023-11-06

rabbitmq消息模式

rabbitmq中进行消息控制的组建可以分为以下几部分:

  1. exchange:rabbitmq中的路由部件,控制消息的转发路径;
  2. queue:rabbitmq的消息队列,可以有多个消费者从队列中读取消息;
  3. consumer:消息的消费者;

rabbitmq在使用过程中可以单独使用queue进行消息传递(例如celery就可以使用单个queue进行多对多的消息传递),也利用exchange与queue构建多种消息模式,主要包括fanout、direct和topic方式,模式的使用方式在此放一张图,不再此做详细解释。

rabbitmq

我在使用的rabbitmq的过程中,主要是进行消息的广播及主题订阅:

[producer] -> [exchange] ->fanout-> [queue of consumer] -> [consumer]
       |                             /|\
       ------->[exchange] ->topic------

不同的设备连接到rabbitmq中创建自己的queue,将queue绑定的两个不同的exchange,分别接收广播消息及主题消息。通过配置queue的持久化及消息过期时间,则可以在设备短暂下线的情况下,将消息缓存在queue中,之后上线后再从queue中读取消息。

rabbitmq客户端

rabbitmq客户端本质上是实现amqp协议的通信过程,golang的基础package使用的是github.com/streadway/amqp

在此主要对客户端构建中的一些问题进行陈述,详细的客户端构建代码请参见:rabbitmq_client.go

创建queue

exchange和queue实际上都是通过amqp协议进行创建的,如果在创建过程时,rabbitmq中已经有相同名称的exchange或queue但属性不则会创建失败。通常情况下exchange的属性不会变化,但是queue可能会修改过期时间、消息TTL等属性,因此实现过程中,若queue创建不成功则进行删除后再创建(在我的应用场景中queue与消费者绑定,因此不存在误删在使用中的queue的问题):

func (clt *Client) queInit(server *broker, ifFresh bool) (err error) {

	var num int
	ch := clt.ch

	if ifFresh {
		num, err = ch.QueueDelete(
			server.quePrefix+"."+clt.device,
			false,
			false,
			false,
		)
		if err != nil {
			return
		}
		log.Println("[RABBITMQ_CLIENT]", clt.device, "queue deleted with", num, "message purged")
	}

	args := make(amqp.Table)
	args["x-message-ttl"] = messageTTL
	args["x-expires"] = queueExpire
	q, err := ch.QueueDeclare(
		server.quePrefix+"."+clt.device, // name
		true,  // durable
		false, // delete when usused
		false, // exclusive
		false, // no-wait
		args,  // arguments
	)
    // 注意在此配置的两个参数,详细用意请参见 http://next.rabbitmq.com/ttl.html
	if err != nil {
		return
	}

	for _, topic := range server.topics {
		err = ch.QueueBind(
			q.Name,
			topic.keyPrefix+"."+clt.device,
			topic.chanName,
			false,
			nil,
		)
		if err != nil {
			return
		}
	}

	clt.que = q
	return
}
消息接收

对于消费者消息的接收过程如下所示:

msgs, err := clt.ch.Consume(
		clt.que.Name, // queue
		clt.device,   // consumer
		false,        // auto ack
		false,        // exclusive
		false,        // no local
		false,        // no wait
		nil,          // args
	)
	if err != nil {
		clt.Close()
		log.Println("[RABBITMQ_CLIENT]", "Start consume ERROR:", err)
		return nil
	}

	clt.msgs = msgs
	clt.pubChan = make(chan *publishMsg, 4)

	go func() {
		cc := make(chan *amqp.Error)
		e := <-clt.ch.NotifyClose(cc)
		log.Println("[RABBITMQ_CLIENT]", "channel close error:", e.Error())
		clt.cancel()
	}()

	go func() {
		for d := range msgs {
			msg := d.Body
			msgProcess(d.Exchange, msg)
			d.Ack(false)
		}
	}()

通过ch.Consume调用可以得到一个接收消息的msgs channel,在此没有配置auto ack,而是在消息处理结束之后,通过调用d.Ack(false)反馈ACK,这样可以保证消息在被处理之后,再进行确认。消费过程中,还调用ch.NotifyClose(cc)amqp.Channel的关闭进行侦听。

注意:在一个gorontinue中同时对msgs和notifyClose两个channel进行读取可能会导致死锁。因为msgs被关闭就会结束相应的gorontinue,此时notifyClose因为没有接收者,而在amqp.channel关闭的过程中出现死锁。

消息发送

在amqp的消息发送过程中,其对于消息的确认机制略有些蛋疼。因为在发送的时候不可配置发送的消息id,但在接收确认时,消息id是按照自然数递增的,也就是说发送者需要按照自然数递增的顺序自己维护发送的消息id。相关代码如下所示:

func (clt *Client) publishProc() {
	ticker := time.NewTicker(tickTime)
	deliveryMap := make(map[uint64]*publishMsg)

	defer func() {
		atomic.AddInt32(&clt.onPublish, -1)
		ticker.Stop()
		for _, msg := range deliveryMap {
			msg.ackErr = errCancel
			msg.cancel()
		}
	}()

	var deliveryTag uint64 = 1
	var ackTag uint64 = 1
	var pMsg *publishMsg
	for {
		select {

		case <-clt.ctx.Done():
			return

		case pMsg = <-clt.pubChan:
			pMsg.startTime = time.Now()
			err := clt.sendPublish(pMsg.topicId, pMsg.keySuffix, pMsg.msg, pMsg.expire)
			if err != nil {
				pMsg.ackErr = err
				pMsg.cancel()
			}
			deliveryMap[deliveryTag] = pMsg
			deliveryTag++

		case c, ok := <-clt.confirm:
			if !ok {
				log.Println("[RABBITMQ_CLIENT]", "client Publish notify channel error")
				return
			}
			pMsg = deliveryMap[c.DeliveryTag]
			// fmt.Println("DeliveryTag:", c.DeliveryTag)
			delete(deliveryMap, c.DeliveryTag)
			if c.Ack {
				pMsg.ackErr = nil
				pMsg.cancel()
			} else {
				pMsg.ackErr = errNack
				pMsg.cancel()
			}
		case <-ticker.C:
			now := time.Now()
			for {
				if len(deliveryMap) == 0 {
					break
				}
				pMsg = deliveryMap[ackTag]
				if pMsg != nil {
					if now.Sub(pMsg.startTime.Add(pubTime)) > 0 {
						pMsg.ackErr = errTimeout
						pMsg.cancel()
						delete(deliveryMap, ackTag)
					} else {
						break
					}
				}
				ackTag++
			}
		}
	}
}

发送过程的构造要点:

  1. 使用一个map[uint64]*publishMsg存储已经发送的消息,map的键为消息的id;
  2. 接收到确认消息后,通过消息的反馈机制反馈确认信息,并从map中删除消息;
  3. 在每一个tick,按照递增的id检查map中是否有超时消息,通过消息的反馈机制反馈超时信息;
  4. 在协程退出时向每个消息发送反馈信息,并删除消息。

需要注意的是,消息反馈并没有使用channel,因为消息的接收者可能因为超时不再侦听channel,从而导致发送过程出现阻塞。可以用长度不为0的反馈channel使得发送过程不阻塞,但是着需要等待gc后才能释放反馈channel的内存。因此在此并没有使用channel接收反馈,而是通过context的事件来告知发送方消息发送过程结束,反馈信息则提前写在publishMsgackErr中。

总结

作为golang的入门级选手,在实现rabbitmq客户端过程中还是踩了一些坑,最后的实现还是可以算是高效可靠。rabbitmq的库本身有心跳机制来维持与服务器之间的连接,但依据实现mqtt客户端的经验,还是自己实现了心跳来保障客户端上层连接的可靠性。因此在接收和发送两方面,该客户端实现还是经受住了考验,欢迎大家参考。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rabbitmq 客户端golang实战 的相关文章

随机推荐

  • 探亲问题(无向图任意两点是否可连通)——C语言

    下提供队列实现的代码 include
  • Microsoft Dynamics的五种关键能力

    1 集成通信与协作 您需要在工程部门 制造部门和分包商间进行同步通信 例如 如果工程部门改变了设计 运营部门应该立即知道有关的详细情况 您的通信解决方案需要同 ERP 系统相集成 以确保分包商能够支持工程部门在敏捷性方面的提高 产品数据管理
  • 集成开发环境:IDE

    集成开发环境 IDE IDE Integrated Development Environment 是用于提供程序开发环境的应用程序 一般包括代码编辑器 编译器 调试器和图形用户界面等工具 集成了代码编写功能 分析功能 编译功能 调试功能等
  • 机器学习(2)——鸢尾花数据集

    在上次房价数据集中做出一些改进 对鸢尾花数据集进行预测 需要导入的库 from sklearn datasets import load iris 导入鸢尾花数据集 from sklearn linear model import Logi
  • spark安装运行在webUI界面不显示worker

    spark conf spark env sh 文件中需要显式地设置一些环境变量 不用系统默认值 亲测 ubuntu16 04系统 spark env sh中手动配置 export JAVA HOME lt gt jdk1 8export
  • 33. 实战:实现某网站店铺信息的查询与批量抓取(附源码)

    目录 前言 目的 思路 代码实现 1 请求URL 获取源代码 2 解析源代码 获取数据 3 完善保存数据的函数save data 4 理清main函数逻辑 循环传递每一页有效信息的参数 完整代码 运行效果 总结 前言 近日 我们每周四都能刷
  • C#类与结构体的区别

    C 中类 class 与结构体 stract 的区别 1 类是引用类型 结构体是值类型 2 结构体不支持继承 但可以实现接口 类即支持继承也能实现接口 3 结构体中不可以声明无参的构造函数 4 结构体不能定义析构函数 5 结构体不可用作其他
  • 关于json数据的写入(write())必须为str类型及写入后双引号“变为‘号问题

    1 原始json数据 text 黎城县东崖底中心校学生用床购置项目成交公告 label 1 duoyu 0 text 淮南师范学院采购2017年智库项目 科研建设项目 学科及科技创新平台项目 1包 中标公示 label 1 duoyu 0
  • SpringBoot对接微信小程序支付功能开发(二,支付回调功能)

    接着上一篇 SpringBoot对接微信小程序支付功能开发 一 下单功能 在上一篇下单功能中我们有传支付结果回调地址 下面是回调接口实现 package com office miniapp controller import cn hut
  • Blender材质贴图入门图文教程

    推荐 将 NSDT场景编辑器 加入你的3D开发工具链 大家好 今天跟大家分享Blender材质贴图入门图文教程 一套blender的PBR材质包 和HDRI环境纹理贴图 在文末领取 希望能助到大家更高效完成场景练习 据我了解 越来越多人开始
  • Redis、Redission实现分布式锁

    Redis实现 使用spring data redis提供的接口实现redis分布式锁
  • kali使用aircrack无线攻击wifi超详细步骤(包含监听网卡启动,获得握手包,密码本生成)

    平台及工具 linux kali平台 aircrack ng 工具 免驱监听网卡 详细操作 1 首先启动kali 插入网卡 输入ifconfig查看kali是否检测到监听网卡 注意监听网卡要免驱动的 ifconfig 查看自身网卡信息 如图
  • React lazyLoad懒加载

    在React中使用lazy懒加载 效果图 目录结构 index js import React from react import ReactDOM from react dom import App from App import Bro
  • PID算法与PID自整定算法

    PID算法与PID自整定算法 本文是由于研发恒温槽项目故需要了解PID控制算法和PID自整定算法 为方便本人日后需要故作此记录 直接粘贴代码吧 这是PID位置式控温算法 函数名 void Pid positional float speed
  • 【Qt教程】4.1 - Qt5 文件系统 QFile文件读写操作

    1 Qt文件系统简介 QFile 文件系统是应用程序必不可少的部分 Qt作为一个通用开发库 提供了跨平台的文件操作能力 Qt通过 QIODevice 提供了对I O设备的抽象 使这些设备具有读写字节块的能力 在所有的I O设备中 文件I O
  • java forName() 方法

    forName 方法会进行类加载 将MyClass装在到JVM上 静态代码块 在类加载时执行 且只执行一次 如果你只想执行一个类的静态代码块 其它代码不执行 可以使用forName 方法 package leetcode0606 refle
  • Mybatis:传参+提交事务(自动or手动)+sql多表关联查询(两种方法)

    目录 一 参数两种类型 二 传参的几种方法 三 提交事务 四 sql多表关联查询 两种方法 一 参数两种类型 1 参数 预编译方式 更安全 只用于向sql中传值 select from admin where account account
  • Buuctf(Easy Calc 1)

    一 解题步骤 1 发现了一个可以得到计算结果的输入框 说明这题可能是一道命令执行 或者注入题目 我们输几个数字发现可以得到正确答案 但输入字母就会报错 我们看一下html源码 进行代码审计
  • java给byte赋值_关于JAVA中Byte数据类型二进制赋值运算报错问题

    自从JDK7更新之后 新增了二进制变量的表示 支持将整数类型用二进制来表示 用0b开头 例如 byte b byte 0b1000 0001 short s short 0b1000 0000 0000 0001 新手在这个时候会遇到一个问
  • rabbitmq 客户端golang实战

    rabbitmq消息模式 rabbitmq中进行消息控制的组建可以分为以下几部分 exchange rabbitmq中的路由部件 控制消息的转发路径 queue rabbitmq的消息队列 可以有多个消费者从队列中读取消息 consumer