Spring和Springboot中配置rabbitMq配置

1.Spring中rabbitmq配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
            http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">

    <!--连接服务配置-->
    <rabbit:connection-factory id="connectionFactory" addresses="${ramq.host}"
                               username="${ramq.username}" password="${ramq.password}"
                               virtual-host="${ramq.virtualhost}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!--本地配置-->
    <!--<rabbit:connection-factory id="connectionFactory"  addresses="127.0.0.1:5672"-->
    <!--username="guest" password="guest" />-->
    <!--<rabbit:admin connection-factory="connectionFactory" />-->
    <!--声明erp的topic交换机-->
    <rabbit:topic-exchange name="amq.topic">
        <rabbit:bindings>
   
            <!--下单-->
            <rabbit:binding pattern="xxxx.xxxx"  queue="queue.name" ></rabbit:binding>
           
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- 定义消费者监听器 -->
    <!-- 创建一个bean实例,bean实例中声明处理请求的类 -->
    <bean id="consumerLitener" class="xxxx.LoginMqListener"></bean>
 
    <!--下订单消息处理-->
    <bean id="orderMqListener" class="xxxx.OrderMqListener"/>
   
    <!--定义相关queue beg-->
   
    <!--下单-->
    <rabbit:queue name="queue.name"   durable="true" auto-delete="false" exclusive="false"  />
  
    <!--定义相关queue end-->

    <rabbit:listener-container
            connection-factory="connectionFactory" acknowledge="auto" message-converter="jackson2JsonMessageConverter">
        <!-- queues属性从那个队列中接收消息,ref属性是当存在消息是使用哪个类去处理 -->
        <!--<rabbit:listener queues="queue_dengsz1" ref="consumerLitener"/>-->
        <!--区分topic订阅 method="listenOrderMsg"-->
     
        <!--订单事件监听-->
        <rabbit:listener  queue-names="queue.name"
                          ref="orderMqListener"/>
       
    </rabbit:listener-container>
    <!-- 订阅登陆消息队列 end   -->


    <!-- 定义消息转换器 -->
    <bean id="jackson2JsonMessageConverter"
          class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
</beans>

2.spring中代码调用

@Service@Slf4jpublic class OrderCancelListener implements MessageListener {    @Autowired
    private MqOrderService mqOrderService;    @Override
    public void onMessage(Message message) {        if (null == message.getBody()) {
            log.error("rabbitmq订单取消msg.getBody()为空");            return;
        }  
        try {
            String content = new String(message.getBody());
            log.info("rabbitmq订单取消消息beg:{}", content);
            JSONObject jsonObject = JSONObject.parseObject(content);
            Object object = jsonObject.get("Properties");
           
            log.info("rabbitmq订单取消end,content="+new String(message.getBody()));
        } catch (Exception e) {
            log.error("rabbitmq订单取消error,msg=[{}]",message.toString(),e);         
            return;
        }        return;

    }
}

3.SpringBoot中rabbbitmq配置

#配置mq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#虚拟host可以不进行设置spring.rabbitmq.virtual-host=/
#设置回调确认#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated

4.Springboot中声明交换机和队列

