一种消息和任务队列——beanstalkd

2023-10-26

 

beanstalkd 是一个轻量级消息中间件,其主要特性:
  • 基于管道  (tube) 和任务 (job) 的工作队列 (work-queue):d
      管道(tube),tube类似于消息主题(topic),在一个beanstalkd中可以支持多个tube,每个tube都有自己的producer和consumer;
          任务(job),beanstalkd用job代替了message的概念,与消息不同,job有一系列状态: 
  • 内部实现采用了 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,具有有很高的性能。
  • 尽管是内存队列,beanstalkd 提供了 binlog 机制,当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。
  • 优先级(priority):job可以有0~2^32个优先级,0代表最高优先级,beanstalkd使用最大最小堆处理job的优先级排序,因此reserve命令的时间复杂度是O(logN);
  • 延时(delay),有两种方式可以执行延时任务:producer发布任务时指定延时;或者当任务处理完毕后, consumer再次将任务放入队列延时执行 (RELEASE with <delay>);
  • 超时重发(time-to-run),Beanstalkd 把job返回给consumer以后:consumer必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把job交给另外的消费者节点执行。如果consumer预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR ;
  • 任务预留(buried),如果job因为某些原因无法执行, consumer可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。
 
 
 
 

job的状态

  • READY,需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
  • DELAYED,延迟执行的任务, 当消费者处理任务后,可以用将消息再次放回 DELAYED 队列延迟执行;
  • RESERVED,已经被消费者获取, 正在执行的任务,Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
  • BURIED,保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;
  • DELETED,消息被彻底删除。Beanstalkd 不再维持这些消息。
 
如下,是一个典型任务的生命周期:
producer执行put命令将job放入队列,consumer执行reserve命令从队列取出job,执行完毕后发送delete命令告诉beanstalkd删除该job。
如果没有执行delete命令,beanstalkd将在一个TTR周期(默认120s)后重新将该job加入队列;
 
 put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*
 
下面是一个使用 beanstalkc(python客户端)操作beanstalkd的例子:
#!/usr/bin/env python
import beanstalkc

beanstalk=beanstalkc.Connection(host="127.0.0.1",port=11300)

# pruducer
beanstalk.put('hello')
beanstalk.put('world')

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body
job1.delete()

job2=beanstalk.reserve(timeout=1)
print "job2: " + job2.body
job2.delete()

job3=beanstalk.reserve(timeout=1)       # Error,队列中已经没有job了
print "job3: " + job3.body
job3.delete()

 

 

tube的管理

beanstalkd通过tube维护多个队列,每个tube都是一个独立的queue,可以使用use命令切换tube,如果切换的tube不存在,会自动创建一个:
print beanstalk.tubes()     # default
print beanstalk.using()     # default 

beanstalk.use('queue1')
print beanstalk.using()     # queue1

beanstalk.use('queue2')     # queue2
print beanstalk.using()

print beanstalk.tubes()     # default, queue2

如上面的例子,在最后的tubes()命令打印所有tube的时候,并没有看到queue1,这是因为没有任何client 在using或者watching的tube会自动消失。

可以使用watch命令让client同时处理多个tube,而不用担心tube会被销毁:

print beanstalk.tubes()     # default
print beanstalk.using()     # default

beanstalk.use('queue1')
beanstalk.watch('queue1')
print beanstalk.using()     # queue1

beanstalk.use('queue2')     # queue2
print beanstalk.using()

print beanstalk.tubes()     # default, queue1, queue2
print beanstalk.watching()  # default, queue1

 

watch的tube如果不存在,会被自动创建,可以用ignore命令取消关注tube:

beanstalk.watch('queue3')
print beanstalk.watching()  # default, queue3
beanstalk.ignore('queue3')
print beanstalk.watching()  # default

 

注意,watch和use是两个独立的动作,use一个tube不代表watching它了,反之watch一个tube也不代表using它;
 
 
 

beanstalkc命令

如下,是一个job更完整的状态变迁和生命周期:
 
put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                        | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*
 
一些例子:
put的时候加上delay参数,可以延迟发布job:
# pruducer
beanstalk.put('hello', delay=10)
beanstalk.put('world')

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # World
job1.delete()

