Setting Up Flume High Availability
Published: 2019-06-21


Flume is a distributed, highly available, and reliable system that collects massive volumes of data from different sources, moves it, and stores it in a central data store. It is lightweight and simple to configure, fits all kinds of log-collection scenarios, supports failover and load balancing, and ships with a very rich set of components. Flume uses a three-tier architecture consisting of an Agent tier, a Collector tier, and a Store tier, and each tier can scale out horizontally. An Agent is built from three components, a Source, a Channel, and a Sink, whose responsibilities are as follows:

• Source: consumes (collects) data from the data source and delivers it into the Channel
• Channel: a temporary staging store that holds all events handed over by the Source
• Sink: reads events from the Channel and deletes them from the Channel once they are delivered successfully

The single-node Flume architecture (official diagram) is shown below:

[Figure: Flume single-node architecture: Web Server -> Agent (Source -> Channel -> Sink) -> HDFS]

As the diagram shows, logs produced by an external system (a web server) are collected by the Agent's Source component, handed to the Channel component for temporary storage, and finally passed to the Sink component, which writes the data directly into HDFS.
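To make the wiring concrete, here is a minimal single-agent configuration in the same properties format used throughout this article. It is essentially the stock getting-started example: the netcat source, memory channel, and logger sink are standard Flume components, while the names and port shown are arbitrary:

# example.conf: one source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Netcat source: listens on a TCP port and turns each text line into an event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Memory channel: buffers events in RAM between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# Logger sink: logs every event, which is handy for smoke testing
a1.sinks.k1.type = logger

# Wire the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1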
This article uses Flume 1.8 (the latest release at the time of writing) and walks through both the single-node setup and the cluster setup. A working Hadoop cluster is assumed to be in place before you begin.

1. Single-Node Mode

1.1 Download and Install

[hadoop@hdp01 ~]$ wget http://mirrors.hust.edu.cn/apache/flume/stable/apache-flume-1.8.0-bin.tar.gz
[hadoop@hdp01 ~]$ tar -xzf apache-flume-1.8.0-bin.tar.gz; mv apache-flume-1.8.0-bin /u01/flume

1.2 Set Environment Variables

[hadoop@hdp01 ~]$ vi .bash_profile
export FLUME_HOME=/u01/flume
export PATH=$PATH:$FLUME_HOME/bin
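The new variables only take effect in fresh login shells; to apply them to the current session, reload the profile:

[hadoop@hdp01 ~]$ source ~/.bash_profile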

1.3 Create the Flume Configuration File

[hadoop@hdp01 ~]$ vi /u01/flume/conf/flume-hdfs.conf
# Agent name
a1.sources = so1
a1.sinks = si1
a1.channels = ch1
# Setting Source so1
a1.sources.so1.type = spooldir
a1.sources.so1.spoolDir = /u01/flume/loghdfs
a1.sources.so1.channels = ch1
a1.sources.so1.fileHeader = false
a1.sources.so1.interceptors = i1
a1.sources.so1.interceptors.i1.type = timestamp
a1.sources.so1.ignorePattern = ^(.)*\\.tmp$
# Setting Sink with HDFS
a1.sinks.si1.channel = ch1
a1.sinks.si1.type = hdfs
a1.sinks.si1.hdfs.path = hdfs://NNcluster/flume/input
a1.sinks.si1.hdfs.fileType = DataStream
a1.sinks.si1.hdfs.writeFormat = Text
a1.sinks.si1.hdfs.rollInterval = 1
a1.sinks.si1.hdfs.filePrefix = %Y-%m-%d
a1.sinks.si1.hdfs.fileSuffix = .txt
# Binding Source and Sink to Channel
a1.channels.ch1.type = file
a1.channels.ch1.checkpointDir = /u01/flume/loghdfs/point
a1.channels.ch1.dataDirs = /u01/flume/loghdfs

[hadoop@hdp01 ~]$ cp /u01/flume/conf/flume-env.sh.template /u01/flume/conf/flume-env.sh
[hadoop@hdp01 ~]$ vi /u01/flume/conf/flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_152

# Create the checkpoint directory
[hadoop@hdp01 ~]$ mkdir -p /u01/flume/loghdfs/point

# Link the Hadoop config files into /u01/flume/conf. This Hadoop environment
# uses NameNode HA, so these files must be linked; otherwise Flume cannot
# resolve where to store the data.
[hadoop@hdp01 ~]$ ln -s /u01/hadoop/etc/hadoop/core-site.xml /u01/flume/conf/core-site.xml
[hadoop@hdp01 ~]$ ln -s /u01/hadoop/etc/hadoop/hdfs-site.xml /u01/flume/conf/hdfs-site.xml
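As an optional sanity check (not part of the original walkthrough), the logical nameservice NNcluster should resolve from the Flume host once the config files are linked, and the target directory can be created ahead of time:

[hadoop@hdp01 ~]$ hdfs dfs -mkdir -p hdfs://NNcluster/flume/input
[hadoop@hdp01 ~]$ hdfs dfs -ls hdfs://NNcluster/flume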

That completes the single-node setup.

1.4 Start the Flume Service

    [hadoop@hdp01 ~]$ flume-ng agent --conf conf --conf-file /u01/flume/conf/flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,console > /u01/flume/logs/flume-hdfs.log 2>&1 &

Note: a1 in the command is the Agent name defined in the configuration file, and the Flume configuration file must be given as an absolute path.

1.5 Testing

Create a file under /u01/flume/loghdfs and write some data into it; the result looks like this:

[Figure: the test file's contents stored in HDFS]
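A minimal way to run this test from the shell (the file name test.log is illustrative):

# Drop a test file into the spooling directory; the spooldir source
# renames each fully ingested file to *.COMPLETED
[hadoop@hdp01 ~]$ echo "hello flume" > /u01/flume/loghdfs/test.log
[hadoop@hdp01 ~]$ ls /u01/flume/loghdfs
# Verify that the events arrived in HDFS
[hadoop@hdp01 ~]$ hdfs dfs -ls /flume/input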

2. Flume Cluster Mode

The Flume cluster architecture (official diagram) is shown below:

[Figure: Flume cluster architecture: multiple agents feed the collectors, which write to storage]

In the diagram, Flume can write to several kinds of storage; only HDFS and Kafka are shown here (for example, keeping the most recent week of logs and feeding a real-time log stream to a Storm system). This article uses Oracle alert logs as the example. The environment is listed in the table below:

[Table: environment: the RAC nodes ebsdb1 and ebsdb2 run the agents; hdp01 and hdp02 run the collectors]

The alert logs of the two RAC nodes in the table are written to HDFS through Collector1 and Collector2. Flume also provides a built-in failover mechanism that switches over and recovers automatically.
2.1 Install Flume on the RAC Nodes

[oracle@ebsdb1 ~]$ wget http://mirrors.hust.edu.cn/apache/flume/stable/apache-flume-1.8.0-bin.tar.gz
[oracle@ebsdb1 ~]$ tar -xzf apache-flume-1.8.0-bin.tar.gz; mv apache-flume-1.8.0-bin /u01/app/oracle/flume

Install Flume the same way on the other RAC node.

2.2 Configure the RAC Node Agents

2.2.1 Configure the agent on ebsdb1

[oracle@ebsdb1 ~]$ vi /u01/app/oracle/flume/conf/flume-client.properties
# Agent name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
# Set sink group
agent1.sinkgroups = g1
# Setting Channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 100000
agent1.channels.c1.transactionCapacity = 100
# Workaround for the following error message:
# "Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight"
agent1.channels.c1.byteCapacityBufferPercentage = 20
agent1.channels.c1.byteCapacity = 800000
agent1.channels.c1.keep-alive = 60
# Setting Sources
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /u01/app/oracle/diag/rdbms/prod/prod1/trace/alert_prod1.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
# Setting Sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hdp01
agent1.sinks.k1.port = 52020
# Setting Sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hdp02
agent1.sinks.k2.port = 52020
# Setting Sink Group
agent1.sinkgroups.g1.sinks = k1 k2
# Setting Failover: k1 (hdp01) has the higher priority, so it is preferred
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000

2.2.2 Configure the agent on ebsdb2

[oracle@ebsdb2 ~]$ vi /u01/app/oracle/flume/conf/flume-client.properties
# Agent name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
# Set sink group
agent1.sinkgroups = g1
# Setting Channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 100000
agent1.channels.c1.transactionCapacity = 100
# Workaround for the following error message:
# "Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight"
agent1.channels.c1.byteCapacityBufferPercentage = 20
agent1.channels.c1.byteCapacity = 800000
agent1.channels.c1.keep-alive = 60
# Setting Sources
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /u01/app/oracle/diag/rdbms/prod/prod2/trace/alert_prod2.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
# Setting Sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hdp01
agent1.sinks.k1.port = 52020
# Setting Sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hdp02
agent1.sinks.k2.port = 52020
# Setting Sink Group
agent1.sinkgroups.g1.sinks = k1 k2
# Setting Failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
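As an aside, the introduction mentioned that Flume supports load balancing as well as failover. With the same sink group, events could be spread across both collectors instead of preferring one; a sketch of the alternative processor settings (standard Flume sink-processor properties, not part of this setup):

# Alternative to failover: round-robin events across both collectors
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin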

2.3 Configure the Flume Collectors

2.3.1 Collector configuration on hdp01

[hadoop@hdp01 conf]$ vi flume-server.properties
# Setting Agent Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Setting Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Setting Sources
a1.sources.r1.type = avro
a1.sources.r1.bind = hdp01
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = hdp01
a1.sources.r1.channels = c1
# Setting Sink to HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://NNcluster/flume/Oracle/logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
a1.sinks.k1.hdfs.rollInterval = 1
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
a1.sinks.k1.hdfs.fileSuffix = .txt

2.3.2 Collector configuration on hdp02

[hadoop@hdp02 conf]$ vi flume-server.properties
# Setting Agent Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Setting Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Setting Sources
a1.sources.r1.type = avro
a1.sources.r1.bind = hdp02
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = hdp02
a1.sources.r1.channels = c1
# Setting Sink to HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://NNcluster/flume/Oracle/logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
a1.sinks.k1.hdfs.rollInterval = 1
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
a1.sinks.k1.hdfs.fileSuffix = .txt

2.4 Start the Flume Cluster Services

2.4.1 Start the Flume collectors

[hadoop@hdp01 conf]$ flume-ng agent --conf conf --conf-file /u01/flume/conf/flume-server.properties --name a1 -Dflume.root.logger=INFO,console > /u01/flume/logs/flume-server.log 2>&1 &
[hadoop@hdp02 conf]$ flume-ng agent --conf conf --conf-file /u01/flume/conf/flume-server.properties --name a1 -Dflume.root.logger=INFO,console > /u01/flume/logs/flume-server.log 2>&1 &
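Before starting the agents, it is worth confirming that each collector's Avro source is listening on its port (an optional check; ss is assumed to be available on the hosts):

[hadoop@hdp01 conf]$ ss -lnt | grep 52020
[hadoop@hdp02 conf]$ ss -lnt | grep 52020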

After startup, check the Flume log file; the contents look like this:

[Figure: collector startup log output]
2.4.2 Start the Flume agents

[oracle@ebsdb1 bin]$ ./flume-ng agent --conf conf --conf-file /u01/app/oracle/flume/conf/flume-client.properties --name agent1 -Dflume.root.logger=INFO,console > /u01/app/oracle/flume/logs/flume-client.log 2>&1 &
[oracle@ebsdb2 bin]$ ./flume-ng agent --conf conf --conf-file /u01/app/oracle/flume/conf/flume-client.properties --name agent1 -Dflume.root.logger=INFO,console > /u01/app/oracle/flume/logs/flume-client.log 2>&1 &

Once the agents are up, watch the collector logs: they show that the agents have connected to the collectors, as below:

[Figure: collector log showing the agents connecting]
2.5 Flume High-Availability Test

Because Collector1 is configured with a higher priority than Collector2, Collector1 takes precedence in collecting the logs and uploading them to the storage system. To test failover, kill Collector1 so that Collector2 takes over the collection and upload work, and check whether the upload still succeeds.
[Figures: after Collector1 is killed, Collector2 takes over and the upload still succeeds]
Then restore the Flume service on the Collector1 node and upload a file from Agent1 again; Collector1 resumes collection at its higher priority.
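A sketch of the failover drill from the shell (the grep pattern and the <PID> placeholder are illustrative):

# On hdp01: find and kill the primary collector
[hadoop@hdp01 ~]$ ps -ef | grep flume-server.properties | grep -v grep
[hadoop@hdp01 ~]$ kill <PID>
# New files should keep arriving in HDFS via collector2
[hadoop@hdp02 ~]$ hdfs dfs -ls /flume/Oracle/logs
# Restart collector1; the failover processor restores its higher priority
[hadoop@hdp01 ~]$ flume-ng agent --conf conf --conf-file /u01/flume/conf/flume-server.properties --name a1 > /u01/flume/logs/flume-server.log 2>&1 &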

