kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?

大家好呀,我是小羊,如果大家喜欢我的文章的话😁,就关注我一起学习进步吧~

1.准备

先简单写一个java kafka demo,然后通过分析源码,看看消息是如何发送出去了,由于涉及到的内容比较多,先说几个比较关键的步骤,然后再对每个步骤进行分析

kafka_demo

1.调用org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord<K, V> record)方法发送消息。

kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?
kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?

2.源码

经过一些方法跳转,到达了 org.apache.kafka.clients.producer.KafkaProducer.doSend(ProducerRecord<K, V> record) 方法,这个方法里面会处理发送消息的主要逻辑。

2.1 拉取broker远程信息

可以看到,在发送消息之前,会有一些准备工作,主要是为了拿到一些主数据, 比如 远程 broker 的ip,topic  partition 等等。

2.2 把消息发送到缓冲区中。

accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs)  再将缓冲区中的消息封装成batch.

2.3 唤醒Sender 将消息发送出去

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
 
            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
 
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
 
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
 
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }

2.4 多线程循环发送

Sender 是一个 实现了 Runnable 接口的类,发送消息是采用多线程发送的,可以看到 run() 方法是一个死循环,void run(long now) 经过一系列的判断是否可以发送,最终执行 sendProducerData(now) 方法执行消息发送方法。

kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?
kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?
void run(long now) {
        if (transactionManager != null) {
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.
                    transactionManager.resetProducerId();
 
                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }
 
                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }
 
        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

2.4 封装消息到 batch

private long sendProducerData(long now) 也会做一些准备工作,处理消息批次 batch ,最终执行到    private void sendProduceRequest(long now, int destination, short acks, int timeout, Listbatches) 方法 将 消息封装到  ClientRequest 中,最终由  KafkaClient.send() 方法发送消息。

kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?

2.4 doSend() 方法将消息发送

kafkaClient.send() 会继续调用  NetWorkClient.doSend() 方法将消息发送到 broker 中,再往下就是一些  kafka Selectable  和 底层 nio 相关的代码,这边先不详细介绍了。

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String nodeId = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), nodeId);
        } else {
            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                    header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), nodeId);
        }
    }
    Send send = request.toSend(nodeId, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(inFlightRequest.send);
}


3.流程图

经过上述分析,可以大致把 流程图画出来

kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?

4.总结

这种架构,通过异步解耦的方式,先将消息写入缓冲区,然后通过多线程 nio 将消息发送出去,好处是显而易见的,

1.解耦,写入缓冲区和发送消息完全分开,不会互相影响,不会因为一个地方出问题,而导致其他地方不可用。

2.异步,因为是直接操作内存,写入缓冲区会很快,所以客户端可以发送后立刻返回,提高效率

3.可以快速处理大量数据,由于selector 是多线程nio 的,并且消息会封装成batch 批量发送,整个流程是在内存中处理的,所以可以快速发送大量数据。

我们在设计系统时,也可以学习kafkaProduct 的设计理念,让系统可以支持高并发,高吞吐。

今天的分享就先到这里啦。

喜欢我的话,可以给我点个赞呀。

kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?


原文始发于微信公众号(小羊架构):kafka 学习笔记 kafkaProduct源码分析,一条消息是怎么被发送出去的?

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/171728.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!