1.消费者
channel.basicQos(consumer.getEndpoint().getPrefetchSize(), consumer.getEndpoint().getPrefetchCount(),
consumer.getEndpoint().isPrefetchGlobal());
public class Consumer {
private final static String QUEUE_NAME = MQConfig.MOMENTS_QUEUE;
public static void main(String[] args) {
initModule();
}
public static void initModule() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
try {
connection = connectionFactory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos(0, 3, true);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
MqMessageDispatcher.doDispatch(new String(body, "UTF-8"), channel, envelope);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
2.线程消费
public class MqMessageDispatcher {
public static Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);
public static ExecutorService msgHandleService = Executors.newFixedThreadPool(2);
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
msgHandleService.shutdown();
}
});
}
public static void doDispatch(String message, Channel channel, Envelope envelope) {
msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
}
private static class MessageHandleTask implements Runnable {
String message;
Channel channel;
Envelope envelope;
public MessageHandleTask(String message, Channel channel, Envelope envelope) {
this.message = message;
this.channel = channel;
this.envelope = envelope;
}
@Override
public void run() {
long start = System.currentTimeMillis();
logger.info("Received message: " + message);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (IOException e) {
System.err.println("fail to confirm message:" + message);
}
}
}
}
3.mqconfig
@Configuration
public class MQConfig {
public static final String MOMENTS_EXCHANGE = "moments-exchange";
public static final String MOMENTS_QUEUE = "moments-queue";
@Bean
public Queue momentsQueue() {
Map<String, Object> arguments = new HashMap<>();
Queue queue = new Queue(MOMENTS_QUEUE,true, false, false,arguments);
return queue;
}
@Bean
public FanoutExchange momentsExchange() {
return new FanoutExchange(MOMENTS_EXCHANGE);
}
@Bean
public Binding MomentsBinding() {
return BindingBuilder.bind(momentsQueue()).to(momentsExchange());
}
}
4.生产者
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(JSONObject message) {
System.out.println("推送消息:" + JSON.toJSONString(message));
rabbitTemplate.convertAndSend(MQConfig.MOMENTS_EXCHANGE, "", message);
}
public void sendMessage(String message) {
System.out.println("推送消息:" + message);
rabbitTemplate.convertAndSend(MQConfig.MOMENTS_EXCHANGE, "", message);
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)