Spring Cloud Stream应用RocketMQ

导读:本篇文章讲解 Spring Cloud Stream应用RocketMQ,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

What is Spring Cloud Stream?

  微服务中会经常使用消息中间件,通过消息中间件在服务与服务之间传递消息,例如RabbitMQ、Kafka和RocketMQ,无论使用哪一种消息中间件和服务之间都有一点耦合性,这个耦合性指的是原来使用RabbitMQ,现在要替换为RocketMQ,我们的微服务改动比较大,因为两款消息中间件有一些区别,使用Spring Cloud Stream来整合我们的消息中间件,这样就可以降低微服务和消息中间件的耦合性,做到轻松在不同消息中间件之间切换,然而Spring Cloud Stream官方整合了消息中间件,Spring Cloud Alibaba写了个starter可以支持RocketMQ。
  Spring Cloud Stream是一个构建消息驱动微服务的框架,Spring Cloud Stream解决了开发人员无感知的使用消息中间件的问题,因为Spring Cloud Stream对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为rocketmq或者kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程;

Spring Cloud Stream 重要概念:

在这里插入图片描述
  Spring Cloud Stream 内部有几个概念:Binder 、Binding、input、output;
1、Binder:跟外部消息中间件集成的组件,用来创建Binding,各消息中间件都有自己的Binder实现;
例如:Kafka 的实现 KafkaMessageChannelBinder
   RabbitMQ 的实现 RabbitMessageChannelBinder
   RocketMQ 的实现 RocketMQMessageChannelBinder;
2、Binding:包括InputBinding和OutputBinding
  Binding在消息中间件与应用程序提供的Provider和Consumer之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触;
3、input:应用程序通过input(相当于消费者consumer)与Spring Cloud Stream 中Binder交互,而Binder负责与消息中间件交互,因此,我们只需关注如何与Binder交互即可,而无需关注与具体消息中间件的交互。
4、output:output(相当于生产者producer)与Spring Cloud Stream中Binder交互;

组成 说明
Binder Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现;
@Input 该注解标识输入通道,通过该输入通道接收消息进入应用程序
@Output 该注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 将信道channel和exchange、topic绑定在一起

Spring Cloud Stream与RocketMQ应用:

1、创建项目添加依赖

	<properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.2.5.RELEASE</spring-boot.version>
        <spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--spring-cloud-starter-stream-rocketmq-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

2、application.properties配置文件

server.port=8090

########## RocketMQ 通用配置
# 客户端接入点,必填  rocketMQ的连接地址,binder高度抽象
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

# 日志级别
logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO


########## Produce Config
# output 的配置如下:bindings 具体生产消息、消费消息的桥梁
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.contentType=text/plain
spring.cloud.stream.bindings.output.group=test-group

########## Consumer Config
# input 的配置:
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.contentType=text/plain
spring.cloud.stream.bindings.input.group=test-group

3、消息的发送:

/**
 * @author 
 * @Description: 发送消息
 * @date 2021/1/26 0:49
 */
@Service
public class SenderService {

    //spring cloud stream里面发消息通过 Source 发送
    @Autowired
    private Source source;

    /**
     * 发送消息的方法
     * @param message
     */
    public void sendMessage(String message){
        boolean send = source.output().send(MessageBuilder.withPayload(message).build());
        System.out.println(send);
    }
}

4、接收消息:

/**
 * @author 
 * @Description: 接收消息
 * @date 2021/1/26 0:59
 */
@Service
public class ReceiveService {

    //spring cloud stream里面发消息通过 Sink 发送
    @Autowired
    private Sink sink;

    @StreamListener(value = Sink.INPUT)
    public void getListener(String message){
        System.out.println("test-group="+message);
    }
}

5、SpringBoot入口类

@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class MQApplication implements CommandLineRunner {

    @Autowired
    private SenderService senderService;

    public static void main(String[] args) {
        SpringApplication.run(MQApplication.class,args);
    }

    @Override
    public void run(String... args) throws Exception {
        senderService.sendMessage("hello rocketmq-srping-cloud-stream2");
    }
}

结果显示:
发送消息成功

Spring Cloud Stream自定义信道:

/**
 * @author 
 * @Description: 自定义Source
 * @date 2021/1/27 23:22
 */
public interface MySource {

    String OUTPUT1 = "output1";
    
    @Output(MySource.OUTPUT1)
    MessageChannel output1();

    String OUTPUT2 = "output2";

    @Output(MySource.OUTPUT2)
    MessageChannel output2();
}
/**
 * @author 
 * @Description: ${todo}
 * @date 2021/1/27 23:24
 */
public interface MySink {

    String INPUT1 = "input1";

    @Input(MySink.INPUT1)
    SubscribableChannel input1();

    String INPUT2= "input2";

    @Input(MySink.INPUT2)
    SubscribableChannel input2();
}
server.port=8090

########## RocketMQ 通用配置
# 客户端接入点,必填  rocketMQ的连接地址,binder高度抽象
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

# 日志级别
logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO


