SpringBoot整合RocketMQ实录


SpringBoot整合RocketMQ实录


使用SpringBoot开发的小伙伴也很多,因此spring官方也集成了RocketMQ调用模板,在理解原生API调用的基础上使用会有助于开发效率的提升。

SpringBoot整合

这部分我们看下SpringBoot如何快速集成RocketMQ.

在使用SpringBoot的tarter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-bootstarter.2.0.4版本开发的代码,升级到目前最新的rocketmq-springboot-starter.2.1.1后,基本就用不了了。

需要注意

SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。具体所有的配置信息可以参见org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。

SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,这在使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指定。

最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。

apache有一个官方的rocketmq-spring示例,地址: https://Github.com/apache/rocketmq-spring.git以后如果版本更新了,可以参考下这个示例代码。

配置信息(生产者)

SpringBoot整合RocketMQ实录
image-20210412154703270

消费者的配置

消费者的配置都在注释中,@RocketMQMessageListener

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup",topic = "TestTopic",consumeMode = ConsumeMode.CONCURRENTLY)
public class SpringConsumer implements RocketMQListener<String{

    @Override
    public void onMessage(String s) {
        System.out.println("Recieved message :"+s);
    }
}
//RocketMQMessageListener
public @interface RocketMQMessageListener {
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    String consumerGroup();

    String topic();

    SelectorType selectorType() default SelectorType.TAG;//过滤消息,支持TAG和SQL92

    String selectorExpression() default "*";

    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;//顺序消费和并发消费

    MessageModel messageModel() default MessageModel.CLUSTERING;//集群和广播

    int consumeThreadMax() default 64;

    long consumeTimeout() default 15L;

    String accessKey() default "${rocketmq.consumer.access-key:}";//acl权限控制

    String secretKey() default "
${rocketmq.consumer.secret-key:}";//acl权限控制

    boolean enableMsgTrace() default true;//消息轨迹

    String customizedTraceTopic() default "
${rocketmq.consumer.customized-trace-topic:}";//消息轨迹

    String nameServer() default "
${rocketmq.name-server:}";

    String accessChannel() default "
${rocketmq.access-channel:}";
}

消息体

SpringBoot的消息体为spring的消息对象,因此获取一些属性时会有些变化。

        //获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
        String tags = message.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
//RocketMQHeaders
public class RocketMQHeaders {
    public static final String PREFIX = "rocketmq_";
    public static final String KEYS = "KEYS";
    public static final String TAGS = "TAGS";
    public static final String TOPIC = "TOPIC";
    public static final String MESSAGE_ID = "MESSAGE_ID";
    public static final String BORN_TIMESTAMP = "BORN_TIMESTAMP";
    public static final String BORN_HOST = "BORN_HOST";
    public static final String FLAG = "FLAG";
    public static final String QUEUE_ID = "QUEUE_ID";
    public static final String SYS_FLAG = "SYS_FLAG";
    public static final String TRANSACTION_ID = "TRANSACTION_ID";

    public RocketMQHeaders() {
    }
}

多个事务处理的情况

需要新创建一个rocketMQtemplate

@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

将对应的事务处理逻辑对象指向该rocketMQtemplate

@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
public class MyTransactionListenerImpl implements RocketMQLocalTransactionListener 


原文始发于微信公众号(云户):SpringBoot整合RocketMQ实录

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/25812.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!