RocketMQ 消息过滤

2023-11-18

1. 简介

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时 再做消息过滤的。
RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实 现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然 后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。
其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基 于Tag的消息过滤正式基于这个字段值的。
在这里插入图片描述
主要支持如下3种的过滤方式

(1) Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。

  1. Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给 Broker端。
  2. Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个 MessageFilter,然后传给Store。
  3. Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤。
  4. 在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,在消息消费端拉
    取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消 息消费。

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。

(2) SQL92的过滤方式:

仅对push的消费者起作用。Tag方式虽然效率高,但是支持的过滤逻辑比较简单。
SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样, 只是在Store层的具体过滤过程不太一样
真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。
SQL92的表达式上下文为消息的属性。

SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

broker.conf配置如下

# 启用SQL92
enablePropertyFilter=true

(3) Filter Server方式。
这是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据 Java函数的逻辑对消息进行过滤。
要使用Filter Server,首先要在启动Broker前在配置文件里加上 filterServer-Nums=3 这样的配 置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的 Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消 息再传给远端的Consumer。
这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过 检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。

2. 基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

3. 使用样例

1、生产者样例

发送消息时,你能通过putUserProperty来设置消息的属性

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

2、消费者样例

用MessageSelector.bySql来使用sql筛选消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ 消息过滤 的相关文章

  • 消息队列 RocketMQ:(九)消息重试

    文章目录 消息队列 RocketMQ 一 概述 消息队列 RocketMQ 二 系统架构 消息队列 RocketMQ 三 发送普通消息 三种方式 消息队列 RocketMQ 四 顺序消息 消息队列 RocketMQ 五 延时消息 消息队列
  • rocketmq搭建2m2s踩坑版

    搭建过程中遇到了些问题百度了很久终于东拼西凑成功解决了 看看成果 console完美运行 代码生产消费也是正常的 言归正传开始搭建 0 服务器环境介绍 没有将master与对应的slave安装在同一个节点 如果在一个节点挂了master就挂
  • 分布式开放消息系统(RocketMQ)的原理与实践

    分布式开放消息系统 RocketMQ 的原理与实践 作者 CHEN川 关注 2016 02 25 15 43 字数 6784 阅读 135462 评论 49 喜欢 351 赞赏 7 一年前为了一次内部分享而写的这篇文章 没想到会有这么多人阅
  • rocketMQ记录

    https segmentfault com a 1190000017841402 停止命令 sh bin mqshutdown namesrv sh bin mqshutdown broker
  • MQClientException: CODE: 208  DESC: query message by key finished, but no message.

    2019 05 15 10 19 31 401 INFO closeChannel close the connection to remote address 127 0 0 1 10911 result true 2019 05 15
  • 面试题篇-09-RocketMQ相关面试题

    文章目录 1 RocketMQ如何保证消息有序消费 2 RocketMQ和Kafka 快的飞起 底层存储有什么不同 2 1 什么是 零拷贝 2 1 1 传统IO数据读写 2 1 2 mmap优化 3次切换 3次拷贝 2 1 3 SendFi
  • Docker实战:docker compose 搭建Rocketmq

    1 配置文件准备 1 1 新建目录 home docker data rocketmq conf mkdir home docker data rocketmq conf 1 2 在上面目录下新建文件broker conf文件 内容如下 b
  • RocketMQ源码(十三)—消费者DefaultMQPushConsumer启动主要流程源码

    此前我们学习了Broker和Producer的启动源码 以及Producer发送消息源码和Broker接收存储消息的源码 现在 我们来学习Consumer的启动以及消费消息的源码 Consumer的启动源码和Producer的启动源码还是有
  • 消息中间件 RocketMQ 源码解析:Message拉取&消费(上)

    摘要 原创出处 http www iocoder cn RocketMQ message pull and consume first 芋道源码 欢迎转载 保留摘要 谢谢 本文主要基于 RocketMQ 4 0 x 正式版 1 概述 2 C
  • RocektMQ社区"每周直播分享第8期"如约而至

    各位同学 RocektMQ社区 每周直播分享第8期 如约而至 分享题目 RocketMQ消息的过滤和重试实现原理详解 直播方式 钉钉群直播方式 群号 21791227 分享时间 2019 01 24 20 00 21 30 本周四 分享讲师
  • centos安装rocketmq

    centos安装rocketmq 1 下载rocketmq二进制包 2 解压二进制包 3 修改broker conf 4 修改runbroker sh和runserver sh的JVM参数 5 启动NameServer和Broker 6 安
  • RocketMQ-名词和架构

    RocketMQ rocketMQ是做什么的我就不用解释了吧 以及他的背景 本文主要是为了让大家明白RocketMQ的工作原理 架构图 上图 双箭头代表是双向通信 ProducerGroup和ConsumerGroup以及Broker集群
  • 【RocketMQ】消息重试、重试次数设置、死信队列

    文章目录 1 死信队列 1 1 死信特性 1 2 查看死信消息 2 重试次数参数 2 1 Producer端重试 2 2 Consumer端重试 3 1 异常重试 3 2 超时重试 参考 1 死信队列 上一篇 RocketMQ 消息重试中我
  • RocketMQ经典高频面试题大全(附答案)

    编程界的小学生 0 彩蛋 1 说说你们公司线上生产环境用的是什么消息中间件 2 多个mq如何选型 3 为什么要使用MQ 4 RocketMQ由哪些角色组成 每个角色作用和特点是什么 5 RocketMQ中的Topic和JMS的queue有什
  • RocketMQ占用内存过大的解决方法

    目录 一 问题描述 二 解决方法 1 runserver sh 修改 2 runbroker sh 修改 一 问题描述 RocketMQ 启动后 一下子把内存撑爆了 二 解决方法 修改启动参数 分别对 bin 目录下的 runserver
  • 关于rocketmq 中日志文件路径的配置

    前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住分享一下给大家 点击跳转到网站 rocketmq 中的数据和日志文件默认都是存储在user home路径下面的 往往我们都需要修改这些路径到指定文件夹以便管理 服务端日志 网
  • RocketMQ-源码解读与调试

    源码环境搭建 源码拉取 RocketMQ的官方Git仓库地址 GitHub apache rocketmq Mirror of Apache RocketMQ 可以用git把项目clone下来或者直接下载代码包 也可以到RocketMQ的官
  • RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时

    1 大多数是配置问题 修改rocketmq文件夹broker conf 2 配置与集群IP或本地IPV4一样 重启 在RocketMQ独享实例中支持IPv4和IPv6双栈 主要是通过在网络层面上同时支持IPv4和IPv6协议栈来实现的 Ro
  • 基于Jmeter实现Rocketmq消息发送

    在互联网企业技术架构中 MQ占据了越来越重要的地位 系统解耦 异步通信 削峰填谷 数据顺序保证等场景中 到处都能看到MQ的身影 而测试工程师在工作中 也经常需要和mq打交道 比如构造测试数据 触发某些业务场景 以及针对mq的性能测试等 目前
  • RocketMQ源码(26)—DefaultMQPushConsumer事务消息源码【一万字】

    事务消息是RocketMQ的一大特性 其被用来实现分布式事务 关于RocketMQ的事务消息的相关原理的介绍见这篇博客 RocketMQ的分布式事务机制 事务消息 关于事务消息的基本案例看这里 消息事务样例 本文主要介绍RocketMQ的事

