Installing and Configuring Flume 1.8.0


Overview: this article walks through installing and configuring Flume 1.8.0.

Installation

Download

http://flume.apache.org/download.html
http://archive.apache.org/dist/flume/1.8.0/
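For example, the binary tarball can be fetched directly from the Apache archive (a minimal sketch; adjust the mirror or path as needed):
wget http://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz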

Extract

tar -xzvf apache-flume-1.8.0-bin.tar.gz
Create a symbolic link:
ln -s apache-flume-1.8.0-bin flume-1.8.0

Set environment variables
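A minimal sketch of putting Flume on the PATH, assuming the /home/hadoop/flume-1.8.0 symlink created above (append to ~/.bashrc, or /etc/profile for all users, then re-source the file):

export FLUME_HOME=/home/hadoop/flume-1.8.0
export PATH=$PATH:$FLUME_HOME/bin

Afterwards, verify that the Flume bin directory shows up in PATH and that flume-ng resolves: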

[root@single ~]# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/home/hadoop/hadoop-2.7.3/bin:/home/hadoop/hadoop-2.7.3/sbin:/home/hadoop/hbase-1.2.6/bin:/home/hadoop/zookeeper-3.4.6/bin:/root/bin:/home/hadoop/hadoop-2.7.3/bin:/home/hadoop/hadoop-2.7.3/sbin:/home/hadoop/hbase-1.2.6/bin:/home/hadoop/zookeeper-3.4.6/bin:/home/hadoop/flume-1.8.0/bin
[root@single ~]# 
[root@single ~]# 
[root@single ~]# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d

Configuration and Running

(1) Describe the concrete Source, Channel, and Sink implementations in a configuration file.
(2) Run an Agent instance; on startup it reads the configuration file, and Flume begins collecting data.

1. Create my.conf under $FLUME_HOME/conf:
touch my.conf
2. Declare the Agent's sources, sinks, and channels:
[root@single conf]# cat my.conf 
a1.sources=s1
a1.sinks=k1
a1.channels=c1

3. Specify the properties of the source, sink, and channel:
[root@single conf]# cat my.conf 
# Name the Agent's components
a1.sources=s1
a1.sinks=k1
a1.channels=c1

# Source: a spooling-directory source watching /home/hadoop/tmp
a1.sources.s1.type=spoolDir
a1.sources.s1.spoolDir=/home/hadoop/tmp

# Sink: a logger sink
a1.sinks.k1.type = logger

# Channel: an in-memory channel holding at most 1000 events, with at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire source s1 and sink k1 together through channel c1
# (note that "channels" is plural for the source, "channel" is singular for the sink)
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1

4. Start the agent:
flume-ng agent --conf conf --conf-file my.conf --name a1 -Dflume.root.logger=INFO,console
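If the agent is started from a different working directory, the same command also works with absolute paths (a sketch, assuming the install locations used above):

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/my.conf --name a1 -Dflume.root.logger=INFO,console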

The output is:

20/09/21 15:19:57 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/09/21 15:19:58 INFO node.Application: Starting Sink k1
20/09/21 15:19:58 INFO node.Application: Starting Source s1
20/09/21 15:19:58 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/hadoop/tmp
20/09/21 15:19:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
20/09/21 15:19:58 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
5. Write a log file.
    Open a new terminal:
[root@single tmp]# echo Hello World! > test.log
[root@single tmp]# pwd
/home/hadoop/tmp

Switch back to the terminal where Flume is running:

20/09/21 15:19:58 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
20/09/21 15:21:36 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
20/09/21 15:21:36 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/tmp/test.log to /home/hadoop/tmp/test.log.COMPLETED
20/09/21 15:21:40 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21             Hello World! }

6. Problems encountered

Problem 1

org.apache.flume.conf.ConfigurationException: No channel configured for sink: k1
	at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:52)
	at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:680)
	at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:347)
	at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
	at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
	at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)
	at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:194)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:93)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Fixed

a1.sinks.k1.channels=c1
changed to
a1.sinks.k1.channel=c1

A sink reads from exactly one channel, so its property is the singular channel; only sources take the plural channels.

Problem 2

java.lang.IllegalStateException: Configuration must specify a spooling directory
	at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
	at org.apache.flume.source.SpoolDirectorySource.configure(SpoolDirectorySource.java:146)
	at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:326)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Fixed

# source type spooldir, watching /home/hadoop/tmp
a1.sources.s1.type=spooldir
a1.sources.s1.spooldir=/home/hadoop/tmp
changed to
# source type spoolDir, watching /home/hadoop/tmp
a1.sources.s1.type=spoolDir
a1.sources.s1.spoolDir=/home/hadoop/tmp

The property keys are case-sensitive: the directory must be set with the spoolDir key; with the lowercase spooldir key the source never receives a spooling directory and fails as above.

Problem 3

java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /home/data/test2.log.COMPLETED
	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:463)
	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:414)
	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:326)
	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Solutions

The spooling-directory source assumes file names are never reused: once test.log has been processed and renamed to test.log.COMPLETED, dropping another test.log into the same directory violates that assumption. Two ways around it:

1. Change the directory monitored in the Flume configuration file:
/home/hadoop/tmp 
changed to
/home/data
2. Under the current Flume configuration, write to a file with a new name:
echo Hello World! > test.log
changed to
echo Hello World! > test1.log
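To avoid name collisions entirely, each new file can be given a unique name, for example by appending a timestamp (a minimal sketch; the naming scheme is only an illustration):

echo Hello World! > test-$(date +%s).log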