@Configurationpublic class TopicRabbitConfig {    //绑定键
    public final static String man = "topic.order";//0
    public final static String woman = "topic.cancel.order";//1
    public final static String order = "buriedPoint.ordered";//2
    public final static String pay = "PaySuccessWebOrder";//3
    public final static String enterClass = "enterClass";//4
    public final static String exitClass = "exitClass";//5
    public final static String buriedPoint = "buriedPoint.orderCanceld";//6
    public final static String testPoint = "test.Point";//6
    /*消息确认队列*/
   public final static  String CONFIRM="topic.messages";    @Bean
    public Queue confirmQueue() {        return new Queue(TopicRabbitConfig.CONFIRM);
    }    @Bean
    public Queue firstQueue() {        return new Queue(TopicRabbitConfig.man);
    }    @Bean
    public Queue secondQueue() {        return new Queue(TopicRabbitConfig.woman);
    }    @Bean
    public Queue Queue3() {        return new Queue(TopicRabbitConfig.order);
    }    @Bean
    public Queue QueueEnterClass() {        return new Queue(TopicRabbitConfig.enterClass);
    }    @Bean
    public Queue QueueExitClass() {        return new Queue(TopicRabbitConfig.exitClass);
    }    @Bean
    public Queue QueuePay() {        return new Queue(TopicRabbitConfig.pay);
    }    @Bean
    public Queue QueueBuried() {        return new Queue(TopicRabbitConfig.buriedPoint);
    }    @Bean
    TopicExchange exchange() {        return new TopicExchange("topicExchange");
    }    @Bean
    TopicExchange exchange2() {        return new TopicExchange("topicExchange2");
    }    //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
    // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
    @Bean
    Binding bindingExchangeMessage2() {        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }    //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
    //这样只要是消息携带的路由键是topic.man,才会分发到该队列
    @Bean
    Binding bindingExchangeMessage3() {        return BindingBuilder.bind(Queue3()).to(exchange()).with(order);
    }   //新增
   @Bean
   Binding bindingExchangeMessage4() {       return BindingBuilder.bind(firstQueue()).to(exchange()).with(pay);
   }    @Bean
    Binding bindingExchangeMessage5() {        return BindingBuilder.bind(QueueEnterClass()).to(exchange()).with(enterClass);
    }    @Bean
    Binding bindingExchangeMessage6() {        return BindingBuilder.bind(QueueExitClass()).to(exchange()).with(exitClass);
    }    @Bean
    Binding bindingExchangeMessage7() {        return BindingBuilder.bind(QueuePay()).to(exchange()).with(pay);
    }    @Bean
    Binding bindingExchangeMessage() {        return BindingBuilder.bind(QueueBuried()).to(exchange()).with(buriedPoint);
    }    @Bean
    Binding bindingExchangeMessage8() {        return BindingBuilder.bind(QueueBuried()).to(exchange2()).with(testPoint);
    }    //进行绑定确认queue
    @Bean
    Binding bindingConfirmExchange() {        return BindingBuilder.bind(confirmQueue()).to(exchange()).with(CONFIRM);
    }


}

5.写消息

@RestControllerpublic class SendMessageController {    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @GetMapping("/sendDirectMessage")    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);        return "ok";
    }    @GetMapping("/sendTopicMessage1")    public String sendTopicMessage1() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: M A N ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> manMap = new HashMap<>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("passport_exchange", "topic.order", manMap);        return "ok";
    }    @GetMapping("/sendTopicMessage2")    public String sendTopicMessage2() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.cancel.order", womanMap);        return "ok";
    }    @GetMapping("/sendTopicMessage3")    public String sendTopicMessage3() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "regServiceTest.buriedPoint", womanMap);        return "ok";
    }    /**
     * @Author: 郭佳
     * @Params:
     * @Description:接收topic消息
     * @Return:
     * @Date:  2020-12-22 10:42
     */
    @GetMapping("/sendTopic")    public String sendTopic(String topic) {
        String messageId = String.valueOf(UUID.randomUUID());//        String messageData = "message: woman is all ";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> womanMap = new HashMap<>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", topic);
        womanMap.put("createTime", createTime);
        womanMap.put("msg", "这是个测试");        //TestDirectExchange topicExchange
        rabbitTemplate.convertAndSend("TestFanoutExchange", topic, womanMap);//        rabbitTemplate.convertAndSend("topicExchange2", topic, womanMap);

        return "ok";
    }

}

5.读消息

@Componentpublic class DirectReceiver {   //消息确认消费
    @RabbitListener(queues = "topic.messages")//    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2  : " + message);
    }    //    @RabbitHandler
    @RabbitListener(queues = "TestFanoutQueue")//监听的队列名称 TestDirectQueue
    public void processFanout(Map testMessage) {
        System.out.println("FanoutReceiver消费者收到消息  : " + testMessage.toString());
    }    //    @RabbitHandler
    @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
    }    @RabbitListener(queues = "topic.man")    public void receiveMessage2(Map testMessage) {
        System.out.println("我是监听topic.man 的,满足 topic.# 的都过来 , " + testMessage.toString());
    }    @RabbitListener(queues = "topic.woman")    public void receiveMessage3(Map testMessage) {
        System.out.println("我是监听topic.woman 的,满足 topic.# 的都过来 , " + testMessage.toString());
    }
}



原文始发于微信公众号(Java时间屋):Spring和Springboot中配置rabbitMq配置

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/251934.html

(0)
java小白的头像java小白

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!