########## Produce Config
# output 的配置如下:bindings 具体生产消息、消费消息的桥梁
spring.cloud.stream.bindings.output1.destination=test-topic1
spring.cloud.stream.bindings.output1.content-type=text/plain
spring.cloud.stream.bindings.output1.group=test-group1

spring.cloud.stream.bindings.output2.destination=test-topic2
spring.cloud.stream.bindings.output2.content-type=text/plain
spring.cloud.stream.bindings.output2.group=test-group2

########## Consumer Config
# input 的配置:
spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1

spring.cloud.stream.bindings.input2.destination=test-topic2
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2

  发送消息指定不同的分组,或者多个接收消息指定相同组名称只有一个能接收到消息。

Spring Cloud Stream RocketMQ事务消息:

事务消息
事务消息
  上图说明事务消息方案,大致为两个流程:正常事务消息的发送及提交、事务消息的补偿流程;
Half(Prepare) Message:
  指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
1、事务消息发送及提交:
(1)producer发送half message消息
(2)服务端响应消息写入结果
(3)根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行,需要将UNKNOW、commit、rollback告诉broker这是一个oneway消息,而且失败不重试)
(4)根据本地事务状态执行Commit或者Rollback(MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。)
2、补偿流程:
(5)对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”;
(6)Producer收到回查消息,检查回查消息对应的本地事务的状态;
(7)根据本地事务状态,重新Commit或者Rollback;
补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况;

事务消息一共有三种状态:提交状态、回滚状态、中间状态;
TransactionStatus.CommitTransaction: 提交事务,代表消费者可以消费此消息;
TransactionStatus.RollbackTransaction: 回滚事务,代表消息将被删除,不能被消费;
TransactionStatus.Unknown: 中间状态,代表需要检查消息队列来确定状态;

代码书写:

application.properties配置文件:

#--------------------------事务消息生产的配置-----------------------
spring.cloud.stream.bindings.outputTX.destination=TransactionTopic
spring.cloud.stream.bindings.outputTX.content-type=application/json
spring.cloud.stream.rocketmq.bindings.outputTX.producer.group=myTxProducerGroup

#是否为事务消息,默认为false表示不是事务消息,true表示是事务消息
spring.cloud.stream.rocketmq.bindings.outputTX.producer.transactional=true

#--------------------------事务消息消费的配置-----------------------
spring.cloud.stream.bindings.inputTX.destination=TransactionTopic
spring.cloud.stream.bindings.inputTX.content-type=text/plain
spring.cloud.stream.bindings.inputTX.group=transaction-group
spring.cloud.stream.rocketmq.bindings.inputTX.consumer.broadcasting=false

自定义事务通道:

/**
 * @author 
 * @Description: 自定义Source
 * @date 2021/1/27 23:22
 */
public interface MySource {

    String OUTPUTTX = "outputTX";
    
    @Output(MySource.OUTPUTTX)
    MessageChannel outputTX();
}
public interface MySink {

    String INPUTTX= "inputTX";

    @Input(MySink.INPUTTX)
    SubscribableChannel inputTX();
}

发送事务消息:

/**
 * @author 
 * @Description: 发送事务消息
 * @date 2021/1/28 0:31
 */
@Component
public class Sender {

    @Autowired
    private MySource mySource;

    /**
     * 发送事务消息
     * @param msg   消息内容
     * @param num   
     * @param <T>
     */
    public <T> void sendTransactionalMessage(T msg, int num){
        MessageBuilder<T> builder = MessageBuilder.withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader("test", String.valueOf(num));
        Message message = builder.build();
        boolean flag = mySource.outputTX().send(message);
        System.out.println(flag);
    }
}
/**
 * 指定监听事务消息组为:myTxProducerGroup
 * corePoolSize 和 maximumPoolSize 默认都为1
 */
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

	/**
	 * 执行本地事务:也就是执行本地业务逻辑
	 *
	 * @param msg
	 * @param arg
	 * @return
	 */
	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		Object num = msg.getHeaders().get("test");

		if ("1".equals(num)) {
			System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
			return RocketMQLocalTransactionState.UNKNOWN;
		}
		else if ("2".equals(num)) {
			System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
			return RocketMQLocalTransactionState.ROLLBACK;
		}
		System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
		return RocketMQLocalTransactionState.COMMIT;
	}

	/**
	 * 回调检查
	 *
	 * @param msg
	 * @return
	 */
	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		System.out.println("check: " + new String((byte[]) msg.getPayload()));
		return RocketMQLocalTransactionState.COMMIT;
	}
}

SpringBoot入口类:

@EnableBinding({Source.class, Sink.class, MySource.class, MySink.class})
@SpringBootApplication
public class MQApplication implements CommandLineRunner {


    @Autowired
    private Sender sender;

    public static void main(String[] args) {
        SpringApplication.run(MQApplication.class,args);
    }

    @Override
    public void run(String... args) throws Exception {
        sender.sendTransactionalMessage("transactionMessage1",1);
        sender.sendTransactionalMessage("transactionMessage2",2);
        sender.sendTransactionalMessage("transactionMessage3",3);
    }

测试结果
以上就是Spring Cloud Stream结合RocketMQ的简单入门使用!!!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/77242.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!