StreamingQueryListener
StreamingQueryListener is the interface for listening to the lifecycle events of a StreamingQuery, as shown below:
abstract class StreamingQueryListener {
  import StreamingQueryListener._
  // Called when a query is started
  def onQueryStarted(event: QueryStartedEvent): Unit
  // Called when there is a progress update during query execution
  def onQueryProgress(event: QueryProgressEvent): Unit
  // Called when a query is terminated
  def onQueryTerminated(event: QueryTerminatedEvent): Unit
}
A QueryProgressEvent exposes, for every source, the offsets consumed in that micro-batch. Based on StreamingQueryListener, the consumed offsets can therefore be committed back to the Kafka cluster, which in turn makes Kafka lag monitoring possible with ordinary Kafka tooling.
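For the Kafka source, the source.endOffset field in the progress event is a JSON string mapping each topic to its per-partition ending offsets, which is exactly the structure the listener below parses. An illustrative value (topics and offsets are made up):
{"topic_1":{"0":100,"1":98,"2":103},"test_2":{"0":57,"1":44,"2":61}}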
Committing offsets to Kafka via StreamingQueryListener
The key to monitoring Kafka lag is being able to commit the consumed offsets to the Kafka cluster. The following example shows how to do this with a StreamingQueryListener.
KafkaOffsetCommiter
package com.bigdata.structured.streaming.monitor
import java.util
import java.util.Properties
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.slf4j.LoggerFactory
/**
 * Author: Wang Pei
 * Summary:
 *   Listener that commits offsets to the Kafka cluster
 */
class KafkaOffsetCommiter(brokers: String, group: String) extends StreamingQueryListener {

  val logger = LoggerFactory.getLogger(this.getClass)

  // Kafka configuration
  val properties = new Properties()
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, group)
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  val kafkaConsumer = new KafkaConsumer[String, String](properties)

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  // Commit offsets after each progress update
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Iterate over all sources
    event.progress.sources.foreach(source => {
      val objectMapper = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        .configure(DeserializationFeature.USE_LONG_FOR_INTS, true)
        .registerModule(DefaultScalaModule)
      val endOffset = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Long]]])
      // Iterate over each topic in the source
      for ((topic, topicEndOffset) <- endOffset) {
        val topicPartitionsOffset = new util.HashMap[TopicPartition, OffsetAndMetadata]()
        // Iterate over each partition of the topic
        for ((partition, offset) <- topicEndOffset) {
          val topicPartition = new TopicPartition(topic, partition.toInt)
          val offsetAndMetadata = new OffsetAndMetadata(offset)
          topicPartitionsOffset.put(topicPartition, offsetAndMetadata)
        }
        logger.warn(s"Committing offsets... Topic: $topic Group: $group Offset: $topicEndOffset")
        kafkaConsumer.commitSync(topicPartitionsOffset)
      }
    })
  }
}
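A note on the design: the KafkaConsumer above never subscribes to or is assigned any topic. commitSync(Map) only needs the group coordinator, so a bare consumer with a group.id is enough, and the commits exist purely so external tooling can observe the group's progress (Structured Streaming itself recovers from its checkpoint, not from these commits). On Kafka clients 2.5+, the same commit can also be issued through the AdminClient without creating a consumer at all; a minimal sketch, with the broker address, topic, and offset as illustrative values:
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092")
val admin = AdminClient.create(props)
// Commit offset 100 for partition 0 of topic_1 on behalf of group read_kafka_c2
val offsets = Collections.singletonMap(new TopicPartition("topic_1", 0), new OffsetAndMetadata(100L))
admin.alterConsumerGroupOffsets("read_kafka_c2", offsets).all().get()
admin.close()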
Structured Streaming App
package com.bigdata.structured.streaming.monitor
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
/**
 * Author: Wang Pei
 * Summary:
 *   Reads data from Kafka
 */
object ReadKafkaApp {
  def main(args: Array[String]): Unit = {

    val kafkaBrokers = "kafka01:9092,kafka02:9092,kafka03:9092"
    val kafkaGroup = "read_kafka_c2"
    val kafkaTopics1 = "topic_1,test_2"
    val kafkaTopics2 = "test_3"
    val checkpointDir = "/Users/wangpei/data/apps/read_kafka/checkpoint/"
    val queryName = "read_kafka"

    val spark = SparkSession.builder().master("local[3]").appName(this.getClass.getSimpleName.replace("$", "")).getOrCreate()

    import spark.implicits._

    // Register the offset-committing listener
    val kafkaOffsetCommiter = new KafkaOffsetCommiter(kafkaBrokers, kafkaGroup)
    spark.streams.addListener(kafkaOffsetCommiter)

    // Kafka source 1
    val inputTable1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("subscribe", kafkaTopics1)
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .select($"value")

    // Kafka source 2
    val inputTable2 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("subscribe", kafkaTopics2)
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .select($"value")

    // Result table
    val resultTable = inputTable1.union(inputTable2)

    // Start the query
    val query: StreamingQuery = resultTable
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .queryName(queryName)
      .option("checkpointLocation", checkpointDir)
      .start()

    spark.streams.awaitAnyTermination()
  }
}
Viewing Kafka offsets
The offsets of the consumer group on a topic can be viewed with the following command.
bin/kafka-consumer-offset-checker.sh --zookeeper kafka01:2181 --topic test_3 --group read_kafka_c2
Group         Topic  Pid Offset logSize Lag Owner
read_kafka_c2 test_3 0   32     32      0   none
read_kafka_c2 test_3 1   32     32      0   none
read_kafka_c2 test_3 2   34     34      0   none
Likewise, the group's offsets for the other two topics can be checked the same way.
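Note that kafka-consumer-offset-checker.sh is the legacy ZooKeeper-based tool and has been removed from newer Kafka distributions; on those versions the same information, including lag, is reported by kafka-consumer-groups.sh:
bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group read_kafka_c2
The lag can also be computed programmatically by comparing the group's committed offsets with the log-end offsets. A minimal sketch reusing the broker, group, and topic from the example above (this helper is illustrative, not part of the original code):
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "read_kafka_c2")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
// All partitions of the topic whose lag we want to inspect
val partitions = consumer.partitionsFor("test_3").asScala.map(p => new TopicPartition(p.topic, p.partition))
// Compare the group's committed offset with the log-end offset per partition
val logEnd = consumer.endOffsets(partitions.asJava).asScala
for ((tp, end) <- logEnd) {
  val committed = Option(consumer.committed(tp)).map(_.offset).getOrElse(0L)
  println(s"$tp committed=$committed logEnd=$end lag=${end - committed}")
}
consumer.close()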