Installation and Configuration

It is recommended to download the prebuilt binary release of Flume directly. The table below lists the built-in components; the * in the class names abbreviates the org.apache.flume package prefix, and rows whose alias column is – are components that must be referenced by their fully qualified class name.
Component Interface | Type Alias | Implementation Class |
---|---|---|
*.Channel | memory | *.channel.MemoryChannel |
*.Channel | jdbc | *.channel.jdbc.JdbcChannel |
*.Channel | file | *.channel.file.FileChannel |
*.Channel | – | *.channel.PseudoTxnMemoryChannel |
*.Channel | – | org.example.MyChannel |
*.Source | avro | *.source.AvroSource |
*.Source | netcat | *.source.NetcatSource |
*.Source | seq | *.source.SequenceGeneratorSource |
*.Source | exec | *.source.ExecSource |
*.Source | syslogtcp | *.source.SyslogTcpSource |
*.Source | multiport_syslogtcp | *.source.MultiportSyslogTCPSource |
*.Source | syslogudp | *.source.SyslogUDPSource |
*.Source | spooldir | *.source.SpoolDirectorySource |
*.Source | http | *.source.http.HTTPSource |
*.Source | thrift | *.source.ThriftSource |
*.Source | jms | *.source.jms.JMSSource |
*.Source | – | *.source.avroLegacy.AvroLegacySource |
*.Source | – | *.source.thriftLegacy.ThriftLegacySource |
*.Source | – | org.example.MySource |
*.Sink | null | *.sink.NullSink |
*.Sink | logger | *.sink.LoggerSink |
*.Sink | avro | *.sink.AvroSink |
*.Sink | hdfs | *.sink.hdfs.HDFSEventSink |
*.Sink | hbase | *.sink.hbase.HBaseSink |
*.Sink | asynchbase | *.sink.hbase.AsyncHBaseSink |
*.Sink | elasticsearch | *.sink.elasticsearch.ElasticSearchSink |
*.Sink | file_roll | *.sink.RollingFileSink |
*.Sink | irc | *.sink.irc.IRCSink |
*.Sink | thrift | *.sink.ThriftSink |
*.Sink | – | org.example.MySink |
*.ChannelSelector | replicating | *.channel.ReplicatingChannelSelector |
*.ChannelSelector | multiplexing | *.channel.MultiplexingChannelSelector |
*.ChannelSelector | – | org.example.MyChannelSelector |
*.SinkProcessor | default | *.sink.DefaultSinkProcessor |
*.SinkProcessor | failover | *.sink.FailoverSinkProcessor |
*.SinkProcessor | load_balance | *.sink.LoadBalancingSinkProcessor |
*.SinkProcessor | – | org.example.MySinkProcessor |
*.interceptor.Interceptor | timestamp | *.interceptor.TimestampInterceptor$Builder |
*.interceptor.Interceptor | host | *.interceptor.HostInterceptor$Builder |
*.interceptor.Interceptor | static | *.interceptor.StaticInterceptor$Builder |
*.interceptor.Interceptor | regex_filter | *.interceptor.RegexFilteringInterceptor$Builder |
*.interceptor.Interceptor | regex_extractor | *.interceptor.RegexExtractorInterceptor$Builder |
*.channel.file.encryption.KeyProvider$Builder | jceksfile | *.channel.file.encryption.JCEFileKeyProvider |
*.channel.file.encryption.KeyProvider$Builder | – | org.example.MyKeyProvider |
*.channel.file.encryption.CipherProvider | aesctrnopadding | *.channel.file.encryption.AESCTRNoPaddingProvider |
*.channel.file.encryption.CipherProvider | – | org.example.MyCipherProvider |
*.serialization.EventSerializer$Builder | text | *.serialization.BodyTextEventSerializer$Builder |
*.serialization.EventSerializer$Builder | avro_event | *.serialization.FlumeEventAvroEventSerializer$Builder |
*.serialization.EventSerializer$Builder | – | org.example.MyEventSerializer$Builder |
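A type alias and the corresponding fully qualified class name are interchangeable wherever a type is configured, and the class-name form is how custom components get plugged in. A minimal sketch (the custom channel is the hypothetical org.example.MyChannel from the table):

# Either of these two equivalent declarations works (use one):
agent1.channels.ch1.type = memory
agent1.channels.ch1.type = org.apache.flume.channel.MemoryChannel

# A custom component has no alias, so it must be declared by class name
agent1.channels.ch2.type = org.example.MyChannel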
Avro Source+Memory Channel+Logger Sink
Use the following configuration (e.g. conf/flume-conf.properties):
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.channels.ch1.capacity = 1000
agent1.sources = avro-source1
agent1.sinks = log-sink1
First, start the Flume agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
Then, start an Avro client to send a log file to the agent:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
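For a quick smoke test without a log file, avro-client reads events from standard input when no -F file is given (one event per line); a minimal sketch:

echo "hello flume" | bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414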
Avro Source+Memory Channel+HDFS Sink
Configuration:
# Define a source, channel, sink
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
# Define an HDFS sink that writes events to HDFS
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
First, start the Flume agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
Then, start an Avro client to send the log file:
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
You can then list the files synced to HDFS:
hdfs dfs -ls /data/flume
Sample output:
-rw-r--r-- 3 shirdrn supergroup 1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
-rw-r--r-- 3 shirdrn supergroup 259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
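Because hdfs.fileType is SequenceFile, the raw bytes are not directly readable with cat; hdfs dfs -text deserializes SequenceFiles, so a spot check of one synced file looks like:

hdfs dfs -text /data/flume/sync_file.1410849320761.log | head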
Spooling Directory Source+Memory Channel+HDFS Sink
Configuration:
# Define source, channel, sink
agent1.sources = spool-source1
agent1.channels = ch1
agent1.sinks = hdfs-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define and configure a Spooling Directory source
agent1.sources.spool-source1.channels = ch1
agent1.sources.spool-source1.type = spooldir
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
agent1.sources.spool-source1.batchSize = 50
agent1.sources.spool-source1.inputCharset = UTF-8
# Define and configure a hdfs sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://h1:8020/data/flume/
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
agent1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
Start the Flume agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1
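The spooling directory source only ingests files that appear in spoolDir as complete, immutable files; once a file has been fully delivered, Flume renames it with a .COMPLETED suffix. A sketch of feeding it (the source file path is hypothetical):

# Copy a finished log file into the spool directory (never write to it in place)
cp /var/log/myapp/access.log.1 /home/shirdrn/data/
# After ingestion the file is renamed by Flume:
ls /home/shirdrn/data/
# access.log.1.COMPLETED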
You can then list the files synced to HDFS:
hdfs dfs -ls /data/flume
Sample output:
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
-rw-r--r-- 3 shirdrn supergroup 1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
Exec Source+Memory Channel+File Roll Sink
Configuration:
# Define source, channel, sink
agent1.sources = tail-source1
agent1.channels = ch1
agent1.sinks = file-sink1
# Configure channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 500000
# Define and configure an Exec source
agent1.sources.tail-source1.channels = ch1
agent1.sources.tail-source1.type = exec
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
agent1.sources.tail-source1.shell = /bin/sh -c
agent1.sources.tail-source1.batchSize = 50
# Define and configure a File roll sink
# and connect it to the other end of the same channel.
agent1.sinks.file-sink1.channel = ch1
agent1.sinks.file-sink1.type = file_roll
agent1.sinks.file-sink1.batchSize = 100
agent1.sinks.file-sink1.sink.serializer = TEXT
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data
Start the Flume agent:
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1
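To drive the tail -F pipeline with test traffic, a throwaway loop like this one (path taken from the source config above) appends one line per second to the tailed file:

while true; do
  echo "$(date +%s) sample event" >> /home/shirdrn/data/event.log
  sleep 1
done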
You can then inspect the rolled files under /home/shirdrn/sink_data:
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
-rw-rw-r-- 1 shirdrn shirdrn 0 Sep 17 11:37 1410924990039-3
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5
Architecture Overview
Flume's main core concepts are:

- Event: a unit of data, carrying an optional message header.
- Flow: an abstraction of the movement of Events from a point of origin to their destination.
- Client: operates at the point of origin, sending Events to a Flume Agent.
- Agent: an independent Flume process that contains the components Source, Channel, and Sink.
- Source: consumes Events delivered to it and places them into one or more Channels.
- Channel: a transient store for Events, holding the Events handed over by a Source component.
- Sink: reads and removes Events from a Channel, passing them to the next Agent in the Flow pipeline (if there is one).
External systems produce logs that are delivered directly to a Flume Agent's Source. An Agent is configured through a properties file; its general skeleton looks like this:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
The names in angle brackets are placeholders; you can rename them to match your actual needs or business. In detail:

- <Agent>: the name of the Agent; every Agent must have one.
- <Source1> <Source2>: names of the Agent's Source components, which consume incoming Events.
- <Channel1> <Channel2>: names of the Agent's Channel components.
- <Sink1> <Sink2>: names of the Agent's Sink components, which consume (remove) Events from a Channel.
In the configuration above, the first group declares all of the Agent's Sources, Sinks, and Channels; the second group binds each Source to its Channels; and the third binds each Sink to a single Channel (a Sink drains exactly one Channel, while a Source can feed several).
Flow Pipeline
Multiple Agents connected in sequence

Multiple Agents can be chained one after another, so that data from the original source is collected hop by hop and stored in the final storage system. Keep such chains short: the longer the path an event travels, the more Agents a single failure can affect. Chaining is done by pointing one Agent's Avro sink at the next Agent's Avro source, as in the sketch below.
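A minimal sketch of one hop, assuming the downstream agent runs on a host named collector-host (hostnames and agent names are illustrative):

# Upstream agent: an Avro sink forwards everything to the collector
agent1.sinks.avro-sink1.type = avro
agent1.sinks.avro-sink1.channel = ch1
agent1.sinks.avro-sink1.hostname = collector-host
agent1.sinks.avro-sink1.port = 41414

# Downstream agent: a matching Avro source receives the events
agent2.sources.avro-source1.type = avro
agent2.sources.avro-source1.channels = ch1
agent2.sources.avro-source1.bind = 0.0.0.0
agent2.sources.avro-source1.port = 41414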
Consolidating data from multiple Agents into one Agent

This scenario is very common, for example when collecting user-behavior logs from a load-balanced cluster of web servers: each node runs its own Agent, and all of those Agents funnel their events into a single aggregating Agent that writes to the final store (e.g. HDFS).
Multiplexing Agents

A single Agent can also fan events out to multiple channels, either by replicating every event to all of them or by multiplexing events to different channels based on a header value.
Replication
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
The last line above sets the channel selector to replicating (which is also the default), so every Event arriving at Source1 is written to both Channel1 and Channel2.
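If it is acceptable to lose events on one of the replicated channels (say, a best-effort analytics path), that channel can be marked optional so that a failure to write to it does not fail the whole transaction:

<Agent>.sources.<Source1>.selector.optional = <Channel2>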
Multiplexing
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
Here events are routed according to the value of the <someHeader> header: events whose header value matches <Value1> go to Channel1, <Value2> goes to both channels, <Value3> goes to Channel2, and anything unmatched falls through to the default channel.
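As a concrete sketch (header name and values invented for illustration), route by a state header, sending CA events to ch1, AZ and NY events to ch2, and anything else to ch1 by default:

agent1.sources.avro-source1.selector.type = multiplexing
agent1.sources.avro-source1.selector.header = state
agent1.sources.avro-source1.selector.mapping.CA = ch1
agent1.sources.avro-source1.selector.mapping.AZ = ch2
agent1.sources.avro-source1.selector.mapping.NY = ch2
agent1.sources.avro-source1.selector.default = ch1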
Load Balance

A sink group with a load_balance processor spreads events across its sinks (round_robin here); with backoff enabled, a failed sink is temporarily blacklisted, for at most maxTimeOut milliseconds:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
Failover

A failover processor always delivers to the available sink with the highest priority (k2 here); if it fails, traffic falls over to the next-highest (k3), and maxpenalty caps the failure backoff in milliseconds:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000
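In both snippets the sinks k1, k2, and k3 must still be declared as ordinary sinks bound to a channel; a sketch assuming three downstream Avro collectors (hostnames illustrative):

a1.sinks = k1 k2 k3
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = collector1
a1.sinks.k1.port = 41414
# k2 and k3 are declared the same way against collector2 and collector3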