1、环境搭建
引入pom:
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置yaml:
server:
port: 9090
servlet:
context-path: /rabbit
spring:
application:
name: rabbit
# rabbitmq配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /
# 消息开启手动确认
listener:
direct:
acknowledge-mode: manual
常量类配置:
配置使用过程中可能用到的常量
public class RabbitmqContext {
/**
* 工作队列模式
*/
public static String QUEUE_WORK = "queue_work";
/**
* 订阅发布模式
*/
public static String QUEUE_FANOUT_ONE = "queue_fanout_one";
public static String QUEUE_FANOUT_TWO = "queue_fanout_two";
public static String EXCHANGE_FANOUT = "exchange_fanout";
/**
* 路由模式
*/
public static String QUEUE_DIRECT_ONE = "queue_direct_one";
public static String QUEUE_DIRECT_TWO = "queue_direct_two";
public static String EXCHANGE_DIRECT = "exchange_direct";
public static String ROUTING_DIRECT_ONE = "routing_direct_one";
public static String ROUTING_DIRECT_TWO = "routing_direct_two";
public static String ROUTING_DIRECT_THREE = "routing_direct_three";
/**
* 主题模式
*/
public static String QUEUE_TOPIC_ONE = "queue_topic_one";
public static String QUEUE_TOPIC_TWO = "queue_topic_two";
public static String EXCHANGE_TOPIC = "exchange_topic";
public static String ROUTING_TOPIC_ONE = "topic.one";
public static String ROUTING_TOPIC_TWO = "topic.one.two";
/**
* 手动确认机制演示
*/
public static String QUEUE_ACK = "queue_ack";
/**
* 延时队列模式
*/
public static String QUEUE_DELAY = "delay_queue";
public static String EXCHANGE_DELAY = "delay_exchange";
public static String ROUTING_DELAY = "delay";
}
消息监听与配置:
整合SpringBoot后,通过RabbitListener
监听器与Config
配置实现消息的监听(消费);
消息发送:
// 引入RabbitTemplate
@Resource
private RabbitTemplate rabbitTemplate;
// 发送消息
rabbitTemplate.convertAndSend(String routingKey, Object object);
rabbitTemplate.convertSendAndReceive(Object object);
发送消息时,通过**rabbitTemplate.convertAndSend()或者rabbitTemplate.convertSendAndReceive()**方法进行发送,区别在于:
- convertAndSend: 消息没有顺序,不管是否消费者是否确认,会一直发送消息;
- convertSendAndReceive: 按照一定的顺序,只有确定消费者接收到消息,才会发送下一条信息,每条消息之间会有间隔时间;
2、队列模式
Controller:
@RestController
public class RabbitmqController {
@Resource
private RabbitTemplate rabbitTemplate;
@PostMapping("/sendWork")
public Object sendWordQueue() {
String msg = "工作模式消息,ID:";
// 绑定队列发送消息
for (int i = 1; i <= 10; i++) {
String sendMsg = msg + i;
rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_WORK, sendMsg);
}
return "发送成功...";
}
}
Config:
@Configuration
public class WorkConfig {
/**
* 配置队列名称 - 工作模式
*
* @return
*/
@Bean
public Queue queueWork() {
return new Queue(RabbitmqContext.QUEUE_WORK);
}
}
Listener:
@Component
@Slf4j
public class WorkReceiveListener {
/**
* @param msg 接收的文本消息
* @param channel 通道信息
* @param message 附加的参数信息
*/
@RabbitListener(queues = "queue_work")
public void receiveQueueWork1(String msg, Channel channel, Message message) {
log.info("消费者01-接收到消息:" + msg);
}
@RabbitListener(queues = "queue_work")
public void receiveQueueWork2(String msg, Channel channel, Message message) {
log.info("消费者02-接收到消息:" + msg);
}
}
结果:
3、发布订阅模式
Controller:
@RestController
public class RabbitmqController {
@Resource
private RabbitTemplate rabbitTemplate;
@PostMapping("/sendFanout")
public String sendFanout() {
String msg = "发布|订阅模式消息,ID:";
for (int i = 1; i <= 10; i++) {
String sendMsg = msg + i;
// 绑定交换机发送消息,路由key为 ""
rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_FANOUT, "", sendMsg);
}
return "发送成功...";
}
}
Config:
@Configuration
public class FanoutConfig {
/**
* 配置队列名称 - 发布订阅模式
*
* @return
*/
@Bean
public Queue queueFanoutOne() {
return new Queue(RabbitmqContext.QUEUE_FANOUT_ONE);
}
@Bean
public Queue queueFanoutTwo() {
return new Queue(RabbitmqContext.QUEUE_FANOUT_TWO);
}
/**
* 定义FanoutExchange类型的交换机
*
* @return
*/
@Bean
public FanoutExchange exchangeFanout() {
return new FanoutExchange(RabbitmqContext.EXCHANGE_FANOUT);
}
/**
* 将交换机和队列进行绑定
*
* @return
*/
@Bean
public Binding bindingExchangeOne() {
return BindingBuilder.bind(queueFanoutOne()).to(exchangeFanout());
}
@Bean
public Binding bindingExchangeTwo() {
return BindingBuilder.bind(queueFanoutTwo()).to(exchangeFanout());
}
}
Listener:
@Component
@Slf4j
public class FanoutReceiveListener {
/**
* @param msg 接收的文本消息
* @param channel 通道信息
* @param message 附加的参数信息
*/
@RabbitListener(queues = "queue_fanout_one")
public void consumerOne1(String msg, Channel channel, Message message) {
System.out.println("queue_fanout_one队列 消费者1:收到消息:" + msg);
}
@RabbitListener(queues = "queue_fanout_one")
public void consumerOne2(String msg, Channel channel, Message message) {
System.out.println("queue_fanout_one队列 消费者2:收到消息:" + msg);
}
/**
* -------------一个队列绑定两个消费者 --------------------------------
*/
@RabbitListener(queues = "queue_fanout_two")
public void consumerTwo1(String msg, Channel channel, Message message) {
System.out.println("queue_fanout_two队列 消费者1:收到消息:" + msg);
}
@RabbitListener(queues = "queue_fanout_two")
public void consumerTwo2(String msg, Channel channel, Message message) {
System.out.println("queue_fanout_two队列 消费者2:收到消息:" + msg);
}
}
结果:
4、路由模式
路由模式与发布订阅模式相同,就是定义了路由,在将队列与交换机绑定时 以及 发送消息时设置路由名称
Controller:
@RestController
public class RabbitmqController {
@Resource
private RabbitTemplate rabbitTemplate;
@PostMapping("/sendDirect")
public String sendDirect() {
String msg = "路由模式消息,ID:";
for (int i = 1; i <= 12; i++) {
String sendMsg = null;
String routingKey = "";
if (i % 2 == 0) {
sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_TWO;
routingKey = RabbitmqContext.ROUTING_DIRECT_TWO;
} else if (i % 3 == 0) {
sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_THREE;
routingKey = RabbitmqContext.ROUTING_DIRECT_THREE;
} else {
sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_DIRECT_ONE;
routingKey = RabbitmqContext.ROUTING_DIRECT_ONE;
}
// 绑定交换机,并且设置 路由Key 发送消息
rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_DIRECT, routingKey, sendMsg);
}
return "发送成功...";
}
}
Config:
@Configuration
public class DirectConfig {
/**
* 队列一
*
* @return
*/
@Bean
public Queue directQueueOne() {
return new Queue(RabbitmqContext.QUEUE_DIRECT_ONE);
}
/**
* 队列二
*
* @return
*/
@Bean
public Queue directQueueTwo() {
return new Queue(RabbitmqContext.QUEUE_DIRECT_TWO);
}
/**
* 定义交换机 direct类型
*
* @return
*/
@Bean
public DirectExchange myDirectExchange() {
return new DirectExchange(RabbitmqContext.EXCHANGE_DIRECT);
}
/**
* 队列 绑定到交换机 再指定一个路由键
* directQueueOne() 会找到上方定义的队列bean
*
* @return
*/
@Bean
public Binding directExchangeOne() {
return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_ONE);
}
@Bean
public Binding directExchangeTwo() {
return BindingBuilder.bind(directQueueOne()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_TWO);
}
/**
* 队列 绑定到交换机 再指定一个路由键
*
* @return
*/
@Bean
public Binding directExchangeThree() {
return BindingBuilder.bind(directQueueTwo()).to(myDirectExchange()).with(RabbitmqContext.ROUTING_DIRECT_THREE);
}
}
Listener:
@Component
@Slf4j
public class DirectReceiveListener {
/**
* @param msg 接收的文本消息
* @param channel 通道信息
* @param message 附加的参数信息
*/
@RabbitListener(queues = "queue_direct_one")
public void consumerOne1(String msg, Channel channel, Message message) {
System.out.println("queue_direct_one队列 消费者1:收到消息:" + msg);
}
@RabbitListener(queues = "queue_direct_one")
public void consumerTwo(String msg, Channel channel, Message message) {
System.out.println("queue_direct_one队列 消费者2:收到消息:" + msg);
}
@RabbitListener(queues = "queue_direct_two")
public void consumerOne(String msg, Channel channel, Message message) {
System.out.println("queue_direct_two队列 消费者1:收到消息:" + msg);
}
@RabbitListener(queues = "queue_direct_two")
public void consumerDirect(String msg, Channel channel, Message message) {
System.out.println("queue_direct_two队列 消费者2:收到消息:" + msg);
}
}
结果:
5、主题模式
与路由模式的区别在于:可以通过不同规则匹配多个路由;
Controller:
@RestController
public class RabbitmqController {
@Resource
private RabbitTemplate rabbitTemplate;
@PostMapping("/sendTopic")
public String sendTopic() {
String msg = "消息ID:";
for (int i = 1; i <= 10; i++) {
String sendMsg = null;
String routingKey = "";
if (i % 2 == 0) {
sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_TWO;
routingKey = RabbitmqContext.ROUTING_TOPIC_TWO;
} else {
sendMsg = msg + i + ",路由为:" + RabbitmqContext.ROUTING_TOPIC_ONE;
routingKey = RabbitmqContext.ROUTING_TOPIC_ONE;
}
// 绑定交换机,并且设置 路由Key 发送消息
rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_TOPIC, routingKey, sendMsg);
}
return "发送成功...";
}
}
Config:
@Configuration
public class TopicConfig {
/**
* 队列定义
*
* @return
*/
@Bean
public Queue topicQueueOne() {
return new Queue(RabbitmqContext.QUEUE_TOPIC_ONE);
}
/**
* 队列定义
*
* @return
*/
@Bean
public Queue topicQueueTwo() {
return new Queue(RabbitmqContext.QUEUE_TOPIC_TWO);
}
/**
* 定义 TopicExchange 类型交换机
*
* @return
*/
@Bean
public TopicExchange exchangeTopic() {
return new TopicExchange(RabbitmqContext.EXCHANGE_TOPIC);
}
/**
* 队列一绑定到交换机 且设置路由键为 topic.#
*
* @return
*/
@Bean
public Binding bindingTopic1() {
return BindingBuilder.bind(topicQueueOne()).to(exchangeTopic()).with("topic.#");
}
/**
* 队列二绑定到交换机 且设置路由键为 topic.*
*
* @return
*/
@Bean
public Binding bindingTopic2() {
return BindingBuilder.bind(topicQueueTwo()).to(exchangeTopic()).with("topic.*");
}
}
Listener:
@Component
@Slf4j
public class TopicReceiveListener {
/**
* @param msg 接收的文本消息
* @param channel 通道信息
* @param message 附加的参数信息
*/
@RabbitListener(queues = "queue_topic_one")
public void listenOne1(String msg, Channel channel, Message message) {
System.out.println("queue_topic_one队列 消费者1,路由匹配:topic.#,收到消息:" + msg);
}
@RabbitListener(queues = "queue_topic_one")
public void listenOne2(String msg, Channel channel, Message message) {
System.out.println("queue_topic_one队列 消费者2,路由匹配:topic.#,收到消息:" + msg);
}
@RabbitListener(queues = "queue_topic_two")
public void listenTwo1(String msg, Channel channel, Message message) {
System.out.println("queue_topic_two队列 消费者1,路由匹配:topic.*,收到消息:" + msg);
}
@RabbitListener(queues = "queue_topic_two")
public void listenTwo2(String msg, Channel channel, Message message) {
System.out.println("queue_topic_two队列 消费者2,路由匹配:topic.*,收到消息:" + msg);
}
}
结果:
6、消息手动应答机制
说明:
通过配置自定义的SimpleRabbitListenerContainerFactory
,并且在监听器中通过@RabbitListener
注解通过containerFactory
属性设置的SimpleRabbitListenerContainerFactory
,比如:@RabbitListener(containerFactory = “listenerContainerFactory”)
yml配置:
在原有的yaml
基础上追加,以下配置:
spring:
rabbitmq:
# 确认消息已发送到队列 return
publisher-returns: true
# 开启消息确认机制 confirm 异步
publisher-confirm-type: correlated
listener:
direct:
# 消息开启手动确认
acknowledge-mode: manual
# 拒绝消息是否重回队列
default-requeue-rejected: true
配置类:
@Configuration
@Slf4j
public class RabbitConfig {
@Bean("listenerContainerFactory")
public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 如果需要批量确认消息,则做以下设置
// 设置批量
// factory.setBatchListener(true);
// 设置BatchMessageListener生效
// factory.setConsumerBatchEnabled(true);
// 设置批量确认数量
// factory.setBatchSize(10);
//设置消息为手动确认
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
Controller:
@RestController
public class RabbitmqController {
@Resource
private RabbitTemplate rabbitTemplate;
@PostMapping("/sendAck")
public String sendAck() {
Object msg = "这是手动确认机制的消息";
rabbitTemplate.convertAndSend(RabbitmqContext.QUEUE_ACK, msg);
return "发送成功...";
}
}
Config:
@Configuration
public class AckConfig {
/**
* 配置队列名称 - 工作模式
*
* @return
*/
@Bean
public Queue queueConfirm() {
return new Queue(RabbitmqContext.QUEUE_ACK);
}
}
Listener:
@Component
@Slf4j
public class AckReceiveListener {
/**
* @param msg 接收的文本消息
* @param channel 通道信息
* @param message 附加的参数信息
*/
@RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")
public void queueAck(String msg, Channel channel, Message message) {
try {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 批量手动确认机制
*
* @param messages
* @param channel
*/
// @RabbitListener(queues = "queue_ack", containerFactory = "listenerContainerFactory")
// public void queueAck(List<Message> messages, Channel channel) {
// try {
// for (Message message : messages) {
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
// String msg = new String(message.getBody(), StandardCharsets.UTF_8);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// System.out.println("手动确认机制,tag:" + deliveryTag + ",接收到消息:" + msg);
// channel.basicAck(deliveryTag, false);
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
}
7、回调函数-确认机制(发布确认模式)
通过配置消息确认机制,可以监听到:消息发送的情况,消息接收的情况;
yaml:
在原有yaml
的基础上,追加以下配置:
spring:
rabbitmq:
# 开启消息回退 return
publisher-returns: true
# 开启消息确认机制 confirm 异步
publisher-confirm-type: correlated
配置类:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: LiHuaZhi
* @Date: 2021/10/24 23:06
* @Description:
**/
@Configuration
@Slf4j
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
/*
* 触发条件:
* 1、消息推送到server,但是在server里找不到交换机
* 2、消息推送到server,找到交换机了,但是没找到队列
* 3、消息推送到sever,交换机和队列啥都没找到
* 4、消息推送成功
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("ConfirmCallback-------消息成功推送到MQ");
} else {
log.error("ConfirmCallback------消息推送到MQ失败,原因:{}", cause);
}
});
/*
* 触发条件:
* 1、消息推送到server,找到交换机了,但是没找到队列,被交换机退回;
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}", new
String(message.getBody()), exchange, replyText, routingKey);
});
return rabbitTemplate;
}
}
使用:
当消息发送后,会自动回调RabbitConfig
配置类中的rabbitTemplate.setConfirmCallback
方法;
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/18139.html