《RabbitMQ系列教程-第六章-SpringBoot整合RabbitMQ》

追求适度,才能走向成功;人在顶峰,迈步就是下坡;身在低谷,抬足既是登高;弦,绷得太紧会断;人,思虑过度会疯;水至清无鱼,人至真无友,山至高无树;适度,不是中庸,而是一种明智的生活态度。

导读:本篇文章讲解 《RabbitMQ系列教程-第六章-SpringBoot整合RabbitMQ》,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

教程说明



第六章 SpringBoot整合RabbitMQ

6.1 搭建消息生产者

6.1.1 pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lscl</groupId>
    <artifactId>05_springboot_rabbitmq_producer</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <!-- springboot 工程-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>

    <dependencies>
        <!-- rabbitmq场景启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- 测试环境场景启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

6.1.2 application.yml:

spring:
  rabbitmq:
    host: 192.168.40.139
    port: 5672
    username: lscl
    password: admin
    virtual-host: /lscl

6.1.3 引导类:

package com.lscl.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }
}

6.1.4 配置消息队列:

6.1.4.1 Work模式:

SpringBoot中提供有两种构建Queue的方式,一种是通过new Queue()的方式,另一种采用QueueBuilder构建器构建一个Queue

package com.lscl.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class Config_01_Work {

    /**
     * 定义一个队列
     * @return
     */
    @Bean("bootWorkQueue")
    public Queue bootWorkQueue() {
        /*
            第一种方式:
                durable():代表需要持久化
                exclusive(): 代表该队列独占(只允许有一个consumer监听)
                autoDelete(): 代表需要自动删除(没有consumer自动删除)
                withArgument(): 队列的其他参数

         */
//        return QueueBuilder.durable("boot_work_queue").exclusive().autoDelete().withArgument("key", "val").build();
        /*
            第二种方式:
                通过new Queue对象来创建队列
                参数1: 队列名称
                参数2: 是否持久化(默认:true)
                参数3: 是否独占(默认:false)
                参数4: 是否自动删除(默认:false)
                参数5: 队列的其他参数
         */
        return new Queue("boot_work_queue", true, false, false,null);
    }
}

6.1.4.2 Pub/Sub模式:

package com.lscl.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Pub/Sub模式(交换机类型为Fanout)
 */

@Configuration
public class Config_03_Fanout {

    @Bean("bootFanoutQueue1")
    public Queue bootFanoutQueue1(){
        return QueueBuilder.durable("boot_fanout_queue1").build();
    }

    @Bean("bootFanoutQueue2")
    public Queue bootFanoutQueue2(){
        return QueueBuilder.durable("boot_fanout_queue2").build();
    }

    // fanout类型交换机
    @Bean("bootFanoutExchange")
    public Exchange bootFanoutExchange(){
        return ExchangeBuilder.fanoutExchange("boot_fanout_exchange").durable(true).build();
    }

    /**
     * 交换机与队列进行绑定
     */
    @Bean
    public Binding bindFanout1(){

        Queue bootFanoutQueue1 = bootFanoutQueue1();
        Exchange bootFanoutExchange = bootFanoutExchange();

        // fanout类型交换机routing key为 ""
        return BindingBuilder.bind(bootFanoutQueue1).to(bootFanoutExchange).with("").noargs();
    }

    /**
     * 交换机与队列进行绑定
     * @return
     */
    @Bean
    public Binding bindFanout2(){
        Queue bootFanoutQueue2 = bootFanoutQueue2();
        Exchange bootFanoutExchange = bootFanoutExchange();

        return BindingBuilder.bind(bootFanoutQueue2).to(bootFanoutExchange).with("").noargs();
    }

}

6.1.4.3 Routing模式:

package com.lscl.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * routing模式(交换机类型为Direct)
 */
@Configuration
public class Config_02_Direct {

    /**
     * 准备两个队列boot_direct_queue1、boot_direct_queue2
     * @return
     */
    @Bean("bootDirectQueue1")
    public Queue bootDirectQueue1(){
        return QueueBuilder.durable("boot_direct_queue1").build();
    }

