spring整合RocketMQ

2023-11-16

1、看官方javademo
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip
下载下来
spring-rokectmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="rocketmqProduct" class="org.apache.rocketmq.client.producer.DefaultMQProducer" init-method="start"
          destroy-method="shutdown">
        <property name="producerGroup" value="producer1"/>
        <property name="namesrvAddr" value="192.168.0.103:9876"/>
    </bean>

    <bean id="consumerSpringListener" class="com.ju.biz.mq.rokectmq.quickstartspring.ConsumerSpringListener" />

    <bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
        <property name="consumerGroup" value="concurrent_consumer"/>
        <property name="namesrvAddr" value="192.168.0.103:9876"/>
        <property name="messageListener" ref="consumerSpringListener"/>
        <property name="subscription">
            <map>
                <entry key="TopicTest">
                    <value>*</value>
                </entry>
            </map>
        </property>
    </bean>
</beans>

ProducerSpring

package com.ju.biz.mq.rokectmq.quickstartspring;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Created by tao on 2018/5/24.
 */
public class ProducerSpring {
    public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = new ClassPathXmlApplicationContext("/spring/spring-rokectmq.xml");
        DefaultMQProducer producer = (DefaultMQProducer) context.getBean("rocketmqProduct");
        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
    }
}

ConsumerSpringListener

package com.ju.biz.mq.rokectmq.quickstartspring;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by tao on 2018/5/24.
 */
public class ConsumerSpringListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spring整合RocketMQ 的相关文章

  • 13 SpringBoot整合RocketMQ实现过滤消息-根据SQL表达式过滤消息

    SQL表达式方式可以根据发送消息时输入的属性进行一些计算 RocketMQ的SQL表达式语法 只定义了一些基本的语法功能 数字比较 如 gt gt lt lt BETWEEN 字符比较 如 lt gt IN IS NULL or IS NO
  • 消息队列 RocketMQ:(九)消息重试

    文章目录 消息队列 RocketMQ 一 概述 消息队列 RocketMQ 二 系统架构 消息队列 RocketMQ 三 发送普通消息 三种方式 消息队列 RocketMQ 四 顺序消息 消息队列 RocketMQ 五 延时消息 消息队列
  • rocketmq搭建2m2s踩坑版

    搭建过程中遇到了些问题百度了很久终于东拼西凑成功解决了 看看成果 console完美运行 代码生产消费也是正常的 言归正传开始搭建 0 服务器环境介绍 没有将master与对应的slave安装在同一个节点 如果在一个节点挂了master就挂
  • Centos7下基于jdk11 安装RocketMQ

    1 简介 RocketMQ是阿里巴巴中间件团队自研的一款高性能 高吞吐量 低延迟 高可用 高可靠 具备金融级稳定性 的分布式消息中间件 开源后并于2016年捐赠给Apache社区孵化 目前已经成为了 Apache顶级项目 当前在国内被广泛的
  • RocketMQ基础概念

    目录 1 简介 2 架构 3 核心概念 1 简介 RocketMQ 是一款开源的分布式消息中间件 最初由阿里巴巴集团开发并开源 它旨在为分布式系统提供可靠 高性能 可扩展的消息通信能力 RocketMQ和RabbitMQ KAFKA一起并列
  • rocketMQ记录

    https segmentfault com a 1190000017841402 停止命令 sh bin mqshutdown namesrv sh bin mqshutdown broker
  • rocketmq安装、启动

    1 下载 gt wget http mirror bit edu cn apache rocketmq 4 4 0 rocketmq all 4 4 0 source release zip gt unzip rocketmq all 4
  • rockemq创建topic

    sh mqadmin updateTopic n sms pro 007 9876 sms pro 008 9876 c DefaultCluster t smsFrontSmsMq 10 w 4 r 4
  • RocketMQ Pull和Push

    在rocketmq里 consumer被分为2类 MQPullConsumer和MQPushConsumer 其实本质都是拉模式 pull 即consumer轮询从broker拉取消息 区别是 push方式里 consumer把轮询过程封装
  • 消息中间件 RocketMQ 源码解析:Message拉取&消费(上)

    摘要 原创出处 http www iocoder cn RocketMQ message pull and consume first 芋道源码 欢迎转载 保留摘要 谢谢 本文主要基于 RocketMQ 4 0 x 正式版 1 概述 2 C
  • RocketMQ第五篇 RocketMQ API基本使用

    目录 生产者Product 消费者Consumer 前面已经学习了Rocket的基本知识 以及搭建MQ单机版和集群环境 下面开始进行实际开发 根据前面下载的RocketMQ源码 开展讲解RocketMQ 的基本使用 生产者Product 在
  • RocketMq存储设计——Index file

    RocketMq存储设计 Index file index file设计 rocket mq存储设计
  • 【RocketMQ】消息重试、重试次数设置、死信队列

    文章目录 1 死信队列 1 1 死信特性 1 2 查看死信消息 2 重试次数参数 2 1 Producer端重试 2 2 Consumer端重试 3 1 异常重试 3 2 超时重试 参考 1 死信队列 上一篇 RocketMQ 消息重试中我
  • 22道常见RocketMQ面试题以及答案

    面试宝典到手 搞定面试 不再是难题 系列文章传送地址 请点击本链接 1 RocketMQ是什么 2 RocketMQ有什么作用 3 RoctetMQ的架构 4 RoctetMQ的优缺点 8 消息过滤 如何实现 9 消息去重 如果由于网络等原
  • RocketMQ占用内存过大的解决方法

    目录 一 问题描述 二 解决方法 1 runserver sh 修改 2 runbroker sh 修改 一 问题描述 RocketMQ 启动后 一下子把内存撑爆了 二 解决方法 修改启动参数 分别对 bin 目录下的 runserver
  • 1 RocketMQ简介

    简介 RocketMQ是由阿里捐赠给Apache的一款低延迟 高并发 高可用 高可靠的分布式消息中间件 经历了淘宝双十一的洗礼 RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力 同时也具备互联网应用所需的海量消息堆积 高吞吐
  • RocketMQ-高级原理

    本节讲解下当MQ消息消费失败 或者发送不成功时如何处理消息 消息发送不成功一般存在于几种情况 网络原因 服务宕机 或者broker配置 消息发送失败 如果是由于broker配置原因 可以通过报错提示排查原因 无法查到路由信息 一般考虑到ro
  • 分布式消息队列RocketMQ--事务消息--解决分布式事务的最佳实践

    分布式消息队列RocketMQ 事务消息 解决分布式事务的最佳实践 标签 事务消息exactlyRocketMQKafka分布式消息队列 2016 12 23 22 08 7789人阅读 评论 8 收藏 举报 分类 分布式消息队列Rocke
  • RocketMQ概论

    目录 前言 1 概述 2 下载安装 集群搭建 3 消息模型 4 如何保证吞吐量 4 1 消息存储 4 1 1顺序读写 4 1 2 异步刷盘 4 1 3 零拷贝 4 2 网络传输 前言 RocketMQ的代码示例在安装目录下有全套详细demo
  • RocketMQ源码(26)—DefaultMQPushConsumer事务消息源码【一万字】

    事务消息是RocketMQ的一大特性 其被用来实现分布式事务 关于RocketMQ的事务消息的相关原理的介绍见这篇博客 RocketMQ的分布式事务机制 事务消息 关于事务消息的基本案例看这里 消息事务样例 本文主要介绍RocketMQ的事

随机推荐