1.Spring中rabbitmq配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
<!--连接服务配置-->
<rabbit:connection-factory id="connectionFactory" addresses="${ramq.host}"
username="${ramq.username}" password="${ramq.password}"
virtual-host="${ramq.virtualhost}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!--本地配置-->
<!--<rabbit:connection-factory id="connectionFactory" addresses="127.0.0.1:5672"-->
<!--username="guest" password="guest" />-->
<!--<rabbit:admin connection-factory="connectionFactory" />-->
<!--声明erp的topic交换机-->
<rabbit:topic-exchange name="amq.topic">
<rabbit:bindings>
<!--下单-->
<rabbit:binding pattern="xxxx.xxxx" queue="queue.name" ></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 定义消费者监听器 -->
<!-- 创建一个bean实例,bean实例中声明处理请求的类 -->
<bean id="consumerLitener" class="xxxx.LoginMqListener"></bean>
<!--下订单消息处理-->
<bean id="orderMqListener" class="xxxx.OrderMqListener"/>
<!--定义相关queue beg-->
<!--下单-->
<rabbit:queue name="queue.name" durable="true" auto-delete="false" exclusive="false" />
<!--定义相关queue end-->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="auto" message-converter="jackson2JsonMessageConverter">
<!-- queues属性从那个队列中接收消息,ref属性是当存在消息是使用哪个类去处理 -->
<!--<rabbit:listener queues="queue_dengsz1" ref="consumerLitener"/>-->
<!--区分topic订阅 method="listenOrderMsg"-->
<!--订单事件监听-->
<rabbit:listener queue-names="queue.name"
ref="orderMqListener"/>
</rabbit:listener-container>
<!-- 订阅登陆消息队列 end -->
<!-- 定义消息转换器 -->
<bean id="jackson2JsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
</beans>
2.spring中代码调用
@Service@Slf4jpublic class OrderCancelListener implements MessageListener { @Autowired
private MqOrderService mqOrderService; @Override
public void onMessage(Message message) { if (null == message.getBody()) {
log.error("rabbitmq订单取消msg.getBody()为空"); return;
}
try {
String content = new String(message.getBody());
log.info("rabbitmq订单取消消息beg:{}", content);
JSONObject jsonObject = JSONObject.parseObject(content);
Object object = jsonObject.get("Properties");
log.info("rabbitmq订单取消end,content="+new String(message.getBody()));
} catch (Exception e) {
log.error("rabbitmq订单取消error,msg=[{}]",message.toString(),e);
return;
} return;
}
}
3.SpringBoot中rabbbitmq配置
#配置mq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#虚拟host可以不进行设置spring.rabbitmq.virtual-host=/
#设置回调确认#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
4.Springboot中声明交换机和队列
@Configurationpublic class TopicRabbitConfig { //绑定键
public final static String man = "topic.order";//0
public final static String woman = "topic.cancel.order";//1
public final static String order = "buriedPoint.ordered";//2
public final static String pay = "PaySuccessWebOrder";//3
public final static String enterClass = "enterClass";//4
public final static String exitClass = "exitClass";//5
public final static String buriedPoint = "buriedPoint.orderCanceld";//6
public final static String testPoint = "test.Point";//6
/*消息确认队列*/
public final static String CONFIRM="topic.messages"; @Bean
public Queue confirmQueue() { return new Queue(TopicRabbitConfig.CONFIRM);
} @Bean
public Queue firstQueue() { return new Queue(TopicRabbitConfig.man);
} @Bean
public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman);
} @Bean
public Queue Queue3() { return new Queue(TopicRabbitConfig.order);
} @Bean
public Queue QueueEnterClass() { return new Queue(TopicRabbitConfig.enterClass);
} @Bean
public Queue QueueExitClass() { return new Queue(TopicRabbitConfig.exitClass);
} @Bean
public Queue QueuePay() { return new Queue(TopicRabbitConfig.pay);
} @Bean
public Queue QueueBuried() { return new Queue(TopicRabbitConfig.buriedPoint);
} @Bean
TopicExchange exchange() { return new TopicExchange("topicExchange");
} @Bean
TopicExchange exchange2() { return new TopicExchange("topicExchange2");
} //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
} //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
//这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage3() { return BindingBuilder.bind(Queue3()).to(exchange()).with(order);
} //新增
@Bean
Binding bindingExchangeMessage4() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(pay);
} @Bean
Binding bindingExchangeMessage5() { return BindingBuilder.bind(QueueEnterClass()).to(exchange()).with(enterClass);
} @Bean
Binding bindingExchangeMessage6() { return BindingBuilder.bind(QueueExitClass()).to(exchange()).with(exitClass);
} @Bean
Binding bindingExchangeMessage7() { return BindingBuilder.bind(QueuePay()).to(exchange()).with(pay);
} @Bean
Binding bindingExchangeMessage() { return BindingBuilder.bind(QueueBuried()).to(exchange()).with(buriedPoint);
} @Bean
Binding bindingExchangeMessage8() { return BindingBuilder.bind(QueueBuried()).to(exchange2()).with(testPoint);
} //进行绑定确认queue
@Bean
Binding bindingConfirmExchange() { return BindingBuilder.bind(confirmQueue()).to(exchange()).with(CONFIRM);
}
}
5.写消息
@RestControllerpublic class SendMessageController { @Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendDirectMessage") public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "ok";
} @GetMapping("/sendTopicMessage1") public String sendTopicMessage1() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("passport_exchange", "topic.order", manMap); return "ok";
} @GetMapping("/sendTopicMessage2") public String sendTopicMessage2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.cancel.order", womanMap); return "ok";
} @GetMapping("/sendTopicMessage3") public String sendTopicMessage3() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "regServiceTest.buriedPoint", womanMap); return "ok";
} /**
* @Author: 郭佳
* @Params:
* @Description:接收topic消息
* @Return:
* @Date: 2020-12-22 10:42
*/
@GetMapping("/sendTopic") public String sendTopic(String topic) {
String messageId = String.valueOf(UUID.randomUUID());// String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", topic);
womanMap.put("createTime", createTime);
womanMap.put("msg", "这是个测试"); //TestDirectExchange topicExchange
rabbitTemplate.convertAndSend("TestFanoutExchange", topic, womanMap);// rabbitTemplate.convertAndSend("topicExchange2", topic, womanMap);
return "ok";
}
}
5.读消息
@Componentpublic class DirectReceiver { //消息确认消费
@RabbitListener(queues = "topic.messages")// @RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
} // @RabbitHandler
@RabbitListener(queues = "TestFanoutQueue")//监听的队列名称 TestDirectQueue
public void processFanout(Map testMessage) {
System.out.println("FanoutReceiver消费者收到消息 : " + testMessage.toString());
} // @RabbitHandler
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
} @RabbitListener(queues = "topic.man") public void receiveMessage2(Map testMessage) {
System.out.println("我是监听topic.man 的,满足 topic.# 的都过来 , " + testMessage.toString());
} @RabbitListener(queues = "topic.woman") public void receiveMessage3(Map testMessage) {
System.out.println("我是监听topic.woman 的,满足 topic.# 的都过来 , " + testMessage.toString());
}
}
原文始发于微信公众号(Java时间屋):Spring和Springboot中配置rabbitMq配置
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/251934.html