    @Bean("bootDirectQueue2")
    public Queue bootDirectQueue2(){
        return QueueBuilder.durable("boot_direct_queue2").build();
    }

    // direct类型交换机
    @Bean("bootDirectExchange")
    public Exchange bootDirectExchange(){
        /*
            第一种方式: 通过ExchangeBuilder构建交换机
                durable: 是否持久化
                autoDelete: 是否自动删除
                withArgument: 交换机其他参数
         */
//        return ExchangeBuilder.directExchange("boot_direct_exchange").durable(true).autoDelete().withArgument("key","val").build();

        /*
            第二种方式:
                参数1: 是否持久化(默认false)
                参数2: 是否自动删除(默认false)
                参数3: 其他参数
         */
        return new DirectExchange("boot_direct_exchange",true,false,null);
    }

    /**
     * 交换机与队列进行绑定
     */
    @Bean
    public Binding bindDirect1(){

        Queue bootDirectQueue1 = bootDirectQueue1();
        Exchange bootDirectExchange = bootDirectExchange();


        /*
            第一种方式:
                bind(Queue): 需要绑定的queue
                to(Exchange): 需要绑定到哪个交换机
                with(String): routing key
                noargs(): 进行构建
         */
//        return BindingBuilder.bind(bootDirectQueue1).to(bootDirectExchange).with("article").noargs();

        /*
            第一种方式:
                参数1: 绑定的队列
                参数2: 绑定的类型 Binding.DestinationType.QUEUE: 绑定的类型为queue(交换机不仅可以绑定queue还可以绑定exchange)
                参数3: 哪个交换机需要绑定
                参数4: routing key
                参数5: 其他参数
         */
         return new Binding("boot_direct_queue1", Binding.DestinationType.QUEUE,"boot_direct_exchange","red",null);
    }

    /**
     * 交换机与队列进行绑定
     * @return
     */
    @Bean
    public Binding bindDirect2(){
        Queue bootDirectQueue2 = bootDirectQueue2();
        Exchange bootDirectExchange = bootDirectExchange();

        return BindingBuilder.bind(bootDirectQueue2).to(bootDirectExchange).with("green").noargs();
    }
}

6.1.4.4 Topics模式

package com.lscl.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Topics模式(交换机类型为Topic)
 */
@Configuration
public class Config_04_Topic {

    @Bean("bootTopicQueue1")
    public Queue bootTopicQueue1(){
        return QueueBuilder.durable("boot_topic_queue1").build();
    }

    @Bean("bootTopicQueue2")
    public Queue bootTopicQueue2(){
        return QueueBuilder.durable("boot_topic_queue2").build();
    }

    // topic类型交换机
    @Bean("bootTopicExchange")
    public Exchange bootTopicExchange(){

        return ExchangeBuilder.topicExchange("boot_topic_exchange").durable(true).build();
    }

    /**
     * 交换机与队列进行绑定
     */
    @Bean
    public Binding bindTopic1(){

        Queue bootTopicQueue1 = bootTopicQueue1();
        Exchange bootTopicExchange = bootTopicExchange();

        return BindingBuilder.bind(bootTopicQueue1).to(bootTopicExchange).with("red.#").noargs();
    }

    /**
     * 交换机与队列进行绑定
     * @return
     */
    @Bean
    public Binding bindTopic2(){
        Queue bootTopicQueue2 = bootTopicQueue2();
        Exchange bootTopicExchange = bootTopicExchange();
        return BindingBuilder.bind(bootTopicQueue2).to(bootTopicExchange).with("green.*").noargs();
    }
}

6.1.5 测试类:

