Springboot整合RocketMQ实现消息过滤功能

导读:本篇文章讲解 Springboot整合RocketMQ实现消息过滤功能,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

环境:springboot2.3.9RELEASE + RocketMQ4.8.0


先阅读如下两篇文章

SpringBoot整合RocketMQ入门示例

 

SpringBoot整合RocketMQ事务/广播/顺序消息

依赖及配置

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>
server:
  port: 8082
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq

RocketMQ有两种过滤消息方式:TAG和SQL表达式。

方式1:根据TAG过滤消息

消息发送端只能设置一个tag,消息接收端可以设置多个tag。

  • 消息接收端

接收消息端通过 ‘||’ 设置多个tag,如下:

tag1 || tag2 || tag3 || ...
@RocketMQMessageListener(topic = "filter-topic", consumerGroup = "consumer06-group", 
  selectorExpression = "tag11 || tag12 || tag13", messageModel = MessageModel.CLUSTERING, 
  selectorType = SelectorType.TAG)
@Component
public class ConsumerFilterMessageListener implements RocketMQListener<MessageExt> {

  @Override
  public void onMessage(MessageExt message) {
    System.out.println(Thread.currentThread().getName()) ;
    System.out.println(new String(message.getBody())) ;
    System.out.println(message.getProperties()) ;
	}

}

selectorType:指明了消息选择通过tag的方式,默认值SelectorType.TAG。

messageModel:指明了消息消费模式,默认值MessageModel.CLUSTERING每条消息只能有一个消费端进行消费;MessageModel.BROADCASTING广播消息,所有订阅者都能收到消息。

selectorExpression:指明了能够接收哪些tag,多个tag通过 ‘||’ 或方式。

  • 消息发送端

消息发送端在设置消息目的地时通过topic与tag拼接方式。

public void sendFilterMessage(String topic, String message, String tags) {
  Message<String> msg = MessageBuilder.withPayload(message).build() ;
  rocketMQTemplate.convertAndSend(topic + ":" + tags, msg);
}

格式:topic + ‘:’ + tag

注意:这里的tag只能设置一个。

  • 测试

@GetMapping("/filter")
public Object sendFilterMessage(String content) {
  ps.sendFilterMessage("filter-topic", content, "tag12") ;
  return "send filter message success" ;
}

发送消息时设置了tag为:tag12;那接收消息端只有设置了selectorExpression值中包含了tag12的才能接收消息。

Springboot整合RocketMQ实现消息过滤功能

 

根据上面的配置能够收到消息。当发送时将tag修改其它值,消费端是不接收不到消息的。

方式2:根据SQL表达式过滤消息

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。在RocketMQ定义的语法下,可以实现一些有趣的逻辑。如下:

图片

 

RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

  1. 数字比较,如>,>=,<,<=,BETWEEN,=;

  2. 字符比较,如:=,<>,IN;

  3. IS NULL or IS NOT NULL;

  4. 逻辑运算符:AND, OR, NOT;

常量类型:

  1. 数值,如:123, 3.1415;

  2. 字符, 如:‘abc’, 必须使用单引号;

  3. NULL,特殊常量

  4. Boolean, TRUE or FALSE;

  • 消费端

@RocketMQMessageListener(topic = "filter-topic", consumerGroup = "consumer06-group", 
  selectorExpression = "pack = 'abc' || a = 1", messageModel = MessageModel.CLUSTERING, 
  selectorType = SelectorType.SQL92)
@Component
public class ConsumerFilterMessageListener implements RocketMQListener<MessageExt> {

  @Override
  public void onMessage(MessageExt message) {
    System.out.println(Thread.currentThread().getName()) ;
    System.out.println(new String(message.getBody())) ;
    System.out.println(message.getProperties()) ;
  }

}

selectorExpression:指明了只能接收消息属性(header)中pack=abc或者a=1的消息。

selectorType:指明了消息过滤使用SQL92方式。

  • 消息发送端

public void sendFilterMessage(String topic, String message, String tags) {
  Message<String> msg = MessageBuilder.withPayload(message).build() ;
  Map<String, Object> headers = new HashMap<>() ;
  headers.put("pack", "abc") ;
  headers.put("a", 10) ;
  rocketMQTemplate.convertAndSend(topic, msg, headers);
}

这里的Map就是用来设置消息的属性(header)信息。

  • 测试

@GetMapping("/filter")
public Object sendFilterMessage(String content) {
  ps.sendFilterMessage("filter-topic", content, null) ;
  return "send filter message success" ;
}

根据上面的配置:

图片

 

成功接收到消息;

修改消息消费端selectorExpression属性值,OR 改成 AND

@RocketMQMessageListener(topic = "filter-topic", consumerGroup = "consumer06-group", 
	selectorExpression = "pack = 'abc' AND a = 1", messageModel = MessageModel.CLUSTERING, 
	selectorType = SelectorType.SQL92)
@Component
public class ConsumerFilterMessageListener implements RocketMQListener<MessageExt> {

再次测试,消息没有接收到

图片

 

以上就是RocketMQ消息过滤的两种方式。

完毕!!!

给个关注+转发呗谢谢

图片

 

图片

 

图片

 

图片

 

图片

Springboot整合RocketMQ实现消息过滤功能

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/80035.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!