Flume write-to-Kafka error: The server experienced an unexpected error when processing the request


1. Today, while using Flume to clean log data, I hit the following error.

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
20/07/14 19:40:59 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:268)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
	... 3 more
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
20/07/14 19:41:07 ERROR kafka.KafkaSink: Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)

2. Analyzing the error above: the Kafka sink is unable to deliver events. The first step is to check whether the Flume configuration itself is correct.

# Name the components on this agent
ibike_accessLog.sources = r1
ibike_accessLog.sinks = k1 kafkasink
ibike_accessLog.channels = c1 c2

# Describe/configure the source
ibike_accessLog.sources.r1.type = spooldir
ibike_accessLog.sources.r1.spoolDir = /opt/ibike/log/accessLog
ibike_accessLog.sources.r1.fileHeader = true
ibike_accessLog.sources.r1.interceptors = i1 i2
ibike_accessLog.sources.r1.interceptors.i1.type = com.yc.flume.AccessLogInterceptor$Builder
ibike_accessLog.sources.r1.interceptors.i2.type = host

ibike_accessLog.sources.r1.inputCharset=UTF-8
ibike_accessLog.sources.r1.deserializer=LINE
ibike_accessLog.sources.r1.deserializer.maxLineLength=2048
ibike_accessLog.sources.r1.deserializer.outputCharset=UTF-8


# Describe the sink
ibike_accessLog.sinks.k1.type = hdfs
ibike_accessLog.sinks.k1.hdfs.path = hdfs://node1:8020/ibike/log/visitLog
ibike_accessLog.sinks.k1.hdfs.writeFormat = Text
ibike_accessLog.sinks.k1.hdfs.fileType = DataStream
ibike_accessLog.sinks.k1.hdfs.rollInterval = 0
ibike_accessLog.sinks.k1.hdfs.rollSize = 0
ibike_accessLog.sinks.k1.hdfs.rollCount = 0
ibike_accessLog.sinks.k1.hdfs.callTimeout =100000
ibike_accessLog.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
ibike_accessLog.sinks.k1.hdfs.useLocalTimeStamp = true
ibike_accessLog.sinks.k1.hdfs.closeTries = 5


ibike_accessLog.sinks.kafkasink.type=org.apache.flume.sink.kafka.KafkaSink
ibike_accessLog.sinks.kafkasink.kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092
ibike_accessLog.sinks.kafkasink.kafka.topic=accesslog
ibike_accessLog.sinks.kafkasink.kafka.flumeBatchSize= 20
ibike_accessLog.sinks.kafkasink.kafka.producer.acks= 1
ibike_accessLog.sinks.kafkasink.kafka.producer.linger.ms= 1
ibike_accessLog.sinks.kafkasink.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
ibike_accessLog.channels.c1.type = memory
ibike_accessLog.channels.c1.capacity = 1000
ibike_accessLog.channels.c1.transactionCapacity = 100

ibike_accessLog.channels.c2.type = memory
ibike_accessLog.channels.c2.capacity = 1000
ibike_accessLog.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
ibike_accessLog.sources.r1.channels = c1 c2
ibike_accessLog.sinks.k1.channel = c1
ibike_accessLog.sinks.kafkasink.channel = c2

3. Since the sinks here write to both Kafka and HDFS, after verifying the configuration, check whether data is actually arriving at the Kafka consumer and on HDFS.
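A quick way to spot-check both outputs (a sketch; the topic name, broker address, and HDFS path are taken from the configuration above):

# read the topic from the beginning to confirm events are arriving
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic accesslog --from-beginning

# list the files the HDFS sink has written
hdfs dfs -ls /ibike/log/visitLog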

4. Inspection shows that the Flume child process does not die, and some data is still written to Kafka and HDFS normally, yet the error keeps appearing.

Analysis: the error message points to a request containing a message larger than the maximum message size the server will accept. This is not a Flume problem but a Kafka one: Kafka limits the size of a single message, 1 MB by default. Since the logs here carry image data well over 1 MB, Kafka's per-message limit has to be raised. There are two ways to do this: change the setting on a single topic, or change the Kafka broker configuration.

(1) Set the limit on a single topic (raised here to 200 MB, i.e. 209715200 = 200 × 1024 × 1024 bytes):

kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 1 --partitions 3 --topic accesslog --config max.message.bytes=209715200
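Note that --create only works for a topic that does not exist yet. If accesslog already exists, the same per-topic override can be applied with kafka-configs.sh (a sketch, assuming the same ZooKeeper-managed cluster as in the command above):

kafka-configs.sh --zookeeper node1:2181 --alter --entity-type topics --entity-name accesslog --add-config max.message.bytes=209715200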

(2) Change the Kafka broker configuration by adding two settings to server.properties:

# maximum size, in bytes, of a single message the broker will accept
message.max.bytes=209715200

# maximum number of bytes a follower broker fetches per partition when replicating;
# it must be no smaller than message.max.bytes, otherwise a message the leader
# accepts could be too large for followers to fetch and would never be replicated
replica.fetch.max.bytes=209715200
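The 1 MB default also exists on the producer side: the Kafka producer embedded in the Flume sink caps request size with max.request.size. If oversized events still fail after raising the broker limits, the producer limit can be raised as well, passed through the Flume sink's kafka.producer.* prefix (a sketch; the value simply mirrors the broker setting above):

ibike_accessLog.sinks.kafkasink.kafka.producer.max.request.size = 209715200

Downstream consumers may likewise need a larger fetch size (for example max.partition.fetch.bytes) to be able to read such messages.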

(3) Restart Kafka after changing server.properties, and the problem is resolved. (The per-topic override from step (1) takes effect without a restart.)
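To verify that the override is in place, the topic's configuration can be inspected (again assuming a ZooKeeper-managed Kafka):

kafka-configs.sh --zookeeper node1:2181 --describe --entity-type topics --entity-name accesslog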
