❝
Hi everyone, I'm Xiaoyang. If you enjoy my articles 😁, follow me and let's learn and improve together~
❞
1. Preparation
Let's start with a simple Java Kafka demo, then walk through the source code to see how a message actually gets sent. Since there is quite a lot involved, I'll first call out a few key steps, then analyze each one.
(kafka_demo)
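The original demo is a screenshot; a minimal equivalent sketch looks something like the following (the broker address localhost:9092, the topic name demo-topic, and the String serializers are my assumptions, not from the original):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KafkaDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send() is asynchronous: it returns a Future immediately,
                // and the callback fires once the broker acknowledges the record
                producer.send(new ProducerRecord<>("demo-topic", "key", "hello kafka"),
                        (metadata, exception) -> {
                            if (exception != null) {
                                exception.printStackTrace();
                            } else {
                                System.out.printf("sent to %s-%d@%d%n",
                                        metadata.topic(), metadata.partition(), metadata.offset());
                            }
                        });
            }
        }
    }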
1. Call the org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord<K, V> record) method to send the message.


2. Source Code
After a few method hops we arrive at org.apache.kafka.clients.producer.KafkaProducer.doSend(ProducerRecord<K, V> record, Callback callback), where the main send logic lives.
2.1 Fetching remote broker metadata
As the doSend() code below shows, some preparation happens before a message is sent, mainly to obtain metadata such as the remote brokers' addresses and the topic's partitions: this is the waitOnMetadata() call at the top of the method.
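Incidentally, the same blocking metadata fetch can be triggered from user code through the public partitionsFor() API. A small illustration of my own, reusing the producer from the demo above (it also needs java.util.List and org.apache.kafka.common.PartitionInfo imported):

    // partitionsFor() blocks (up to max.block.ms) until the topic's metadata
    // (partition count, leaders) has been fetched -- the same wait doSend() performs.
    List<PartitionInfo> partitions = producer.partitionsFor("demo-topic");
    for (PartitionInfo p : partitions)
        System.out.println("partition " + p.partition() + ", leader = " + p.leader());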
2.2 Appending the message to the buffer
accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs) writes the serialized record into the RecordAccumulator, which packs buffered records into batches.
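This buffering behavior is governed by a few producer configs. For illustration, they could be set on the demo's Properties like this (the first and third values are the library defaults; the linger value is an arbitrary example):

    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // max bytes per batch (16 KB default)
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // wait up to 5 ms for a batch to fill before sending
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // total accumulator memory (32 MB default)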
2.3 Waking up the Sender to send the messages out
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // choose the target partition for this record
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // append the record to the accumulator (the in-memory buffer); this may create a new batch
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
2.4 The Sender thread's send loop
Sender is a class that implements the Runnable interface; the producer sends messages asynchronously from a dedicated background I/O thread running this Sender. Its run() method is an infinite loop, and run(long now) goes through a series of checks on whether sending is possible before finally calling sendProducerData(now) to perform the send.
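For context, the outer run() method that drives this loop looks roughly like this (condensed from the Kafka source; the shutdown path after the loop is elided):

    // Sender.run(): the producer's single I/O thread spins here until close() is called
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");
        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        // ... on shutdown: drain remaining in-flight requests, then force-close if needed (elided)
    }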


void run(long now) {
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();

            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
2.5 Packing batches into produce requests
private long sendProducerData(long now) also does some preparation work on the message batches, and eventually reaches private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches), which builds the produce request for a single broker.
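In outline, sendProducerData() does roughly the following (a condensed sketch based on the Kafka source; metrics, metadata-refresh, and batch-expiry handling are elided, and pollTimeout stands in for a computed value):

    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();
        // 1. find partitions with sendable batches and the brokers that lead them
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        // 2. drain ready batches from the accumulator, grouped by destination broker
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        // 3. expire batches that sat in the accumulator too long (elided)
        // 4. build one produce request per broker and queue it on the network client
        sendProduceRequests(batches, now);
        return pollTimeout; // how long the following client.poll() may block
    }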

2.6 NetworkClient.doSend() sends the request
kafkaClient.send() goes on to call NetworkClient.doSend(), which hands the request off toward the broker. Below this level it is Kafka's Selectable abstraction and the underlying Java NIO code, which I won't go into in detail here.
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String nodeId = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), nodeId);
        } else {
            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                    header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), nodeId);
        }
    }
    Send send = request.toSend(nodeId, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    // queue the send on the broker's connection; the actual socket write happens in the next poll()
    selector.send(inFlightRequest.send);
}
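For completeness, here is a heavily simplified sketch of what happens on the other side of that call (condensed from the Kafka Selector; error handling and the closing-channel case are elided, and the channel lookup is simplified):

    // org.apache.kafka.common.network.Selector.send(), simplified: it only attaches
    // the Send to the connection's KafkaChannel; the bytes are written later,
    // inside Selector.poll(), once the underlying socket is writable.
    public void send(Send send) {
        String connectionId = send.destination();
        KafkaChannel channel = channel(connectionId); // simplified lookup
        channel.setSend(send);
    }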
3. Flow Diagram
From the analysis above we can sketch the overall flow: KafkaProducer.send() → doSend() → waitOnMetadata() → serialize key/value → partition() → RecordAccumulator.append() → sender.wakeup() → Sender.run() loop → sendProducerData() → sendProduceRequest() → NetworkClient.doSend() → Selector/NIO → broker.
4. Summary
This architecture decouples production from transmission asynchronously: messages are first written to an in-memory buffer, and a background NIO thread sends them out. The benefits are clear:
1. Decoupling: writing to the buffer and sending over the network are completely separate and do not affect each other, so a problem in one place does not take the other down.
2. Asynchrony: appending to the buffer is a pure in-memory operation and therefore very fast, so the client can return almost immediately after send(), improving efficiency.
3. High volume: the Selector is built on non-blocking NIO, messages are packed into batches before transmission, and the whole pipeline works in memory, so large amounts of data can be sent quickly.
When designing our own systems, we can borrow KafkaProducer's design ideas to support high concurrency and high throughput; a toy sketch of the same pattern follows.
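As an illustration of that "accumulator + background sender" pattern outside Kafka, here is a toy sketch of my own (entirely hypothetical, not from the original post): callers append to an in-memory queue and return immediately, while one background thread drains it and flushes in batches.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Toy "accumulator + sender": append() is memory-only and fast, like
    // RecordAccumulator.append; the slow I/O runs on a dedicated daemon thread.
    public class MiniAccumulator {
        private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(10_000);

        public MiniAccumulator() {
            Thread sender = new Thread(this::senderLoop, "mini-sender");
            sender.setDaemon(true);
            sender.start();
        }

        public boolean append(String msg) { // returns immediately; false if the buffer is full
            return buffer.offer(msg);
        }

        private void senderLoop() {
            List<String> batch = new ArrayList<>();
            while (true) {
                try {
                    String first = buffer.poll(100, TimeUnit.MILLISECONDS); // acts like linger.ms
                    if (first == null) continue;
                    batch.add(first);
                    buffer.drainTo(batch, 99); // batch up to 100 messages, like batch.size
                    flush(batch);              // the slow part happens off the caller's thread
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        private void flush(List<String> batch) {
            System.out.println("flushing " + batch.size() + " messages");
        }
    }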
That's all for today's share.
If you like my posts, give me a thumbs up~