job2=beanstalk.reserve(timeout=0) 
print "job2: " + job2.body  # Error,

 

put命令也支持优先级参数:

# pruducer
beanstalk.put('hello', priority=10)
beanstalk.put('world', priority=9)

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # world
job1.delete()

job2=beanstalk.reserve(timeout=0) 
print "job2: " + job2.body  # hello
job2.delete()

 

 

release命令可以释放job回队列:

# pruducer
beanstalk.put('hello') 
beanstalk.put('world')

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # hello
job1.release()

job2=beanstalk.reserve(timeout=0) 
print "job2: " + job2.body  # hello

 

 bury命令将job放到一个特殊的FIFO队列中,之后不能被reserve命令获取,但可以用kick命令扔回工作队列中,之后就能被消费了:

# pruducer
beanstalk.put('hello')
beanstalk.put('world')

# consumer
job1=beanstalk.reserve(timeout=0)
job1.bury()
print job1.stats()['state'] # buried

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # world
job2.delete()

beanstalk.kick()

job3=beanstalk.reserve(timeout=0)
print "job3: " + job3.body  # hello
job3.delete()

 

peek命令允许查看一个job,但不会reserve它;

# pruducer
beanstalk.put('hello')
beanstalk.put('world')

#print(beanstalk.stats())  

# consumer
job1=beanstalk.reserve(timeout=0)
job1_id=job1.stats()['id'] 
print job1_id
job1_r=beanstalk.peek(job1_id)
print "job1 " + job1_r.body  # hello

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # hello
job2.delete()

job3=beanstalk.reserve(timeout=0) 
print "job3: " + job3.body  # world
job3.delete()

 

======专注高性能web服务器架构和开发=====

https://www.cnblogs.com/chenny7/p/7244913.html



本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一种消息和任务队列——beanstalkd 的相关文章

