springboot整合activemq

导读:本篇文章讲解 springboot整合activemq,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

一. 简介

           springboot提供了对activemq的集成,只需要几个简单的注解就可以使用,方便。

           该案例可以同时使用queue和topic, 测试无问题。有疑问欢迎留言,必回复。

           activemq的环境搭建可以参考我的另一篇文章: linux下安装activemq

           源码已上传到github上,路径为: https://github.com/1956025812/activemqdemo 

           项目down下来运行的时候记得要改mq连接信息哈~

 

二. 整合步骤

 

2.1  pom.xml

            需要引入:spring-boot-starter-activemq和activemq-pool和fastjson

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yzx</groupId>
    <artifactId>activemqdemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>activemqdemo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- activemq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <!-- activemq -->

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

2.2 application.yml

server:
  port: 8080

spring:
  activemq:
    broker-url: tcp://IP:61616
    user: admin
    password: admin
    pool:
      enabled: true
    packages:
      trust-all: true   # 如果使用ObjectMessage传输对象,必须要加上这个信任包,否则会报ClassNotFound异常
  jms:
    pub-sub-domain: true  # 启动主题消息

2.3 ActiveMqConfig

         注: 如果需要同时使用queue和topic,则需要引用该配置中指定的containerFactory.

package com.yzx.activemqdemo.demo1;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

@Configuration
public class ActiveMqConfig {


    // queue模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }


    // topic模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

}

2.4  MqProducer

            注: 消息生产类,需要使用的时候直接注入即可。

                     目前支持:字符串, 字符串集合, 对象, 对象集合的队列和主题

package com.yzx.activemqdemo.demo1;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import java.io.Serializable;
import java.util.List;

@Service
public class MqProducer {


    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;


    /**
     * 发送字符串消息队列
     *
     * @param queueName 队列名称
     * @param message   字符串
     */
    public void sendStringQueue(String queueName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
    }


    /**
     * 发送字符串集合消息队列
     *
     * @param queueName 队列名称
     * @param list      字符串集合
     */
    public void sendStringListQueue(String queueName, List<String> list) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list);
    }


    /**
     * 发送对象消息队列
     *
     * @param queueName 队列名称
     * @param obj       对象
     */
    public void sendObjQueue(String queueName, Serializable obj) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj);
    }


    /**
     * 发送对象集合消息队列
     *
     * @param queueName 队列名称
     * @param objList   对象集合
     */
    public void sendObjListQueue(String queueName, List<Serializable> objList) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList);
    }


    /**
     * 发送字符串消息主题
     *
     * @param topicName 主题名称
     * @param message   字符串
     */
    public void sendStringTopic(String topicName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message);
    }


    /**
     * 发送字符串集合消息主题
     *
     * @param topicName 主题名称
     * @param list      字符串集合
     */
    public void sendStringListTopic(String topicName, List<String> list) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list);
    }


    /**
     * 发送对象消息主题
     *
     * @param topicName 主题名称
     * @param obj       对象
     */
    public void sendObjTopic(String topicName, Serializable obj) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj);
    }


    /**
     * 发送对象集合消息主题
     *
     * @param topicName 主题名称
     * @param objList   对象集合
     */
    public void sendObjListTopic(String topicName, List<Serializable> objList) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList);
    }

}

2.5 QueueConsumer

          注: 队列消费类,只需要使用@JmsListener注解就可以指定监听的通道,方便。

package com.yzx.activemqdemo.demo1;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;

@Component
public class QueueConsumer {

    @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(String msg) {
        System.out.println("接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringListQueue(List<String> list) {
        System.out.println("接收到集合队列消息...." + list);
    }


    @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
        System.out.println("接收到对象队列消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
        System.out.println("接收到的对象队列消息..." + objectMessage.getObject());
    }


}

2.6 TopicConsumer

      注: A主题消费者,用来接收主题类消息。目前搭了俩个消费者,一个A,一个B。

2.6.1 ATopicConsumer

package com.yzx.activemqdemo.demo1;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;

@Component
public class ATopicConsumer {

    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("ATopicConsumer接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("ATopicConsumer接收到集合主题消息...." + list);
    }


    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到对象主题消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
    }

}

2.6.2 BTopicConsumer

package com.yzx.activemqdemo.demo1;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;

@Component
public class BTopicConsumer {

    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("BTopicConsumer接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("BTopicConsumer接收到集合主题消息...." + list);
    }


    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("BTopicConsumer接收到对象主题消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("BTopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
    }

}

2.7 User类

       注: 用来测试对象的类,复写了toString方法。必须实现Serializable接口。

package com.yzx.activemqdemo.demo1;

import java.io.Serializable;

public class User implements Serializable {

    private String id;
    private String name;
    private Integer age;

    public User() {
    }

    public User(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

2.8 启动类

package com.yzx.activemqdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ActivemqdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActivemqdemoApplication.class, args);
    }
}

