Installation
Download
http://flume.apache.org/download.html
http://archive.apache.org/dist/flume/1.8.0/
Extract
tar -xzvf apache-flume-1.8.0-bin.tar.gz
Create a symlink
ln -s apache-flume-1.8.0-bin flume-1.8.0
Set environment variables
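One way to put Flume on the PATH is to export it from your shell profile. A minimal sketch, assuming the symlink location created above (adjust the path to your install):

```shell
# Append to ~/.bashrc (or /etc/profile), then re-login or `source` the file.
# The path assumes the flume-1.8.0 symlink created in the previous step.
export FLUME_HOME=/home/hadoop/flume-1.8.0
export PATH=$PATH:$FLUME_HOME/bin
```

Once the profile has been sourced, `flume-ng` should resolve from any directory, which the transcript below confirms.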
[root@single ~]# echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/home/hadoop/hadoop-2.7.3/bin:/home/hadoop/hadoop-2.7.3/sbin:/home/hadoop/hbase-1.2.6/bin:/home/hadoop/zookeeper-3.4.6/bin:/root/bin:/home/hadoop/hadoop-2.7.3/bin:/home/hadoop/hadoop-2.7.3/sbin:/home/hadoop/hbase-1.2.6/bin:/home/hadoop/zookeeper-3.4.6/bin:/home/hadoop/flume-1.8.0/bin
[root@single ~]#
[root@single ~]#
[root@single ~]# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f
Compiled by denes on Fri Sep 15 14:58:00 CEST 2017
From source with checksum fbb44c8c8fb63a49be0a59e27316833d
Configuration and running
(1) Describe the concrete implementations of the Source, Channel, and Sink in a configuration file.
(2) Run an Agent instance; it reads the configuration file at startup, and Flume then begins collecting data.
- Create my.conf under $FLUME_HOME/conf
touch my.conf
- Describe the Agent's sources, sinks, and channels at a high level
[root@single conf]# cat my.conf
a1.sources=s1
a1.sinks=k1
a1.channels=c1
- Specify the properties of the source, sink, and channel
[root@single conf]# cat my.conf
# Name the Agent's components
a1.sources=s1
a1.sinks=k1
a1.channels=c1
# Source type: spoolDir; directory to watch: /home/hadoop/tmp
a1.sources.s1.type=spoolDir
a1.sources.s1.spoolDir=/home/hadoop/tmp
# Sink type: logger
a1.sinks.k1.type = logger
# In-memory channel: capacity 1000 events, at most 100 events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Wire source s1 and sink k1 together through channel c1
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
- Start
flume-ng agent --conf conf --conf-file my.conf --name a1 -Dflume.root.logger=INFO,console
Output:
20/09/21 15:19:57 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/09/21 15:19:58 INFO node.Application: Starting Sink k1
20/09/21 15:19:58 INFO node.Application: Starting Source s1
20/09/21 15:19:58 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/hadoop/tmp
20/09/21 15:19:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
20/09/21 15:19:58 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
- Write a log file
Open a new terminal
[root@single tmp]# echo Hello World! > test.log
[root@single tmp]# pwd
/home/hadoop/tmp
Check the terminal where Flume is running:
20/09/21 15:19:58 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
20/09/21 15:21:36 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
20/09/21 15:21:36 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/tmp/test.log to /home/hadoop/tmp/test.log.COMPLETED
20/09/21 15:21:40 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 Hello World! }
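The `body` printed by the logger sink is the event payload as hex bytes followed by a printable preview. The same bytes can be reproduced from the shell (od prints lowercase hex):

```shell
# Dump "Hello World!" as hex bytes, matching the event body logged above
printf 'Hello World!' | od -An -tx1
```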
- Problems encountered
Problem 1
org.apache.flume.conf.ConfigurationException: No channel configured for sink: k1
at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:52)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:680)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:347)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:194)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:93)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Fix
A sink drains exactly one channel, so its property is the singular `channel`; a source, which can fan out to several channels, uses the plural `channels`. Change
a1.sinks.k1.channels=c1
to
a1.sinks.k1.channel=c1
Problem 2
java.lang.IllegalStateException: Configuration must specify a spooling directory
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.source.SpoolDirectorySource.configure(SpoolDirectorySource.java:146)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:326)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Fix
Flume property keys are case-sensitive: the directory property must be spelled `spoolDir` exactly, while the type value is matched case-insensitively (so `spooldir` and `spoolDir` both work there). Change
# Source type and directory to watch
a1.sources.s1.type=spooldir
a1.sources.s1.spooldir=/home/hadoop/tmp
to
# Source type and directory to watch
a1.sources.s1.type=spoolDir
a1.sources.s1.spoolDir=/home/hadoop/tmp
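Because property keys are case-sensitive, a misspelled key is silently ignored rather than rejected at parse time. A quick sanity check for this particular typo, using a throwaway demo conf at a hypothetical /tmp path:

```shell
# Write a throwaway conf that reproduces the typo (hypothetical path)
cat > /tmp/demo.conf <<'EOF'
a1.sources.s1.type=spooldir
a1.sources.s1.spooldir=/home/hadoop/tmp
EOF
# Flag 'spooldir' used as a property key; as a *type value* (line 1) it is
# acceptable, but the directory property must be spelled 'spoolDir'.
grep -nE '\.spooldir *=' /tmp/demo.conf
```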
Problem 3
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /home/data/test2.log.COMPLETED
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:463)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:414)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:326)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Solutions (either one works; the spoolDir source renames each ingested file to `<name>.COMPLETED` and assumes file names are never re-used)
- Change the spooling directory in the Flume conf file: change
/home/hadoop/tmp
to
/home/data
- Keep the current Flume configuration and use a file name that has not been used before: change
echo Hello World! > test.log
to
echo Hello World! > test1.log
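The root cause can be simulated without Flume: after ingesting a file, the spoolDir source renames it to `<name>.COMPLETED`, so dropping a new file under an old name sets up a rename collision on the next roll. A sketch using a throwaway temp directory:

```shell
# Simulate what the spoolDir source does with a finished file
dir=$(mktemp -d)
echo 'Hello World!' > "$dir/test.log"
mv "$dir/test.log" "$dir/test.log.COMPLETED"   # Flume's post-ingest rename
# Re-using the name: the next rename would collide with test.log.COMPLETED,
# which is the "Spooling assumptions violated" condition from the stack trace.
echo 'Hello again' > "$dir/test.log"
ls "$dir"
```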