package com.lscl.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = ProducerApplication.class)
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  work 模式
     * @throws Exception
     */
    @Test
    public void testWork() throws Exception{
        rabbitTemplate.convertAndSend("boot_work_queue","boot work...");
    }

    /**
     * Routing 模式(交换机类型为Direct)
     * @throws Exception
     */
    @Test
    public void testDirect() throws Exception{
        rabbitTemplate.convertAndSend("boot_direct_exchange","red","boot direct red...");
        rabbitTemplate.convertAndSend("boot_direct_exchange","green","boot direct green...");
    }

    /**
     * Pub/Sub 模式(交换机类型为Fanout)
     * @throws Exception
     */
    @Test
    public void testFanout() throws Exception{
        rabbitTemplate.convertAndSend("boot_fanout_exchange","","boot fanout....");
    }

    /**
     * Topics模式(交换机类型为Topic)
     * @throws Exception
     */
    @Test
    public void tesTopic() throws Exception{
        rabbitTemplate.convertAndSend("boot_topic_exchange","red.xxx.xxx","boot topic red.# ...");
        rabbitTemplate.convertAndSend("boot_topic_exchange","green.xxx","boot topic green.* ....");
    }
}

6.2 搭建消息消费者

6.2.1 pom.xml:

和生产者依赖一致

6.2.2 application.yml:

和生产者一致

6.2.3 引导类:

package com.lscl.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class);
    }
}

6.2.4 监听器:

6.2.4.1 Work 监听:

package com.lscl.rabbitmq.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Listener_01_Work {

    /**
     * 监听boot_work_queue队列的消息
     * @param message
     */
    @RabbitListener(queues = "boot_work_queue")
    public void receive(Message message){
        System.out.println("boot_work_queue: "+new String(message.getBody()));
    }
}

6.2.4.2 Direct 监听:

package com.lscl.rabbitmq.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Listener_02_Direct {

    @RabbitListener(queues = "boot_direct_queue1")
    public void boot_direct_queue1(Message message){
        System.out.println("boot_direct_queue1: "+new String(message.getBody()));
    }

    @RabbitListener(queues = "boot_direct_queue2")
    public void boot_direct_queue2(Message message){
        System.out.println("boot_direct_queue2: "+new String(message.getBody()));
    }

}

6.2.4.3 其他监听

其他几个监听对象的代码都是一致的,只要把监听的队列名换了即可

在这里插入图片描述

6.3 配置Header类型

6.3.1 生产者

6.3.1.1 声明交换机绑定Queue

package com.lscl.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * header模式
 */
@Configuration
public class Config_05_Header {
    // 声明一个queue
    @Bean("bootHeaderQueue")
    public Queue bootHeaderQueue(){
        return QueueBuilder.durable("boot_header_queue").build();
    }

    // header类型交换机
    @Bean("bootHeaderExchange")
    public Exchange bootHeaderExchange(){

        return ExchangeBuilder.headersExchange("boot_header_exchange").durable(true).build();
    }

    // 将exchange和queue进行绑定
    @Bean
    public Binding bindHeader(){

        Queue queue = bootHeaderQueue();
        Exchange exchange = bootHeaderExchange();

        Map<String, Object> headers = new HashMap<>();
        /*
            all:Producer必须匹配所有的键值对
            any:只要Producer匹配任意一个键值对即可
         */
            headers.put("x-match", "any");
            headers.put("key1", "147");
            headers.put("key2", "258");
            headers.put("key3", "369");

        // routing key 为空
        return BindingBuilder.bind(queue).to(exchange).with("").and(headers);
    }
}

6.3.1.2 测试类:

@Autowired
private RabbitMessagingTemplate rabbitMessagingTemplate;

/**
 * header模式
 *
 * @throws Exception
 */
@Test
public void tesHeader() throws Exception {

    // 准备header参数
    Map<String, Object> headers = new HashMap<>();
    headers.put("key1", "147");
    headers.put("key2", "258");
    headers.put("key3", "369");

    // 使用的是rabbitMessagingTemplate 而不是 rabbitTemplate
    rabbitMessagingTemplate.convertAndSend("boot_header_exchange", "", "boot header....", headers);
}

6.3.2 消费者

6.3.2.1 监听器

package com.lscl.rabbitmq.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Listener_05_Header {

    @RabbitListener(queues = "boot_header_queue")
    public void boot_topic_queue1(Message message) {
        System.out.println("boot_header_queue: " + new String(message.getBody()));
    }

}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/131798.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!