what(Spring Cloud Stream介绍)
Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架,是一个基于Spring Boot创建的、独立生产级的、提供消息代理的Spring应用。
why
多个微服务之间消息中间件实现消息流达到多服务交互的目的。
how(架构实现)
应用程序通过inputs或outputs来与Spring Cloud Stream中的Binder交互,通过配置绑定。
Spring Cloud Stream的Binder负责与中间件交互。
抽象绑定器(The Binder Abstraction)
Binder主要作用是通过Binder使得我们可以更方便的连接中间件
Spring Cloud Stream实现了Kafka和RabbitMQ的Binder实现,也包括一个TestSupportBinder用于测试使用,也可以自己实现Binder。
其中,Spring Cloud Stream 使用了Spring Boot的自动配置,并抽象了Binder,所以可以很方便的替换为Kafka或RabbitMQ,只需修改配置而无需修改任何代码
编程模型
主要有如下三个核心概念:
-
Destination Binders(目的地绑定器):
负责与外部消息系统集成交互的组件
Destination Binders是Spring Cloud Stream与外部消息中间件提供了必要的配置和实现促进集成的扩展组件。集成了生产者和消费者的消息的路由、连接和委托、数据类型转换、用户代码调用等。
尽管Binders帮我们处理了许多事情,我们仍需要对他进行配置。 -
Destination Bindings(目的地绑定):
在外部消息系统和应用的生产者和消费者之间的桥梁(由Destination Binders创建)
Destination Bindings 提供连接外部消息中间件和应用提供的生产者和消费者中间的桥梁。
使用@EnableBinding
注解打在一个配置类上来定义一个Destination Binding,这个注解
本身包含有@Configuration,会触发Spring Cloud Stream的基本配置。 -
Message (消息):
用于生产者、消费者通过Destination Binders沟通的规范数据。
主要概念(Main Concepts)
发布-订阅(Persistent Publish-Subscribe Support)
图中是经典的Spring Cloud Stream的发布-订阅模型,生产者生产消息发布在topic(共享主题)上,然后消费者通过订阅这个topic来消费消息
其中topic对应于Spring Cloud Stream中的destinations(Kafka 的topic,RabbitMQ的exchanges)
配置:spring.cloud.stream.bindings.input.destination
消费组(Consumer Groups)
对多个订阅者进行分组避免出现所有实例都去消费同一条数据造成重复消费的问题。
场景:同一个应用中只能有一个实例消费该消息
特性:当一个应用程序不同实例放置在一个具有竞争关系的消费组中,那么组里面的实例只能有一个实例能够消费消息
配置:spring.cloud.stream.bindings.<channelName>.group
如图所示,GroupA中只能由一个Service消费消息,GroupB也是一样,如果没有给实例设置分组,那么默认分配在一个匿名消费组里,会与其它组一起消费,就会出现重复消费的问题
注意:以上订阅主题的消费组都是持久化的,除了匿名的消费组
分区支持(Partitioning Support)
分区的作用是为了确保具有共同特征标识的数据由同一个消费者实例进行处理。
可以让那些不支持分区的中间件(例如RabbitMQ)也可以使用分区。
注意:要使用分区处理,你必须同时对生产者和消费者进行配置。
消费者类型(Consumer Types)
- Message-driven (消息驱动型,有时简称为异步),Spring Cloud 2.0版本支持这种类型
- Polled (轮询型,有时简称为 同步)
最简示例
示例中将展示如何创建一个Spring Cloud Stream 的应用程序,它是如何从消息中间件接收消的。其中消息中间件有两种选择:RabbitMQ和Kafka,示例中以RabbitMQ为准。
-
安装并运行RabbitMQ,参考:https://blog.csdn.net/qq_32828253/article/details/110262311
-
新建一个Spring Boot 项目,并添加依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <!-- Stream rabbit 依赖中包含 binder-rabbit,所以只需导入此依赖即可 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <scope>test</scope> <classifier>test-binder</classifier> <type>test-jar</type> </dependency>
-
配置配置文件(主要是RabbitMQ),我这里是本机:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
-
添加一个Message Handler,我这里直接写在启动类中:
@SpringBootApplication @EnableBinding(Sink.class) public class SpringCloudStreamApplication { public static void main(String[] args) { SpringApplication.run(SpringCloudStreamApplication.class, args); } @StreamListener(Sink.INPUT) public void receive(Object payload) { System.out.println("Received: " + payload); } }
@EnableBinding(Sink.class)
:开启Skin的绑定,会自动创建绑定到Skin.INPUT的通道目标(即queue、topic或其它)@StreamListener(Sink.INPUT)
:流入数据的处理方法,监听消息类型为String的消息
-
启动项目后打开RabbitMQ并登录控制台 http://localhost:15672/ ,点击Connections,发现多出了一个连接,点击Queue,也有相应的队列被创建
-
点击进入
input.anonymous.NYJUfXvXTQeNzardQfAB_g
队列,推送消息到监听者,这时发现在项目控制台下打印了对应的message
至此,你已经完成了一个简单的消息驱动微服务示例。
在业务中,需要获取这些对象手动向管道发送消息,还可以手动注入这些接口实现消息发送:
@RunWith(SpringRunner.class)
@EnableBinding(value = {Source.class})
@SpringBootTest
public class TestSendMessage {
@Autowired
private Source source;
/**
* 会在控制台得到"Message from Pipe"的消息
*/
@Test
public void testSender() {
source.output().send(MessageBuilder.withPayload("Message from Pipe").build());
}
}
自定义消息通道–发送与接收
@EnableBinding
注解启用绑定的时候,可以带着一个或多个接口作为参数(示例中使用的是Skin接口),一个接口往往声明了输入和输出的渠道,Spring Cloud Stream提供了Source
、Skin
、Processor
这三个接口,我们先来看下它们的源码定义:
Skin接口源码:
Skin一般用来标识消息消费者的约定
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
@Input
注解区分了一个输入channel,通过它来接收消息到应用中,注解都支持设置channel的名称,默认值是使用注解下方法的名称SubscribaleChannel
:继承于MessageChannel接口,定义了维护消息通道订阅者的方法
Source接口源码:
与Skin相反,用于标识消息生产者的约定
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
@Output
注解区分输出channel,消息通过它从应用输出到外部。MessageChannel
:定义了向维护消息通道发送消息的方法
Processor接口源码:
继承了Skin和Source,自然也拥有它们的作用,标识了消息生产者和消费者
public interface Processor extends Source, Sink {
}
注意:在定义一个管道的时候,不能有输入、输出管道名相同的,否则会出现发送消息被自己接收或报错的情况
自定义消息通道实现发送与接收
了解上面默认的一些输入、输出绑定接口定义后,我们可以创建自己的绑定通道。
-
创建一个新的模块
spring-cloud-stream-custom
,其它配置同示例一样 -
自定义一个接收消息的接口
public interface MyPipe { String INPUT = "output"; /** * 将消息发送到名为output的管道中,那么监听output管道的输入流一端就能获得数据 */ @Input(MyPipe.INPUT) SubscribableChannel input(); }
可以看到,定义的是名为“output”管道的输入
-
在启动类加上自定义的接口与Skin,并定义消息的发送与接收handler
@SpringBootApplication @EnableBinding({Sink.class, MyPipe.class}) public class MyApplication { public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); } @StreamListener(Processor.INPUT) // 监听输入的消息,也就是名为"input"的管道 @SendTo(Processor.OUTPUT) // 输出大写后的消息到名为“output”管道中 public String handle(String value) { System.out.println("Received: " + value); return value.toUpperCase(); } @StreamListener(MyPipe.INPUT) // 监听"output"管道的输入消息,也就是上面的转换大写之后的消息 public void receiveFromMyPipe(String value) { System.out.println("Received in mypipe: " + value); } }
-
启动项目,进入RabbitMQ控制台 http://localhost:15672/,找到对应的Queue
- 在控制台中发送消息’helloworld’到队列
- handle方法会接收到消息打印到控制台并转换成大写字母后输出到名为”output”管道
- receiveFromMyPipe方法监听到名为”output”管道的消息,打印到大写的”HELLOWORLD”到控制台
至此,你已经实现了一个自定义消息管道功能,这里可能会出现如下问题:
-
在一个接口中定义多个输出流管道,如何注入指定的管道?
/** * 多个输出管道 */ public interface MutiplePipe @Output("output1") MessageChannel output1(); @Output("output2") MessageChannel output2(); }
解决方案是在注入指定管道
MessageChannel
时指定名称,因为每个MessageChannel都是使用消息通道的名字做为bean的名称@Autowired @Qualifier("output1") MessageChannel messageChannel;
常用配置
给消费者设置消费组和主题:
- 设置消费组:
spring.cloud.stream.bindings.<通道名>.group=<消费组名>
- 设置主题:
spring.cloud.stream.bindings.<通道名>.destination=<主题名>
消费者开启分区,指定实例数量与实例索引:
- 开启消费分区:
spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
- 消费实例数量:
spring.cloud.stream.instanceCount=1
(具体指定) - 实例索引:
spring.cloud.stream.instanceIndex=1
#设置当前实例的索引值
生产者指定分区键:
- 分区键:
spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分区键>
- 分区数量:
spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分区数量>
总结
Spring Cloud Stream 就像JPA一样,抽象了概念,对于一些实现可以非常轻松的替换。并且可以兼容不同的消息中间件,使得微服务开发的高度解耦,可以更多的关注业务而忽略底层技术实现
相关文档
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/17822.html