随机推荐

  • 关于ScrollView嵌套多个RecyclerView滑动冲突,可以很流畅的滑动

    首先不建议过多的嵌套 可以采取其他方式替换 当ScrollIView内部只有一个RecyclerView的时候
  • 【HTML】基础常用标签汇总

    目录 前言 最基础的标签 常用标签 文本相关 文本格式化标签 图像标签 超链接 表格 列表 无序列表 有序列表 自定义列表 表单 基本结构 表单域 表单元素控件 前言 总所周知 一个网站 web 分为三部分 1 HTML 结构 2 CSS
  • 2022年最新Vue+electron项目创建

    一 前言 大多数文章都停留在electron vue中 但是这个库不怎么更新了 最近的更新是九月前 看了很多文章都换其他的工具来构建了 这里推荐GITHUB一个大佬写的基于vue cli的插件 构建项目非常简单 插件的GITHUB源代码 话
  • 力扣刷题26. 删除有序数组中的重复项

    采用双指针法 思路如下 class Solution public int removeDuplicates int nums if nums length 0 return 0 int p 0 int q 1 while q
  • 【华为OD机试真题2023B卷 JAVA&JS】选修课

    华为OD2023 B卷 机试题库全覆盖 刷题指南点这里 选修课 知识点字符串哈希表排序 时间限制 1s 空间限制 256MB 限定语言 不限 题目描述 现有两门选修课 每门选修课都有一部分学生选修 每个学生都有选修课的成绩 需要你找出同时选
  • OCaml学习笔记(二)——Introduction to Objective Camel

    Chapter2 Simple Expressions 2 1 注释语句 OCaml语言中注释部分写在 和 之间 可以相互嵌套 注释部分当作空格处理 2 2 基本表达式 OCaml中每个有效的表达式都有一个类型 某个类型的表达式不能用作其他
  • 使用 Microchip SAM9X60 OTP 存储板卡的MAC地址和序列号

    1 介绍 SAM9X60 处理器有部分OTP One Time Programming Aera 可用于存储user data 这样的话我们就可以将板卡 MAC Address和 SN 序列号写到固定的OTP User Area中 为什么要
  • Python--根据计算公式:BMI = 体重(kg) / 身高(m)^2判断身体状况

    Python 根据计算公式 BMI 体重 kg 身高 m 2判断身体状况 身体质量指数 BMI Body Mass Index 是国际上常用的衡量人体肥胖程度和是否健康的重要标准 主要用于统计分析 肥胖程度的判断不能采用体重的绝对值 它天然
  • ajax将响应结果显示到iframe,JavaScript:iframe / Ajax / JSON

    iframe 在Ajax流行之前大量使用 iframe中的src属性指定的就是一个独立的页面url地址 iframe中呈现的就是这个页面的内容 通过改变src的值 我们就可以轻松的改变iframe中的内容 类似的 刷新验证码也是同样的手段
  • 嵌入式Linux移植5. LED驱动移植 添加LED驱动到内核,测试程序成功运行

    第一次尝试开发移植Linux上的驱动 还是和当年单片机一样 从最简单的LED小灯开始 走走流程 试验结果 LED驱动已经编译进系统 产生 o文件 LED测试程序已经编译成功 产生可执行文件 但是由于疫情手头没板子 没法上板测试 开学之后测试
  • 电脑安装 MIUI+

    windows电脑 打开命令框 输入下面命令 winget install Xiaomi MIUI 如果是小米笔记本 还可以通过 小米帮助中心 小米商城 mi com 进行下载安装
  • hive与hbase之间数据的同步

    一 前言 数据同步是很多公司在做数据迁移时的一个痛点 当然互联网公司有自己的同步机制或者工具 但是困惑了我这几天的需求 还是没有得到解决 事已至此 来写这篇博客记录一下自己最近的研究成果 二 如何同步 hive如何与Hbase直接实现数据同
  • ABAP & DOI NACE OUTPUT

    ABAP上传文件的TCODE SMW0 OAOR 现在遇到的业务需求是在VA03的订单通过SAP标准的OUTPUT配置打印出order confirmation以后 再打印公司的文字条款内容 WORD格式的两页纸 最开始想到的实现方式是在N
  • feign和ribbon有什么区别

    ribbon和feign都是用于调用其他服务的 不过方式不同 1 启动类使用的注解不同 ribbon用的是 RibbonClient feign用的是 EnableFeignClients 2 服务的指定位置不同 ribbon是在 Ribb
  • 【ssh】xshell的替代--WindTerm

    目录 WindTerm WindTerm 简介 如何关闭锁屏密码 3 功能 3 1 选中自动复制 右键粘贴复制的内容 3 2 打开软件自动连接 3 4 设置文件下载初始目录 4 可直接编辑bash命令行 5 界面管理 资源管理器 文件管理器
  • C++中struct的用法

    废话 struct是个很有用的东西呢 进入正题 struct的用处是定义一个新的类型 而这个类型里面可以是各种各样的东西 比如 struct node 定义一个新的类型叫node int a int b 110 char c double
  • ARM - UART4/串口软件实现字符串/字符的收发

    实验任务 1 在键盘输入一个字符 字符 1 并且打印在串口工具上 键盘输入 a gt 串口工具打印 b 2 串口工具输入一个字符串 按下回车键 会显示输入的字符串 头文件 ifndef UART4 H define UART4 H incl
  • 《MySQL是怎样运行的》——读书笔记

    MySQL是怎样运行的 小孩子4919 MySQL B 树 1 数据页 数据页之间双向链接 数据页内record单向链表 数据页内record分为多个组 每个组的最大记录组成数据槽 数据槽采用数组方式在页内存储 2 索引 索引记录为页的最小
  • nacos 安装并配置外部数据库

    参考链接 nacos 安装并配置外部数据库 亲测2 0 1 2 0 3 有效 zwb 121 博客园 Nacos 快速开始 下载链接 https github com alibaba nacos releases 启动服务器 Linux U
  • RocketMQ 消息过滤

    1 简介 RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件 是在Consumer端订阅消息时 再做消息过滤的 RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实 现的