RocketMQ中ACL权限控制

2023-05-16

1、什么是ACL?

ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?

  • 用户
    用户是访问控制的基础要素,也不难理解,RocketMQ ACL必然也会引入用户的概念,即支持用户名、密码。
  • 资源
    资源,需要保护的对象,在RocketMQ中,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。
  • 权限
    针对资源,能进行的操作,
  • 角色
    RocketMQ中,只定义两种角色:是否是管理员。

另外,RocketMQ还支持按照客户端IP进行白名单设置。

2、ACL基本流程图

在讲解如何使用ACL之前,我们先简单看一下RocketMQ ACL的请求流程:
在这里插入图片描述
对于上述具体的实现,将在后续文章中重点讲解,本文的目的只是希望给读者一个大概的了解。

3、如何配置ACL

3.1 acl配置文件

acl默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目录下。下面对其配置项一一介绍。

3.1.1 globalWhiteRemoteAddresses

全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:


  • 表示不设置白名单,该条规则默认返回false。
  • "*"
    表示全部匹配,该条规则直接返回true,将会阻断其他规则的判断,请慎重使用。
  • 192.168.0.{100,101}
    多地址配置模式,ip地址的最后一组,使用{},大括号中多个ip地址,用英文逗号(,)隔开。
  • 192.168.1.100,192.168.2.100
    直接使用,分隔,配置多个ip地址。
  • 192.168..或192.168.100-200.10-20
    每个IP段使用 "*" 或"-"表示范围。

3.1.2 accounts

配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

3.1.2.1 accessKey

登录用户名,长度必须大于6个字符。

3.1.2.2 secretKey

登录密码。长度必须大于6个字符。

3.1.2.3 whiteRemoteAddress

用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。

3.1.2.4 admin

boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。

  • UPDATE_AND_CREATE_TOPIC
    更新或创建主题。
  • UPDATE_BROKER_CONFIG
    更新Broker配置。
  • DELETE_TOPIC_IN_BROKER
    删除主题。
  • UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
    更新或创建订阅组信息。
  • DELETE_SUBSCRIPTIONGROUP
    删除订阅组信息。

3.1.2.5 defaultTopicPerm

默认topic权限。该值默认为DENY(拒绝)。

3.1.2.6 defaultGroupPerm

默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。

3.1.2.7 topicPerms

设置topic的权限。其类型为数组,其可选择值在下节介绍。

3.1.2.8 groupPerms

设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。

3.2 RocketMQ ACL权限可选值

  • DENY
    拒绝。
  • PUB
    拥有发送权限。
  • SUB
    拥有订阅权限。

3.3、权限验证流程

上面定义了全局白名单、用户级别的白名单,用户级别的权限,为了更好的配置ACL权限规则,下面给出权限匹配逻辑。
在这里插入图片描述

4、使用示例

4.1 Broker端安装

首先,需要在broker.conf文件中,增加参数aclEnable=true。并拷贝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目录。

broker.conf的配置文件如下:

brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort=10915
storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
aclEnable=true

plain_acl.yml文件内容如下:

globalWhiteRemoteAddresses:

accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - TopicTest=PUB
  groupPerms:
  # the group should convert to retry topic
  - oms_consumer_group=DENY

- accessKey: admin
  secretKey: 12345678
  whiteRemoteAddress:
  # if it is admin, it could access all resources
  admin: true

从上面的配置可知,用户RocketMQ只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。

4.2 消息发送端示例

public class AclProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicTest3" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
    }
}

运行效果如图所示:
在这里插入图片描述

4.3 消息消费端示例

public class AclConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4", getAclRPCHook(),new AllocateMessageQueueAveragely());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
    }
}

发现并不没有消费消息,符合预期。

参考Demo:消费者

@Override
    public void run(String... args) throws Exception {
        System.out.println("通过实现CommandLineRunner接口,在spring boot项目启动后打印参数");

        // AccessKey 身份验证
        AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
        consumer = new DefaultMQPushConsumer(consumerGroupId, rpcHook,new AllocateMessageQueueAveragely());
        //consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        //设置 TCP 接入域名
        consumer.setNamesrvAddr(nameserverMQ);
        //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setVipChannelEnabled(false);

        consumer.setConsumeTimeout(3);
        consumer.setMessageModel(MessageModel.BROADCASTING);//广播订阅方式
        //订阅主题和 标签( * 代表所有标签)下信息
        String later = "_" + rocketmqAreacode + "_" + hikvisionId;
        consumer.subscribe(MQIOTDEV001 + later,  MqServerTypeConstant.YCCK);

        // //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // msgs中只收集同一个topic,同一个tag,并且key相同的message
            // 会把不同的消息分别放置到不同的队列中
            try {
                for (Message msg : msgs) {
                   
                    //消费者获取消息 这里只输出 不做后面逻辑处理
                    String body = new String(msg.getBody(), "utf-8");
                    log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);


                    } catch (Exception e) {
                        log.error("0x80517800消费远程操控接口失败"+e.getMessage());
                    }
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("消费者 启动成功=======");

    }