2.9 单元测试类

package com.yzx.activemqdemo;

import com.yzx.activemqdemo.demo1.MqProducer;
import com.yzx.activemqdemo.demo1.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {

    @Autowired
    private MqProducer mqProducer;


    @Test
    public void testStringQueue() {

        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次发送字符串队列消息");
            mqProducer.sendStringQueue("stringQueue", "消息:" + i);
        }
    }


    @Test
    public void testStringListQueue() {

        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");

        System.out.println("正在发送集合队列消息ing......");
        mqProducer.sendStringListQueue("stringListQueue", idList);
    }


    @Test
    public void testObjQueue() {

        System.out.println("正在发送对象队列消息......");
        mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20));
    }


    @Test
    public void testObjListQueue() {

        System.out.println("正在发送对象集合队列消息......");

        List<Serializable> userList = new ArrayList<>();
        userList.add(new User("1", "小明", 21));
        userList.add(new User("2", "小雪", 22));
        userList.add(new User("3", "小花", 23));

        mqProducer.sendObjListQueue("objListQueue", userList);
    }


    @Test
    public void testStringTopic() {

        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次发送字符串主题消息");
            mqProducer.sendStringTopic("stringTopic", "消息:" + i);
        }
    }


    @Test
    public void testStringListTopic() {

        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");

        System.out.println("正在发送集合主题消息ing......");
        mqProducer.sendStringListTopic("stringListTopic", idList);
    }


    @Test
    public void testObjTopic() {

        System.out.println("正在发送对象主题消息......");
        mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20));
    }


    @Test
    public void testObjListTopic() {

        System.out.println("正在发送对象集合主题消息......");

        List<Serializable> userList = new ArrayList<>();
        userList.add(new User("1", "小明", 21));
        userList.add(new User("2", "小雪", 22));
        userList.add(new User("3", "小花", 23));

        mqProducer.sendObjListTopic("objListTopic", userList);
    }
}

2.10 最终目录结构

springboot整合activemq

三. 测试

3.1 测试队列

3.1.1 测试字符串队列

运行testStringQueue方法,输出内容如下:

第1次发送字符串队列消息
第2次发送字符串队列消息
接收到消息....消息:1
接收到消息....消息:2
第3次发送字符串队列消息
接收到消息....消息:3
.........
接收到消息....消息:99
第100次发送字符串队列消息
接收到消息....消息:100

3.1.2 测试字符串集合队列

运行testStringListQueue方法,输出内容如下:

正在发送集合队列消息ing......
接收到集合队列消息....[id1, id2, id3]

3.1.3 测试对象队列

运行testObjQueue方法,输出内容如下:

正在发送对象队列消息......
接收到对象队列消息....User{id='1', name='小明', age=20}

3.1.4 测试对象集合队列

运行testObjListQueue方法,输出内容如下:

正在发送对象集合队列消息......
接收到的对象队列消息...[User{id='1', name='小明', age=21}, User{id='2', name='小雪', age=22}, User{id='3', name='小花', age=23}]

3.2 测试主题

3.2.1 测试字符串主题

运行testStringTopic方法,输出内容如下:

第1次发送字符串主题消息
第2次发送字符串主题消息
ATopicConsumer接收到消息....消息:1
BTopicConsumer接收到消息....消息:1
第3次发送字符串主题消息
BTopicConsumer接收到消息....消息:2
ATopicConsumer接收到消息....消息:2
.......
第99次发送字符串主题消息
ATopicConsumer接收到消息....消息:98
BTopicConsumer接收到消息....消息:98
第100次发送字符串主题消息
ATopicConsumer接收到消息....消息:99
BTopicConsumer接收到消息....消息:99
BTopicConsumer接收到消息....消息:100
ATopicConsumer接收到消息....消息:100

3.2.2 测试字符串集合主题

运行testStringListTopic方法,输出内容如下:

正在发送集合主题消息ing......
ATopicConsumer接收到集合主题消息....[id1, id2, id3]
BTopicConsumer接收到集合主题消息....[id1, id2, id3]

3.2.3 测试对象主题

运行testObjTopic方法,输出内容如下:

正在发送对象主题消息......
ATopicConsumer接收到对象主题消息....User{id='1', name='小明', age=20}
BTopicConsumer接收到对象主题消息....User{id='1', name='小明', age=20}

3.2.4 测试对象集合主题

运行testObjListTopic方法,输出内容如下:

正在发送对象集合主题消息......
BTopicConsumer接收到的对象集合主题消息...[User{id='1', name='小明', age=21}, User{id='2', name='小雪', age=22}, User{id='3', name='小花', age=23}]
ATopicConsumer接收到的对象集合主题消息...[User{id='1', name='小明', age=21}, User{id='2', name='小雪', age=22}, User{id='3', name='小花', age=23}]

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/17793.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!