RocketMQ 实战-SpringBoot整合RocketMQ

导读:本篇文章讲解 RocketMQ 实战-SpringBoot整合RocketMQ,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1. 消息生产者

1.1 maven 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.meta.rocketmq</groupId>
    <artifactId>rocketmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <name>rocketmq-producer</name>
    <description>RocketMQ-Producer for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>

        <!-- Rocketmq -->
        <rocketmq.version>2.2.1</rocketmq.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

1.2 启动类

这里给大家分享一个自定义个性化启动类。

package com.meta.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * @author gaoyang
 * @date 2021-12-04 17:16
 */
@Slf4j
@SpringBootApplication
public class RocketmqProducerApplication {

    public static void main(String[] args) throws UnknownHostException {
        final ConfigurableApplicationContext context = new SpringApplicationBuilder()
                .sources(RocketmqProducerApplication.class)
                .run(args);

        final Environment env = context.getEnvironment();
        RocketmqProducerApplication.log.info("\n----------------------------------------------------------\n\t" +
                        "Application '{}' is running! ActiveProfiles is '{}', Access URLs:\n\t" +
                        "Local: \t\thttp://127.0.0.1:{}\n\t" +
                        "External: \thttp://{}:{}\n----------------------------------------------------------",
                env.getProperty("spring.application.name"),
                env.getProperty("spring.profiles.active"),
                env.getProperty("server.port"),
                InetAddress.getLocalHost().getHostAddress(),
                env.getProperty("server.port"));
    }
}

1.3 配置文件

我这里为了方便集群模式是单Master模式。

server.port=8090
server.servlet.context-path=/

spring.application.name=rocketmq-producer
spring.profiles.active=local

# rocketmq 的 nameserver 名称
rocketmq.name-server=192.168.31.247:9876
# 生产组名称
rocketmq.producer.group=producer_grp_01
rocketmq.producer.customized-trace-topic=topic_springboot_01

1.4 消息生产者

package com.meta.rocketmq.producer;

import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author gaoyang
 * @date 2021-12-05 20:43
 */
@Component
public class RocketmqProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送消息的实例
     *
     * @param topic
     * @param msg
     */
    public void sendMessage(String topic, String msg) {
        rocketMQTemplate.convertAndSend(topic, msg);
    }
}

2. 消息消费者

2.1 maven 依赖

与上面生产者一致

2.2 配置文件

server.port=8091
server.servlet.context-path=/

spring.application.name=rocketmq-consumer
spring.profiles.active=local

  # rocketmq 的 nameserver 名称
rocketmq.name-server=192.168.31.247:9876

2.3 消息消费者

package com.meta.rocketmq.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * @author gaoyang
 * @date 2021-12-04 17:16
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_grp_01", topic = "topic_springboot_01")
public class MyRocketmqListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理 broker 推送过来的消息
        log.info("Received message: {}", message);
    }
}

SpringBoot集成RocketMQ,消费者部分的核心就在这个@RocketMQMessageListener注解上。所有消费者的核心功能也都会集成到这个注解中。所以我们还要注意下这个注解里面的属性:

例如:消息过滤可以由里面的selectorType属性和selectorExpression来定制

消息有序消费还是并发消费则由consumeMode属性定制。

消费者是集群部署还是广播部署由messageModel属性定制。

3. Controller

package com.meta.rocketmq.controller;

import com.meta.rocketmq.producer.RocketmqProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author gaoyang
 * @date 2021-12-05 21:00
 */
@RestController
@RequestMapping("/mq/test")
public class RocketmqController {

    @Value("${rocketmq.producer.customized-trace-topic}")
    private String topic;

    @Resource
    private RocketmqProducer rocketmqProducer;

    @RequestMapping("/send/message")
    public String sendMessage(@RequestParam(name = "message", required = true) String message) {
        rocketmqProducer.sendMessage(topic, message);

        return "消息发送完成";
    }
}

4. 分别启动生产者和消费者测试

4.1 发送消息

在这里插入图片描述

4.2 消费者消费消息

在这里插入图片描述

5. 源码地址

https://gitee.com/king-high/rocketmq-demo.git

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/5416.html

(0)
小半的头像小半

相关推荐

极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!