消费者:

public void sendControl(BaseResponse baseResponse) {
    	try {

            String later = "_" + rocketmqAreacode + "_" + hikvisionId;
//        		DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer("my_test_group");
            AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(remoteProperties.getAccessKey(),remoteProperties.getSecretKey()));
            DefaultMQProducer producer = new DefaultMQProducer(producerGroupIOTDEV001, rpcHook);
            producer.setNamesrvAddr(nameserverMQ);
            producer.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
            producer.start();

            Message msg = new Message(
                    MQIOTDEV001 + later, MqServerTypeConstant.YCCK,
                    // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
                    MqServerTypeConstant.YCCK,
                    // Message Body
                    // 任何二进制形式的数据, MQ不做任何干预,
                    // 需要Producer与Consumer协商好一致的序列化和反序列化方式
                    JsonUtils.object2Json(baseResponse).getBytes("UTF-8"));

            //同步发送消息
            producer.send(msg);
            // 在callback返回之前即可取得msgId。
            // 在应用退出前,销毁Producer对象。注意:如果不销毁也没有问题
            log.info("推送反控响应信息到rocketmq:同步发送消息");
            producer.shutdown();


        }catch (Exception e) {
            log.error("0x80517800推送响应信息到rocketmq失败:"+e.getMessage());
            e.printStackTrace();
        }
    }

 