随机推荐

  • C51单片机重要知识点总结

    文章目录 文章目录 00 写在前面 01 C51基本数据类型总结 02 C51数据类型扩充定义 03 关于单片机 04 单片机工作的基本时序 05 单片机复位 06 80C51的中断系统 07 定时器 08 串口通信 09 C语言基础 10
  • Unable to cast object of type ‘System.Byte‘ to type ‘System.Boolean‘

    mysql数据库 存储的一个字段类型为tinyint 查询数据的时候设置的数据类型bool 查询结果报错 解决方法 将为空的数据都设为0 查询资料时 tinyint在查询时会自动对应成bool类型 问题原因时数据不可为空 即使设置成bool
  • Hi3516Dv300 平台使用MIPI Tx点屏

    背景 公司新做了一块3516Dv300的开发板 其中有MIPI Tx接口 刚好公司库房还有好几百块的LCD屏 LCD屏是800x480的 还是原装屏 不用掉怪可惜的了 所以就让硬件的同事化了个转接板 使用的芯片是ICN6211 这货最大分辨
  • python pip 安装 删除缓存(cache)

    今天pip安装包时 一直使用缓存 非常不爽 pip删除缓存 安装操作 pip no cache dir install 包名 If using pip 6 0 or newer try adding the no cache dir opt
  • 什么是1G/2G/3G/4G/5G

    什么是1G 2G 3G 4G 5G 参考 http www 360doc com content 14 1213 22 5458405 432718054 shtml 介绍 1G 表示第一代移动通讯技术 在20世纪80年代前期 第一代模拟制
  • CIMCO Edit2022安装教程(非常详细)从零基础入门到精通,看完这一篇就够了(附安装包)

    软件下载 软件 CIMCO Edit 版本 2022 语言 简体中文 大小 251 79M 安装环境 Win11 Win10 Win8 Win7 硬件要求 CPU 2 0GHz 内存 4G 或更高 下载通道 百度网盘丨64位下载链接 htt
  • CommonJS是啥东西嘞

    AMD AMD要用define定一个模块 define dep1 dep2 function dep1 dep2 return function 包目录 package json包 bin用于可的目 lib用于JavaScript的目 do
  • sqli-libs基础篇总结(1-22)

    1 关于sqli labs 这个是sql注入的靶场 可以在git上下载 2 题目简介 前面的1 22题都是sql注入的基础题目 覆盖范围很广 不过都是针对mysql数据库的 1 4题 union注入 5 8题 布尔盲注 9 10题 延时盲注
  • sql server备份及导出表数据和结构

    一 备份表数据及结构 select into new table name from old tablename 二 导出表数据及结构 1 选中要导出的数据库 gt 任务 gt 生成脚本 或者在任务里面有生成脚本这个选项 好好找找能找到的
  • 高清变脸更快更逼真!比GAN更具潜力的可逆生成模型来了

    昨天上市即破发的小米 今天上午股价大涨近10 这下雷军要笑了 而且可以笑得更灿烂 更灿烂是什么样 来 我们用OpenAI刚刚发布的人工智能技术 给大家展示一下 当然这个最新的技术 不止这点本事 它的 想象力 很强大的说 比如 留胡子的硬汉版
  • 关于eclipse项目栏关闭项目不想再看到

    前言 如果你用是什么IntelliJ IDEA我这篇文章你就不用看了 我的建议还是用IDEA我也喜欢用 但是因为我们老师电脑卡的原因 这个编辑器比较吃配置所以用的eclipse 以前还用的myeclipse虽然我对编辑器没什么要求 但是我用
  • Jmeter常用线程组设置策略

    一 前言 在JMeter压力测试中 我们时常见到的几个场景有 单场景基准测试 单场景并发测试 单场景容量测试 混合场景容量测试 混合场景并发测试以及混合场景稳定性测试 在本篇文章中 我们会用到一些插件 在这边先给大家列出 Custom Th
  • Java多线程详解(线程同步)

    嗨喽 小伙伴们我来了 上一章 我们通过几个例子 点击跳转 介绍了线程安全问题 而说到线程安全就不得不提到线程同步 它是解决线程安全的一种重要方法 本章就来简单地介绍一下线程同步 从上一章的学习我们知道 当多个线程操作一个资源的时候 有可能由
  • 远程代码执行

    远程代码执行 远程代码执行 Remote Code Execute 远程命令执行 Remote Command Execute 1 为啥要远程执行代码 路由器 防火墙 入侵检测等设备的web管理界面 自动化运维的管理系统 提供给用户一个接口
  • antV实现离线中国2D地图并叠加拓扑(一)

    业务背景 中国地图铺满屏幕 屏幕的中间部分动态展示当前区域地图 当前区域有可能是省 市 县等 需要在当前区域展示拓扑站点 并实时弹出小面板展示当前站点详情 实现方式 antv G6实现拓扑图 antv L7绘制地图 本身L7是可以实现动态标
  • java生成6位随机数

    生成6位随机数 不会是5位或者7位 仅只有6位 System out println int Math random 9 1 100000 同理 生成5位随机数 System out println int Math random 9 1
  • 雷军也入局了...

    风口理论的发明者雷总最近也杀入大模型 AI领域了 早在10多天前雷军在微博就发过一段话 这段话其实已经暗示了雷军和他的小米已经在研发大模型产品了 相信要不了多久小米的大模型产品就会面世 这下国内几乎所有互联网巨头都杀入了大模型领域 同时还有
  • 2011年,移动互联网加速蔓延 – 来自2011移动开发者大会

    2011移动开发者大会 这是第二届移动开发者大会了 这一年来移动互联网各个领域蔓延开来 蔓延这个词是开复老师演讲的主题 从事塞班开发的请举手 举手者寥寥 记得在去年移动开发者大会上 举手者还有一些 经过一年的蔓延 塞班虽然仍然占有较大的份额
  • hadoop之YARN

    在YARN中 资源调度器 Scheduler 是ResourceManager中的重要组件 主要负责对整个集群 CPU 内存 的资源进行分配和调度 分配以资源Container的形式分发到各个应用程序中 如MapReduce作业 应用程序与
  • 一种消息和任务队列——beanstalkd

    beanstalkd 是一个轻量级消息中间件 其主要特性 基于管道 tube 和任务 job 的工作队列 work queue d 管道 tube tube类似于消息主题 topic 在一个beanstalkd中可以支持多个tube 每个t