1. 消息生产者
1.1 maven 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.meta.rocketmq</groupId>
<artifactId>rocketmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-producer</name>
<description>RocketMQ-Producer for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<!-- Rocketmq -->
<rocketmq.version>2.2.1</rocketmq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
1.2 启动类
这里给大家分享一个自定义个性化启动类。
package com.meta.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* @author gaoyang
* @date 2021-12-04 17:16
*/
@Slf4j
@SpringBootApplication
public class RocketmqProducerApplication {
public static void main(String[] args) throws UnknownHostException {
final ConfigurableApplicationContext context = new SpringApplicationBuilder()
.sources(RocketmqProducerApplication.class)
.run(args);
final Environment env = context.getEnvironment();
RocketmqProducerApplication.log.info("\n----------------------------------------------------------\n\t" +
"Application '{}' is running! ActiveProfiles is '{}', Access URLs:\n\t" +
"Local: \t\thttp://127.0.0.1:{}\n\t" +
"External: \thttp://{}:{}\n----------------------------------------------------------",
env.getProperty("spring.application.name"),
env.getProperty("spring.profiles.active"),
env.getProperty("server.port"),
InetAddress.getLocalHost().getHostAddress(),
env.getProperty("server.port"));
}
}
1.3 配置文件
我这里为了方便集群模式是单Master模式。
server.port=8090
server.servlet.context-path=/
spring.application.name=rocketmq-producer
spring.profiles.active=local
# rocketmq 的 nameserver 名称
rocketmq.name-server=192.168.31.247:9876
# 生产组名称
rocketmq.producer.group=producer_grp_01
rocketmq.producer.customized-trace-topic=topic_springboot_01
1.4 消息生产者
package com.meta.rocketmq.producer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author gaoyang
* @date 2021-12-05 20:43
*/
@Component
public class RocketmqProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息的实例
*
* @param topic
* @param msg
*/
public void sendMessage(String topic, String msg) {
rocketMQTemplate.convertAndSend(topic, msg);
}
}
2. 消息消费者
2.1 maven 依赖
与上面生产者一致
2.2 配置文件
server.port=8091
server.servlet.context-path=/
spring.application.name=rocketmq-consumer
spring.profiles.active=local
# rocketmq 的 nameserver 名称
rocketmq.name-server=192.168.31.247:9876
2.3 消息消费者
package com.meta.rocketmq.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author gaoyang
* @date 2021-12-04 17:16
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_grp_01", topic = "topic_springboot_01")
public class MyRocketmqListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理 broker 推送过来的消息
log.info("Received message: {}", message);
}
}
SpringBoot集成RocketMQ,消费者部分的核心就在这个@RocketMQMessageListener注解上。所有消费者的核心功能也都会集成到这个注解中。所以我们还要注意下这个注解里面的属性:
例如:消息过滤可以由里面的selectorType属性和selectorExpression来定制
消息有序消费还是并发消费则由consumeMode属性定制。
消费者是集群部署还是广播部署由messageModel属性定制。
3. Controller
package com.meta.rocketmq.controller;
import com.meta.rocketmq.producer.RocketmqProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author gaoyang
* @date 2021-12-05 21:00
*/
@RestController
@RequestMapping("/mq/test")
public class RocketmqController {
@Value("${rocketmq.producer.customized-trace-topic}")
private String topic;
@Resource
private RocketmqProducer rocketmqProducer;
@RequestMapping("/send/message")
public String sendMessage(@RequestParam(name = "message", required = true) String message) {
rocketmqProducer.sendMessage(topic, message);
return "消息发送完成";
}
}
4. 分别启动生产者和消费者测试
4.1 发送消息
4.2 消费者消费消息
5. 源码地址
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/5416.html