关于RocketMQ ACL的使用就介绍到这里了,下一篇将介绍RocketMQ ACL实现原理。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ中ACL权限控制 的相关文章

  • RocketMQ占用内存过大的解决方法

    目录 一 问题描述 二 解决方法 1 runserver sh 修改 2 runbroker sh 修改 一 问题描述 RocketMQ 启动后 一下子把内存撑爆了 二 解决方法 修改启动参数 分别对 bin 目录下的 runserver
  • rocketMq启动broker报错找不到或无法加载主类 Files\Java\jdk1.8.0_171\lib\dt.jar;C:\Program]

    假如弹出提示框提示 错误 找不到或无法加载主类 xxxxxx 1 打开runbroker cmd 将 CLASSPATH 加上英文双引号 切勿别加中文双引号 2 打开runserver cmd 同理 将 CLASSPATH 加上英文双引号
  • Failed to execute goal on project rocketmq-console-ng: Could not resolve dependencies for project

    Apache RocketMQ安装部署 Failed to execute goal on project rocketmq console ng Could not resolve dependencies for project org
  • RocketMQ的消息优先级

    有些场景 需要应用程序处理几种类型的消息 不同消息的优先级不同 RocketMQ是个先入先出的队列 不支持消息级别或者Topic级别的优先级 业务中简单的优先级需求 可以通过间接的方式解决 下面列举三种优先级相关需求的具体处理方法 第一种
  • RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时

    1 大多数是配置问题 修改rocketmq文件夹broker conf 2 配置与集群IP或本地IPV4一样 重启 在RocketMQ独享实例中支持IPv4和IPv6双栈 主要是通过在网络层面上同时支持IPv4和IPv6协议栈来实现的 Ro
  • 基于Jmeter实现Rocketmq消息发送

    在互联网企业技术架构中 MQ占据了越来越重要的地位 系统解耦 异步通信 削峰填谷 数据顺序保证等场景中 到处都能看到MQ的身影 而测试工程师在工作中 也经常需要和mq打交道 比如构造测试数据 触发某些业务场景 以及针对mq的性能测试等 目前
  • 如何使用 ACL 相关表获取用户可以访问的对象列表

    我正在设计一个对用户管理 权限有很多要求的系统 因此我决定使用 Spring Security ACL 来管理域对象级别的权限 尽管如此 使用 ACL 来维护用户和实体之间的关系迫使我们依赖它在 UI 上呈现数据 Spring Securi
  • Cakephp 2.0 行/记录级 Acl

    我正在摆弄 cakephp 2 0 的访问列表 到目前为止 我按照文档创建了一个非常简单的示例 我已经建立了一个用户表和最重要的功能 如索引 添加 登录ecc 并且与组表相关 每个用户属于一个组 我还创建了一个 房屋 表 其中包含不同的内容
  • 哪些源代码控制系统具有文件级权限?

    我可能会建议放弃 VSS 因为它无法在文件级别授予和拒绝权限 问题是什么源代码控制系统允许这样做 Update我将 SVN 答案标记为 正确 答案 因为它收到了最多的反馈 然而 没有正确的答案 我将根据您的所有反馈向管理层提出建议 subv
  • 应用程序引擎上的云存储和安全下载策略。 GCS acl 或 blobstore

    我的应用程序引擎应用程序创建云存储文件 这些文件将由第三方下载 这些文件包含个人医疗信息 首选下载方式是什么 使用带有用户 READER acl 的直接 GCS 下载链接 或者在应用程序引擎应用程序中使用 blob 存储下载处理程序 两种解
  • 使用 NSIS 安装程序向注册表项授予权限的有效方法是什么?

    我正在尝试使用访问控制插件 http nsis sourceforge net AccessControl plug in在 NSIS 中设置注册表项的权限 它不起作用 安装程序运行后 所有用户组没有完全控制权 我在下面创建了一个示例 这里
  • ACL 2024投递指南

    诸神缄默不语 个人CSDN博文目录 显然写这篇博客是因为我要投ACL了 TL DR ACL 2024是ARR提交制 workshop归workshop 最晚提交时间是2024年2月15号 在此之前可以随时撤回 4月15号出结果 4月20号确
  • Windows 在哪里存储 ACL,ACL 是否跟随文件从一台计算机传输到另一台计算机?

    我们的应用程序使用的组件需要在可执行文件的目录中包含许可证文件 该文件恰好是 NET WinForms 应用程序 尽管我认为这对这个问题并不重要 当安装在某些 XP Pro 计算机上时 迄今为止仅数百台计算机中的三台 该组件会引发许可证异常
  • 限制某些用户而不是其他用户对 Cloudfront(S3) 文件的访问的简单示例

    我刚刚开始了解 AWS S3 和 Cloudfront 的权限 所以请放心 两个主要问题 我想允许某些用户访问 例如 已登录的用户 但是not其他的 我假设我需要使用 ACL 而不是存储桶策略 因为前者更具可定制性 您可以使用查询参数识别
  • 授予读取 Amazon S3 存储桶内子目录的权限

    我以前从未使用过 AWS S3 我们用它来自动备份客户的通话录音 我们的一位客户出于审计目的需要访问他们的录音 我使用客户端 Cyber Duck 作为访问文件的方式 我只想让他们访问他们的文件 我们的文件结构如下 recordings 1
  • PowerShell Set-Acl New-Object:找不到“FileSystemAccessRule”的重载和参数计数:“4”

    我制定了脚本的所有部分 用于创建目录名称 根据预定义的目录结构创建目录 根据附加到硬编码名称的项目编号创建 AD 组 然后将该组添加到特定的目录 目录 并设置该组的 ACL 我似乎无法绕过该错误 New Object Cannot find
  • Neo4j 使用属性过滤器通过多个关系定向路径

    作为 Cypher 和 Neo4j 的新手 我在为我的用例构建查询时遇到问题 我正在构建一个简单的 ACL 访问控制列表 并正在寻找一条通过权限关系向上层次结构的路径 一张图或许能更好地解释它 Key Users gt Blue Group
  • 使用python检查文件夹/文件ntfs权限

    正如问题标题可能暗示的那样 我非常想知道如何检查给定文件或文件夹的 ntfs 权限 提示 这些是您在 安全 选项卡中看到的权限 基本上 我需要的是获取文件或目录的路径 在本地计算机上 或者最好在远程计算机上的共享上 并获取用户 组的列表以及
  • Symfony2 跨多个应用程序共享用户

    我有多个 symfony2 应用程序 它们共享公共实体 但使用不同的数据库设置 每个数据库都有表user user role and role 问题是 我希望该用户能够登录app1通过访问www myproject com app1 log
  • 在.Net 下为低完整性进程添加写访问权限

    我正在创建一个用于文件创建的 FileSecurity 该文件对于低完整性进程也应该具有写入访问权限 FileSecurity fileAcl new FileSecurity add everyone IdentityReference

随机推荐