SpringBoot整合RocketMQ

不管现实多么惨不忍睹,都要持之以恒地相信,这只是黎明前短暂的黑暗而已。不要惶恐眼前的难关迈不过去,不要担心此刻的付出没有回报,别再花时间等待天降好运。真诚做人,努力做事!你想要的,岁月都会给你。SpringBoot整合RocketMQ,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

SpringBoot 整合 RocketMQ

SpringBoot 提供了快捷操作 RocketMQ 的 RocketMQTemplate 对象。

1、引入依赖

注意依赖的版本需要和 RocketMQ 的版本相同。

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

2、编写配置文件

# 应用名称
spring.application.name=rocketmq-demo
# 应⽤服务 WEB 访问端⼝
server.port=8080
# nameserver地址
rocketmq.name-server=192.168.194.134:9876
# 配置⽣产者组
rocketmq.producer.group=producer-demo-group1

3、编写生产者发送普通消息

@Component
public class MyProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送消息
     *
     * @param topic   主题
     * @param message 消息
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }
}

4、编写单元测试发送消息

@SpringBootTest
class MyProducerTest {

    @Autowired
    private MyProducer myProducer;

    @Test
    void testSendMessage() {
        String topic = "my-boot-topic";
        String message = "hello rocketmq springboot message!";
        myProducer.sendMessage(topic, message);
        System.out.println("消息发送成功!");
    }
}

报错:sendDefaultImpl 呼叫超时

Caused by: org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:717)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1426)
	at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:370)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:688)

原因:启动 namesrv,broke 没有指定 ip 或者是没开启。

配置错了,配置文件中写的是服务器 1 的地址,没有开启 broker,将配置改成指定服务器 2 的 ip地址 即可。

在这里插入图片描述

运行成功后,可视化控制台:

在这里插入图片描述

5、创建消费者程序

新建一个模块,在里面编写消费者代码

  1. 引入依赖

  2. 编写配置文件

    # 应用名称
    spring.application.name=my-boot-consumer-demo
    # 应⽤服务 WEB 访问端⼝
    server.port=8081
    # nameserver地址
    rocketmq.name-server=192.168.194.134:9876
    
  3. 编写消费者类

    @Component
    @RocketMQMessageListener(consumerGroup = "my-boot-consumer-group1",topic = "my-boot-topic")
    public class MyConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("收到的消息:" + message);
        }
    }
    

运行结果:

在这里插入图片描述

6、发送事务消息

  1. 在生产者类里编写方法

    /**
     * 发送消息事务
     *
     * @param topic 主题
     * @param msg   消息
     * @throws InterruptedException 中断异常
     */
    public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
        String[] tags = new String[]{"TagA","TagB","TagC","TagD","TagE"};
        for (int i = 0; i < 10; i++) {
            // 创建一个消息对象,并通过调用withPayload()方法,向消息对象中添加了一个负载,即要发送的字符串类型的数据
            Message<String> message = MessageBuilder.withPayload(msg).build();
            // topic 和 tag 整合成一个字符串
            String destination = topic+":"+tags[ i % tags.length];
            // 第一个destination是消息要发送的目的地topic,第二个destination是消息携带的业务数据
            TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
            System.out.println(sendResult);
            // 暂停10毫秒,以模拟消息的发送和处理过程所需要的时间
            Thread.sleep(10);
        }
    }
    
  2. 重写事务监听器

    @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
    public class MyTransactionListener implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
            String destination = (String) arg;
            // 把spring的message转换成RocketMQ的message
            org.apache.rocketmq.common.message.Message message1 = RocketMQUtil.convertToRocketMessage(
                    new StringMessageConverter(),
                    "utf-8",
                    destination,
                    message
            );
            // 获取message1上的tag对内容
            String tags = message1.getTags();
            if (StringUtils.contains(tags, "TagA")) {
                return RocketMQLocalTransactionState.COMMIT; // 返回提交事务状态,表示允许消费者消费该消息
            }else if(StringUtils.contains(tags, "TagB")) {
                return RocketMQLocalTransactionState.ROLLBACK; // 返回回滚事务状态,表示该消息将被删除,不允许消费
            }else {
                return RocketMQLocalTransactionState.UNKNOWN; // 返回中间状态,表示需要回查才能确定状态
            }
        }
    
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            return null;
        }
    }
    

7、编写单元测试发送事务消息

@Test
void testSendMessageInTransaction() throws InterruptedException {
    String topic = "my-boot-topic";
    String message = "hello rocketmq transaction springboot message";
    myProducer.sendMessageInTransaction(topic, message);
    System.out.println("事务消息发送成功");
}

运行结果:

SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC2428317E70000, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-b, queueId=2], queueOffset=34]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC24283F95B0003, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-b, queueId=3], queueOffset=35]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC242856FB20006, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-a, queueId=0], queueOffset=66]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC2428573320009, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-a, queueId=1], queueOffset=67]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC242857FAC000C, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-a, queueId=2], queueOffset=68]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC242857FC5000F, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-a, queueId=3], queueOffset=69]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC242857FDB0012, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-b, queueId=0], queueOffset=36]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC2428580A10015, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-b, queueId=1], queueOffset=37]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC24285810B0018, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-b, queueId=2], queueOffset=38]
SendResult [sendStatus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC242858133001B, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-b, queueId=3], queueOffset=39]
事务消息发送成功

atus=SEND_OK, msgId=240884599C20078554A5D7CC84F1B9A74C4018B4AAC242858133001B, offsetMsgId=null, messageQueue=MessageQueue [topic=my-transaction-topic, brokerName=broker-b, queueId=3], queueOffset=39]
事务消息发送成功


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/189743.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!