本文大约需要三分钟的时间,点开上面的歌曲,边听边看。
正文开始:
常用的绝大多数消息队列,如 RocketMQ
、RabbitMQ
等在消息传输上都只能保证至少传输成功一次(At least once
), 但是不能保证只传输成功一次(Exactly once
), 重复发送.
生产者保证消息可靠性
消息发送方式
在之前的文章架构设计之消息 中,我们知道了 生产者生产消息有三种方式:
-
同步发送。发送线程同步等待,通过同步检查 Brocker
返回的状态来判断消息是否持久化成功。从而保证消息的可靠性. -
异步发送。发送线程异步等待,可以通过传入的回调函数来判断消息持久化状态. 根据状态来判断是否需要重试消息,从而保证消息的可靠性. -
one way
方式。这种方式不能保证消息的可靠性.发送端发送完成之后,调用该发送接口后立刻返回,并不返回发送的结果。
除了同步发送和异步发送这两种方式来保证消息可靠性之外.
重试机制
在发送消息的过程中,Producer
还有 消息发送的重试机制来提高消息的可靠性.
如果broker
只有一个节点,则broker
宕机了,即使producer
有重试机制,也没用,因此利用多主模式,当某台broker
宕机了,换一台broker
进行投递。
当发送端需要发送消息时,如果发送端中缓存了topic
的路由信息,并包含了消息队列,则直接返回该路由信息,如果没有缓存或没有消息队列,则向NameServer
查询该topic
的路由信息,查询到路由消息之后,采用指定的队列选择策略选择相应的queue
发送消息,默认是采用轮询策略,发送成功则返回, 收到异常则根据相应的策略进行重试,可以根据发送端感知到的Broker
的时延、上次发送失败的Broker
信息和发送端配置的是否重试不同Broker
的参数以及发送端设置的最大超时时间等等策略来灵活地实现不同等级的消息发送可靠性保证。重试策略可以有效的保证消息发送成功的概率,最终提高消息发送的可靠性。
总结两点:
-
Producer
选择一个MessageQueue
发送消息时。默认轮询方式选择MessageQueue
, 如果启用Broker
端的故障延迟机制,则会判断MessageQueue
的Broker
是否可用,才发送消息到该Message
中。 -
Producer
在使用 Sync 方式发送消息时会重新N
次,N
可由Producer
端的配置决定的。
发送消息的返回状态
这里再补充一点, 同步方式发送成功,发送的状态是由 SendStatus
这个枚举类决定的.
public enum SendStatus {
/**
* 发送成功
*/
SEND_OK,
/**
* 刷盘超时
*/
FLUSH_DISK_TIMEOUT,
/**
* 同步从节点超时
*/
FLUSH_SLAVE_TIMEOUT,
/**
* 从节点 不可用
*/
SLAVE_NOT_AVAILABLE,
}
-
SEND_OK
: 表示消息发送成功. 但是这个并不代表它是可靠的。要确保消息不丢失,还需要启动同步Master
服务器 或者同步刷盘, 即SYNC_MASTER
和SYNC_FLUSH
-
FLUSH_DISK_TIMEOUT
: 消息发送成功,但是刷盘超时。此时消息已经进去了内存(MessageQueue
)中, 这种情况下,只有服务器宕机, 消息才会丢失。 -
FLUSH_SLAVE_TIMEOUT
: 消息发送成功, 但是同步到SLAVE
时超时。同样的。此时消息已经进去了内存(MessageQueue
)中, 这种情况下,只有服务器宕机, 消息才会丢失。 -
SLAVE_NOT_AVAILABLE
: 消息发送成功, 但是没有SLAVE
机器可用。
FLUSH_DISK_TIMEOUT
, FLUSH_SLAVE_TIMEOUT
, SLAVE_NOT_AVAILABLE
都是Broker
端异常导致的不正常的情况。
Broker的消息可靠性
我们知道了 当消息发送成功,写入了 MessageQueue
但是没有持久到磁盘上的时候,就会造成消息的丢失。在 官方的文档中, 提及到了 五种因为Broker
端异常导致的消息可能会丢失的情况:
-
Broker
非正常关闭 -
Broker
异常Crash
-
OS Crash
-
机器掉电,但是能立即恢复供电情况 -
机器无法开机(可能是 cpu
、主板、内存等关键设备损坏) -
磁盘设备损坏
其中 1-4
属于硬件资源可立即恢复情况,RocketMQ
在这四种情况下能保证消息不丢(同步刷盘),或者丢失少量数据(异步刷盘)。5-6
属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ
在这两种情况下,通过异步复制,可保证99%
的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ
从3.0
版本开始支持同步双写。
可以看出 Broker 端的消息可靠性主要通过 单机情况下的刷盘策略 和 主从之间数据复制 来保证的。
刷盘策略
在一个节点中,一条消息会从 Producer发送给 Broker
, Broker
端把消息存储到 MessageQueue
中, 也就是 内存中, 在 RocketMQ
的源码里才能出消息内容的结构是 MappedFile
, 然后通过 刷盘机制(同步刷盘,异步刷盘)写入到物理磁盘上. 完成消息的持久化。
在源码中 刷盘的线程 由 FlushCommitLogService
类表示。这个类有三个子类:
-
FlushRealTimeService
: 当配置为异步刷盘策略 并且没有开启TransientStorePool
的时候,Broker
会运行一个服务FlushRealTimeService
用来刷新缓冲区的消息内容到磁盘,这个服务使用一个独立的线程来做刷盘这件事情,默认情况下每隔500ms
来检查一次是否需要刷盘。 -
CommitRealTimeService
: 异步刷盘且开启TransientStorePool
,使用CommitRealService
。 -
GroupCommitService
: 同步刷盘使用GroupCommitService
.
TransientStorePool
是短暂的消息存储池。这里直接开辟默认5
个1G
的直接内存ByteBuffer
,用来临时存储消息。它还引入了内存锁的机制,避免直接内存的数据被替换到系统中的Swap
分区中,提高系统存储性能,使RocketMQ
消息低延迟、高吞吐量。
同步刷盘
RocketMQ
使用 GroupCommitService
这个对象来实现 同步刷盘。
消息写入内存的 PageCache
后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。
public void run() {
// Broker 不关闭时。
while (!this.isStopped()) {
try {
// 等待 10 毫秒
this.waitForRunning(10);
// 执行一次刷盘
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// 正常情况下shutdown,等待10ms 请求到来,然后flush到磁盘。
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
从代码中看出, 当Broker
正常关闭的时候,还是等待10ms
的,等待这10ms
内的请求。处理完请求, 将数据保存到磁盘上才会关闭线程。细节满满。
异步刷盘
消息写入到内存的 PageCache
中,就立刻给客户端返回写操作成功,当 PageCache
中的消息积累到一定的量时,触发一次写操作,或者定时等策略将 PageCache
中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache
中的数据可能丢失,不能保证数据绝对的安全。
public void run() {
while (!this.isStopped()) {
// 省略部分代码...
try {
// flushCommitLogTimed 是否定时刷新
if (flushCommitLogTimed) {
// interval为配置的时间
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
long begin = System.currentTimeMillis();
// 刷盘、 flushPhysicQueueLeastPages 每次刷新的页数。
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
// 设置检查点
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// 正常关闭时,保证在退出之前全部刷新到磁盘
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
}
}
主从同步
在集群环境中, 如果一个 Broker
有 master
和 slave
时,就需要将 master
上的消息复制到 slave
上, 复制的方式有两种:
-
同步复制: master
和slave
均写成功,才返回客户端成功。master
挂了以后可以保证数据不丢失。但是数据同步复制会增加数据延迟风险,降低吞吐量。简单说一下,同步复制其实是 在 CommitLog 将消息 刷盘之后,同步的方式将消息同步给Slave
节点. -
异步复制: master
写成功,返回客户端成功。拥有较低的延迟和较高的吞吐量, 但是当master
出现故障后,有可能造成数据丢失。异步复制的方式,其实是Slave
节点中会启动一个HAService
线程,定时的去同步Master节点的数据,延时大概在毫秒级。
后面我们会有一篇文章专门介绍 RocketMQ
主从复制的实现细节。这里就不多做介绍了。
消费者的消息可靠性
RocketMQ
在消费端 实现了 At least Once
机制,来保证消息的可靠性消费.
什么是 at lease once
呢?
consumer
会把消息先pull
到本地, 消费完成之后,才向 Broker
端发送 ack
.
RocketMQ
有三种措施来实现可靠性.
消费重试
消费者从 RocketMQ
拉取到消息之后,需要返回消费成功来表示业务方正常消费完成。因此只有返回CONSUME_SUCCESS才算消费完成,如果返回 CONSUME_LATER
则会按照不同的 messageDelayLevel
时间进行再次消费,时间分级从秒到小时,最长时间为2个小时后再次进行消费重试,如果消费满 16
次之后还是未能消费成功,则不再重试,会将消息发送到死信队列,从而保证消息存储的可靠性。这一措施主要是通过 延时消息队列来实现。
死信队列
未能成功消费的消息,消息队列并不会立刻将消息丢弃,而是将消息发送到死信队列,其名称是在原队列名称前加%DLQ%
,如果消息最终进入了死信队列,则可以通过RocketMQ
提供的相关接口从死信队列获取到相应的消息,保证了消息消费的可靠性。
消息回溯
回溯消费是指 Consumer
已经消费成功的消息,或者之前消费业务逻辑有问题,现在需要重新消费。要支持此功能,则 Broker
存储端在向 Consumer
消费端投递成功消息后,消息仍然需要保留。重新消费一般是按照时间维度,例如由于 Consumer
系统故障,恢复后需要重新消费1
小时前的数据。RocketMQ Broker
提供了一种机制,可以按照时间维度来回退消费进度,这样就可以保证只要发送成功的消息,只要消息没有过期,消息始终是可以消费到的。
最后
希望和你一起遇见更好的自己
既然,看到这里了,就点个赞再走吧~
原文始发于微信公众号(方家小白):RocketMQ系列~消息可靠性实现原理
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/37646.html