那些年背过的面试题-Kafka相关问题。
Kafka是一个分布式的消息发布-订阅队列系统。
Kafka是什么?应用场景有哪些?
Kafka是一个分布式的消息发布-订阅队列系统。也可以说是一个分布式流式处理平台。
Kafka具有三个关键功能:
-
消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
-
容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
-
流式处理平台:在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
Kafka 主要有两大应用场景:
-
消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据
-
数据处理:构建实时的流数据处理程序来转换或处理数据
kafka与其他消息组件对比
-
极致的性能:基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
-
生态系统兼容性无可匹敌:Kafka 与周边生态系统的兼容性是最好的,没有之一,尤其在大数据和流计算领域。
特性 | ActiveMq | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
Kafka如何实现高吞吐的原理
-
读写文件依赖OS文件系统的页缓存,而不是在JVM内部缓存数据,利用OS来缓存,内存利用率高
-
sendfile技术(零拷贝),避免了传统网络IO四步流程
-
支持End-to-End的压缩
-
顺序IO以及常量时间get、put消息
-
Partition 可以很好的横向扩展和提供高并发处理
队列模型了解吗?Kafka 的消息模型知道吗?
使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。 比如:生产者发送 100 条消息,由两个消费者消费,一般情况下,两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
队列模型存在的问题:如果需要将生产者产生的消息分发给多个消费者,并且每个消费者都能接收到完整的消息内容,那么队列模型是无法实现的。
发布-订阅模型:Kafka 消息模型
发布-订阅模型主要是为了解决队列模型存在的问题。
发布订阅模型(Pub-Sub)使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则收不到这条消息。
在发布 – 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。发布 – 订阅模型在功能层面上是可以兼容队列模型的。
Kafka 采用的就是发布 – 订阅模型。
RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)。
什么是Producer、Consumer、Broker、Topic、Partition
Kafka 发布订阅流程如下(引用JavaGuide的一张图片):
上述流程中将生产者发布的消息发送到 Topic(主题) 中,需要这些消息的消费者可以订阅这些 Topic(主题)。
-
Producer(生产者):Kafka中,生产者将消息发送到Kafka集群指定的主题中进行存储。
-
Consumer(消费者):消费者从Kafka集群指定的主题中读取消息记录。在读取主题数据时,需要设置消费组名,如果不设置,则Kafka消费者会默认生成一个消费组名称。
-
Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。每个 Broker 中又包含了 Topic 以及 Partition 这两个重要的概念
-
Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息,通过主题来区分不同业务类型的消息记录。
-
Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。
Kafka 的多副本机制了解吗?带来了什么好处?
多副本机制即Kafka中的副本机制。
副本(Replication):Kafka中,每个主题在创建时会要求指定它的副本数,默认是1。通过副本机制来保证Kafka分布式集群数据的高可用性,多副本机制实现了故障的自动转移。副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader之间副本的消息同步。副本是用于提高消息主题的高可用性。
-
Kafka中,主题的副本数等于Leader+Follower
-
AR(Assigned Replicas):分区中的所有副本统称为AR
-
ISR(In-Sync Replicas):所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR
-
OSR(Out-of-Sync Replicas):与leader副本同步滞后过多的副本(不包括leader副本)组成OSR
-
同步复制:同步复制要求所有能工作的follower副本都复制完,这条消息才会被确认为已成功提交,这种复制方式极大地影响了性能。
-
异步复制:异步复制方式下,follower副本异步地从leader副本中复制数据,数据只要被leader副本写入就被认为已经成功提交。如果leader写入成功,follower副本复制失败, 会造成数据丢失。
根据kafka服务数据同步策略,若存在单个分区故障,将造成partition分区返回ack阻塞,因此引入ISR。
ISR是指:所有与leader保持一定程度同步的follower副本(包括leader副本在内)组成ISR。
-
当ISR中的follower完成数据同步以后,leader给生产者返回ack
-
当ISR中的follower长时间未从leader同步数据,则从ISR中剔除follower,时间阈值通过参数replica.lag.time.max.ms设置,默认值为10秒
-
当ISR中的leader发生故障后,从ISR的follower中选举新的leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选
Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?
-
Kafka 通过给特定 Topic 指定多个 Partition,而各个 Partition 可以分布在不同的 Broker 上,提供了比较好的并发能力(负载均衡)。
-
Partition 可以指定对应的 Replica 数,这也极大地提高了消息存储的安全性,提高了容灾能力,不过也相应的增加了所需要的存储空间。
使用 Kafka 能否不引入 Zookeeper?
在Kafka 2.8 之前,Kafka 是重度依赖于 Zookeeper。
在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,可以以一种轻量级的方式来使用 Kafka。Kafka 3.3.1 版本是第一个将 KRaft(Kafka Raft)共识协议标记为生产就绪的版本。
Kafka如何保证消息的消费顺序?
在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:
-
更改用户会员等级。
-
根据会员等级计算订单价格。
假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。
我们知道 Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。
每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。Kafka 只能为我们保证 Partition(分区) 中的消息有序。消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。
也就是说:1 个 Topic 只对应一个 Partition。这样就保证了消息的消费顺序,但是破坏了 Kafka 的设计初衷。
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。
总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
-
1 个 Topic 只对应一个 Partition。
-
(推荐)发送消息的时候指定 key/Partition。
可以参考生产者客户端分区的原则,也就是将生产的数据按一定的规则发送到指定的分区中,这样就保证了分区内消息的有序性。
Kafka如何保证消息的不丢失?
生产者丢失消息的情况
生产者(Producer) 调用send
方法发送消息之后,消息可能因为网络问题并没有发送过去。
因此,为了确定消息是发送成功,我们要判断消息发送的结果,send
方法发送消息实际上是异步的操作,通过 get()
方法获取调用结果,但是这样就变成了同步操作。
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
//生产者发送消息成功
}
但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> {
//消息发送成功
}, ex -> {
//消息发送失败
});
另外,这里推荐为 Producer 的retries
(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了。
消费者丢失消息的情况
我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
Kafka代理丢失消息
Kafka 为分区(Partition)引入了多副本(Replica)机制。我们知道,主题的副本数等于Leader+Follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。Follower 副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
解决办法有如下几种
-
给生产者设置 acks = all:acks 是 Kafka 生产者(Producer) 很重要的一个参数。
-
acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。
-
配置 acks = all 或者 acks = -1都表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应。这种模式是最高级别的,也是最安全的,可以确保不止一个 Broker 接收到了消息,但是该模式的延迟会很高。
-
给topic设置 replication.factor >= 3:为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
-
设置 min.insync.replicas > 1:一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送,也就是一个leader副本至少有一个follower副本。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。
-
为了保证整个 Kafka 服务的高可用性,需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1。
-
设置 unclean.leader.election.enable = false
-
Kafka 0.11.0.0 版本开始 unclean.leader.election.enable 参数的默认值由原来的 true 改为 false
-
多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
Kafka 如何保证消息不重复消费?
kafka 出现消息重复消费的原因:
-
服务端侧已经消费的数据没有成功提交 offset(根本原因)。
-
Kafka侧由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
-
消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的唯一主键等天然的幂等功能,或者在进行消息消费之前通过查库等方式先进行检查。
-
将
enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交 offset 合适? -
处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
-
拉取到消息即提交:会有消息丢失的风险,Kafka默认采用这种方式。这种适用于允许消息延时的场景,可以通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底
消费失败会怎么样?
在消费过程中,当其中一个消息消费异常时,会不会卡住后续队列消息的消费?这样业务岂不是卡住了?
生产者代码:
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i))
}
消费者代码:
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer (String message) throws InterruptedException {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
}
在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。
下面是一段消费的日志,可以看出当 test-0@95
重试多次后会被跳过。
2023-08-10 12:03:32.918 DEBUG 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Skipping seek of: test-0@95
2023-08-10 12:03:32.918 TRACE 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Seeking: test-0 to: 96
2023-08-10 12:03:32.918 INFO 9700 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-apple-1, groupId=apple] Seeking to offset 96 for partition test-0
因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。
默认会重试多少次,重试是否有时间间隔?
默认配置下,消费异常会进行重试,重试次数是多少,重试是否有时间间隔?
源码 FailedRecordTracker
类有个 recovered
函数,返回 Boolean 值判断是否要进行重试。
下面是这个函数中判断是否重试的逻辑
public boolean recovered(ConsumerRecord<?, ?> record, Exception exception, @Nullable MessageListenerContainer container, @Nullable Consumer<?, ?> consumer) throws InterruptedException {
if (this.noRetries) {
// 不支持重试
this.attemptRecovery(record, exception, (TopicPartition)null, consumer);
return true;
} else {
// 获取已经失败的消费记录集合
Map<TopicPartition, FailedRecordTracker.FailedRecord> map = (Map)this.failures.get();
if (map == null) {
this.failures.set(new HashMap());
map = (Map)this.failures.get();
}
// 获取消费记录所在的Topic和Partition
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
FailedRecordTracker.FailedRecord failedRecord = this.getFailedRecordInstance(record, exception, map, topicPartition);
// 通知注册的重试监听器,消息投递失败
this.retryListeners.forEach((rl) -> {
rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get());
});
// 获取下一次重试的时间间隔
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
if (nextBackOff != -1L) {
if (container == null) {
Thread.sleep(nextBackOff);
} else {
ListenerUtils.stoppableSleep(container, nextBackOff);
}
return false;
} else {
this.attemptRecovery(record, exception, topicPartition, consumer);
map.remove(topicPartition);
if (map.isEmpty()) {
this.failures.remove();
}
return true;
}
}
}
其中, BackOffExecution.STOP
的值为 -1。
@FunctionalInterfacepublic
interface BackOffExecution {
long STOP = -1;
long nextBackOff();
}
nextBackOff
的值调用 BackOff
类的 nextBackOff()
函数。
如果当前执行次数大于最大执行次数则返回 STOP
,即超过这个最大执行次数后才会停止重试。
public long nextBackOff() {
this.currentAttempts++;
if (this.currentAttempts <= getMaxAttempts()) {
return getInterval();
} else {
return STOP;
}
}
那么这个 getMaxAttempts
的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler
。DefaultErrorHandler
默认的构造函数是:
public DefaultErrorHandler() {
this(null, SeekUtils.DEFAULT_BACK_OFF);
}
SeekUtils.DEFAULT_BACK_OFF
定义的是:
public static final int DEFAULT_MAX_FAILURES = 10;
public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0L, 9L);
DEFAULT_MAX_FAILURES
的值是 10,currentAttempts
从 0 到 9,所以总共会执行 10 次,每次重试的时间间隔为 0。
总结一下:Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。
如何自定义重试次数以及时间间隔?
从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 FixedBackOff
控制的,FixedBackOff
是 DefaultErrorHandler
初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler
初始化的时候传入自定义的 FixedBackOff
即可。重新实现一个 KafkaListenerContainerFactory
,调用 setCommonErrorHandler
设置新的自定义的错误处理器就可以实现。
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
// 自定义重试时间间隔以及次数
FixedBackOff fixedBackOff = new FixedBackOff(1000, 5);
factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff));
factory.setConsumerFactory(consumerFactory);
return factory;
}
如何在重试失败后进行告警?
自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandler
的 handleRemaining
函数,加上自定义的告警等操作。
@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {
public DelErrorHandler(FixedBackOff backOff) {
super(null, backOff);
}
@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
super.handleRemaining(thrownException, records, consumer, container);
log.info("重试多次失败");
// 自定义操作
}
}
DefaultErrorHandler
只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler
接口。手动实现 CommonErrorHandler
就可以实现更多的自定义操作,有很高的灵活性。
例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。
重试失败后的数据如何再次处理?
当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?
死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被”丢弃”或”死亡”的情况。
当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。
@RetryableTopic
是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。
// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 100, maxDelay = 1000))
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
}
当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 @DltHandler
处理,也可以使用 @KafkaListener
重新消费。
看到最后了,一键三连➕关注,在此谢过各位。
原文始发于微信公众号(乐码人生):面试题目-Kafka
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/213784.html