第四章 Flume专题-日志采集工具

2023-11-19

一 Flume专题之组件及架构介绍

1、Flume概述

1.1、Flume定义

​ Flume是一种分布式的、高可靠的和高可用的服务,用于有效地收集、聚合和移动大量日志数据框架

img

  • Flume是一个简单灵活的基于流数据的体系结构。
1.2、Flume特性

(1)支持自定义Source

​ flume 支持在日志系统中定制各类数据发送方,用于收集数据。

(2)支出数据简单处理

​ flume支持对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

(3)事件基本数据单位

​ flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个) Channel 中。可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。

(4)高可靠性

​ 当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:

  • end-to-end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。
  • Store on failure:这也是scribe采用的策略,当数据接收方crash崩溃时,将数据写到本地,待恢复后,继续发送。
  • Best effort:数据发送到接收方后,不会进行确认
1.3、Flume使用场景

​ 实时监控读取服务器产生到本地磁盘的数据,将数据写入到HDFS、Kafka等下游处理流程中去。

img

  • 1、webserver产生日志;
  • 2、日志通过flume封装的Agent进行采集;
    • source:接受数据源
    • channel:进行数据缓存的管道;
    • sink:连接目的地
  • 3、将数据输出到HDFS/HBASE/KAFKA上。

2、Flume架构组成

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yFHTtXjw-1652500698450)img

2.1、Event

Event是Flume的数据传输最基本单位,flume在数据传输过程中是使用Event将数据封装起来进行传输的。

  • 本质:生产的数据,可以是日志记录、 avro 对象等,如果是文本文件通常是一行记录

  • 组成:Event由Header和Body,Header使用k-v存放的Event的属性信息;Body以字节数组形成存储该条信息。

Event: { headers:{} body: 68 61 64 6F 6F 70      hadoop }
2.2、Agent

Agent是数据传输(接受-缓存-发送)形式Agent是一个JVM进程flume 以 Agent 为最小的独立运行单位,包含最基本的三个组件:Source、Chanel、Sink。

2.3、Source

Source是负责接收数据到Flume Agent的组件,Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、spooling directory、netcat、tailder…以及Custom Source。

2.4、Channel

Channel是为了Source和Sink之间的缓冲区。Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理多个Source的写入操作、多个Sink的读取操作。

  • Flume Channel官方提供:memory、jdbc、kafka、file…以及Custom Channel。
2.5、Sink

​ Sink不断轮询Channel中的事件且批量移除它们,并将这些事件批量写入到存储或者索引系统、或者被发送到到另一个Flume Agent。

  • Sink组件目的地:hdfs、loggeer、avro、thrift、ipc、file、hbase、solr、自定义Custom Sink
2.6、Flume NG架构

在这里插入图片描述

(1)每个web server产生日志文件

(2)每一台web server所对应的节点上开启一个进程,分别对应Agent1,Agent2、Agent3;

​ Avro Sources将数据序列化,写入到channel,之后将数据送往Avro Sink,以Avro sink的形式将数据送往下游进行处理。

(3)下游的avro Source接受上游的Avro Sink的数据,在经过下游的Flume汇总到HDFS中。

3、Flume安装配置

3.1、下载
  • 下载链接如下
https://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
3.2、上传并解压
  • 上传到测试环境后解压
tar -xzvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/flume

在这里插入图片描述

3.3、配置环境变量
sudo vim /etc/profile.d/my_env.sh 
  • 添加以下配置内容
export FLUME_HOME=/opt/module/flume/apache-flume-1.9.0-bin
export PATH=$PATH:$FLUME_HOME/bin
  • 使配置文件生效
source /etc/profile.d/my_env.sh 
  • 验证版本
flume-ng version

在这里插入图片描述

3.4、删除冗余Jar包
rm /lib/guava-11.0.2.jar 

二 Flume专题之企业常用组件及案例演示

1、入门案例之打印端口数据

1.1、案例需求

使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

1.2、需求分析

(1)通过netcat工具向本机的4444端口发送数据

(2)Flume监控本机的4444端口通过Flume的Source端读取数据

(3)Flume将获取数据通过Sink端写到控制台

在这里插入图片描述

1.3、配置信息

(1)安装 netcat 工具

sudo yum install -y nc

(2)判断 44444 端口是否被占用

sudo netstat -nlp | grep 44444 

(3)在 flume 目录下创建 job 文件夹并进入 job 文件夹。

mkdir job

cd job/ 

(4)在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf。

vim flume-netcat-logger.conf 

(6)在 flume-netcat-logger.conf 文件中添加如下内容。

添加内容如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(6)开启Flume监听端口

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
  • 参数说明:
    • –conf/-c:表示配置文件存储在 conf/目录
    • –name/-n:表示给 agent 起名为 a1
    • –conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。
    • -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。

在这里插入图片描述

(7)使用netcat工具向本机的4444端口发送内容

nc localhost 44444

在这里插入图片描述

(8)Flume监听页面结果

在这里插入图片描述

2、入门案例之实时监控单个追加文件

2.1、案例需求

​ 实时监控Hive日志,并上传到HDFS中

2.2、需求分析

(1)创建符合条件的flume配置文件;

(2)执行配置文件,开启监控

(3)开启Hive,生成日志

(4)查看HDFS上数据

  • Hive实时更新日志路径:/opt/module/hive/logs/hive.log

在这里插入图片描述

2.3、实现步骤

(1)配置文件编写

​ 要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive 日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件。

vim flume-file-hdfs.conf

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

(2)HDFS创建Flume监控目录

hadoop fs -mkdir -p /flume

(3)运行Flume

 bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

在这里插入图片描述

(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志

bin/hive

在这里插入图片描述

(5)查看HDFS上结果

在这里插入图片描述

3、实时监控目录下多个新文件

3.1、案例需求

​ 使用Flume监听整个目录的文件,并上传至HDFS

3.2、需求分析

(1)创建符合条件的flume配置文件

(2)执行配置文件,开启监控

(3)向upload目录中添加文件

(4)查看HDFS上数据

(5)查看/opt/module/flume/upload目录上传的文件是否已经标记为.COMPLETED结尾;.tmp后缀结尾文件没有上传。

在这里插入图片描述

3.3、实现步骤

(1)创建配置文件

vim flume-dir-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

(2)启动监控文件夹命令

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
  • 说明:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文 件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动。

在这里插入图片描述

(3)创建目录并添加文件

mkdir upload

touch atguigu.txt
touch atguigu.tmp
touch atguigu.log

(4)查看HDFS上的数据

在这里插入图片描述

4、实时监控目录下的多个追加文件

  • 重点:关于Source类型
    • Exec source 适用于监控一个实时追加的文件,不能实现断点续传;
    • Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
    • Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传。
4.1、案例需求

​ 使用Flume监听整个目录的实时追加文件,并上传至HDFS

4.2、需求分析

在这里插入图片描述

4.3、实现步骤

(1)创建配置文件

vim flume-taildir-hdfs.conf

a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

(2)启动监控文件夹命令

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf

(3)向files文件夹中追加文件

mkdir files
cd files/

#向files文件夹中追加文件
echo hello >> file1.txt
echo atguigu >> file2.txt

(4)查看HDFS上的数据

在这里插入图片描述

第四章 Flume专题之进阶概念&生产环境案例

1、进阶基础概念

1.1、Flume流程分析

在这里插入图片描述

(1)Source接收数据后将数据封装成Event对象。

(2)Event数据对象经过Inteceptor链对数据进行改造

(3)根据Channel selector确定Event要发送给那个Channel

(4)Channel通过SinkProcessor分发给对应Sink

1.2、Flume的数据传递过程事务

在这里插入图片描述

(1)推送事务流程

  • doPut:把批数据写入到临时缓冲区 putList doCommit,检查Channel容量是否充足;
    • 容量充足:把putList里的数据发送到Channel doRollBack中;
    • 容量不足:把数据回滚到PutList中

(2)拉取事务流程

  • doTake:把数据读取到临时缓冲区takeList,检查数据是否发送成功。
    • 成功:把Event从takeList中移除
    • 失败:将TakeList中的数据回滚到Channel中

2、Flume拓扑结构

2.1、简单串联

在这里插入图片描述

​ 简单串联就是将多个Flume顺序串联起来,从最初的Source开始到最终的Sink传送的目的存储系统。

  • 注意事项:此模式不建议桥接过多的flume数量,flume数量过多不仅会影响传输效率,而且一旦传输过程中某个节点Flume宕机,会影响整个传输系统。
2.2、复制和多路复用

在这里插入图片描述

Flume支持将事件流向一个或多个目的地,即1对多模式。

  • 注意事项:这种模式可以将相同数据复制到多个Channel中,或者将不同数据分发到不同Channel中,sink可以选择传送不同目的地。
2.3、负载均衡和故障转移

在这里插入图片描述

(1)负载均衡

​ 将多个sink逻辑上分为一个sink组,sink组配合不同的SinkProcessor将数据相对均匀的分发到指定目录或者其他agent实例

a1.sinkgroups.g1.processor.type =load_balance
a1.sinkgroups.g1.processor.backoff=true

(2)故障转移

​ 有主备agent,主agent负责数据的采集、传输、落地,备用agent一直处于监听状态,一旦主agent宕机,备用agent启动,进行主agent的工作,直到主agent恢复。

2.4、聚合

在这里插入图片描述

​ 每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

3、生产实战之多路复用(多channel)

3.1、案例需求

​ nginx产生的日志数据需要供多个部门使用,如何处理一份数据发送两个系统来使用?

3.2、需求分析

在这里插入图片描述

3.3、具体实现

(1)创建目录

#创建文件夹
mkdir group1
cd group1

(2)创建flume1配置文件

  • 作用:配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir
#创建文件
vim flume-file-flume.conf

#文件内容如下
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(3)创建Flume2的配置文件

  • 作用:配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。
vim flume-flume-hdfs.conf

#文件内容如下
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建flume3的配置文件

  • 作用:配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。
  • 注意事项:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目 录。
vim flume-flume-dir.conf
 
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(5)分别执行配置文件

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

(5)启动Hive后检查文件

  • HDFS上数据

在这里插入图片描述

  • 本地磁盘文件

在这里插入图片描述

4、生产实战之故障转移(多Sink)

4.1、案例需求

​ 使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移功能。

4.2、需求分析

在这里插入图片描述

4.3、具体实现

(1)创建目录

mkdir group2
cd group2

(2)创建flume1配置文件

  • 作用:配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给 flume-flume-console1 和 flume-flume-console2。
 vim flume-netcat-flume.conf

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

(3)创建Flume2配置文件

  • 作用:配置上级 Flume 输出的 Source,输出是到本地控制台。
vim flume-flume-console1.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4)创建flume3配置文件

  • 作用:配置上级 Flume 输出的 Source,输出是到本地控制台。
vim flume-flume-console2.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(5)分别执行相关配置文件

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

(6)使用netcat工具向本机的44444端口发送内容

nc localhost 44444

#端口被占用
#1.查看端口是否被占用
netstat -anp |grep [端口号]

#2.查看占用的进程
lsof -i:[端口号]

#3.关闭进程
kill -9 [进程PID]

(7)正常工作

  • 作用:在flume-console2正常工作时间,netcat内容只会发送到flume-console2中。

  • 发送内容
    在这里插入图片描述

  • 发送结果:flume-console2打印结果

在这里插入图片描述

  • 发送结果:flume-console1不打印结果

在这里插入图片描述

(8)故障转移

  • 作用:在flume-console2出现故障后,数据发送到flume-console1.
#杀掉flume-console2的进程
kill -9 pid
  • netcat发送内容

在这里插入图片描述

  • flume-console1的结果输出打印
    在这里插入图片描述

5、生产实战之聚合操作

5.1、案例需求

​ 需求将来自不同服务器的日志数据采集回来存储在HDFS聚集到一起进行处理

5.2、需求分析

在这里插入图片描述

5.3、具体实现

待后续

三 Flume专题之常见面试题

1、上游Flume进程采集到数据后,干嘛不直接送到目的地?

(1)上游的Flume进程数十分庞大,直接送往目的地,目的地系统可能承载不了高并发的访问压力宕机

(2)上游的Flume将采集完的数据送往下游Flume后,下游Flume进程将数据送往目的地;

(3)下游Flume数可人为控制,一般都有备份

2、Flume结构简述

(1)Source:默认有的Avro(监听端口),thrift、Exec(执行linux命令)、JMS、SpoolingDirctory(监听目录)、第三方插件kafka;

(2)拦截器:所有events,增加头,类似json‘格式里的headers:{“keys”:“values”}:时间戳(头部插入时间戳),主机(头部插入主机名和IP),静态(头部插入指定KV),正则过滤(留下符合条件的)

(3)Channel:包括Memory、File、JDBC、Kafka等类型;

(4)拦截器:自定义拦截器,同上

(5)Sink:包括HDFS,hive,Avro,Hbase,kafka等类型

3、Flume HA机制(高可用机制)

(1)负载均衡

​ flumeNG通过设置sinkgroups将多个沉潜节点分到一组,然后设置该组启用负载均衡,沉潜时会自动选择节点,如果节点宕机可选择其他节点:进程在后台运行时轮询依次查询,每个sink送出去的压力基本保持一致。

(2)事务机制

​ 基于事务传输event(批量传输),使用两个独立的事务分别处理source到channel和channel到sink之间,失败时会将所有数据都回滚到source或channel进行重试。

在这里插入图片描述

  • Put事务:source到channel之间
  • Take事务:channel到sink之间
4、Flume的数据丢失&重复场景

(1)数据丢失场景

  • 是Channel 采用 memoryChannel,agent 宕机导致数据 丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失

(2)数据重复

  • 数据已经由Sink发出,但是没有接收到响应,Sink会再次发送数据,导致数据重复

​ 总的来说,Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出, 但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复

5、Flume的channel选择
  • 注意事项:Channel被设计为event中转临时缓冲区,存储source收集并且没有被sink读取的Event

(1)memory:读写速度快,但是存储数据量小,Flume进程挂掉、服务器宕机或者重启都会导致数据丢失;

(2)File:落地到磁盘,如果sink已经提交完成的事务,则可以删除file;

(3)Kafka

  • 日志收集层:只配置Source组件和kafka组件,不需要再配置Sink组件

    • ①减少了日志手机层启动的进程数
    • ②有效降低服务器内存、磁盘等资源的使用率
  • 日志汇聚层:只配置kafkachannel和sink,不需要再配置Source

6、Flume拦截器简述

(1)作用

  • Source将event写入到channel之前可以使用拦截器对event进行各种形式的处理
  • source和channel之间可以有多个拦截器,可以用不同规则进行定制拦截器

(2)拦截器类型:ETL拦截器、区分类型拦截器

(3)自定义拦截器步骤

  • Ⅰ、实现接口intercept
  • Ⅱ、重写四个方法(初始化,处理单个event,处理多个event-调用处理单个event方法,close方法资源释放-flume都会处于运行状态)
  • Ⅲ、实现静态内部类builder,定制相关参数
  • Ⅳ、将自定义拦截器打包,上传到flume的lib目录下
  • Ⅴ、修改flume的核心配置文件
7、Flume故障方案

在这里插入图片描述

​ 负载均衡和故障转移方案:配置sink组,同一个人sink组内有多个子sink,不同sink之间可以配置成负载均衡或故障转移

8、Flume优化

(1)Flume内存配置为4G:实际开发中,再flume-env.sh中设置JVMheap为4G或更高

(2)FileChannel优化:DataDirs指向多个路径,每个路径对应不同硬盘,增大Flume的吞吐量:Checkpoint和backCheckpointDir也尽量配置在不同硬盘对应目录,保证checkpoint坏掉后,可快速恢复

(3)Sink优化:HDFSSink小文件处理

  • 避免HDFS产生大量小文件,设置相关参数达到效果如下
    • ①tmp文件达到128M时会滚动生成正式文件;
    • ②tmp文件创建超10秒会滚动生成文件。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

第四章 Flume专题-日志采集工具 的相关文章

  • 【报告分享】2021年元宇宙发展报告-数据观(附下载)

    摘要 2021年被公认为是 元宇宙元年 这距被称为 虚拟现实元年 的2016年已过去5年之久 业界将元宇宙视为新增长点和下一个具有战略意义的竞争领域 将引发全球科技产业的新一轮洗牌 学术界和产业界普遍认为 元宇宙本身不是一种技术 而是整合多
  • Spark(七)——累加器和广播变量

    5 累加器 通过在驱动器中调用SparkContext accumulator initialValue 方法 创建出存有初始值的累加器 返回值为org apache spark Accumulator T 对象 其中 T 是初始值 ini
  • 工业母机扶持政策汇总来了,国家-广东省-深圳市

    工业母机是指金属加工机床 简单来说 工业母机是制造机器的机器 它体现了国家综合实力的重要基础性产业 代表了工业发展水平 目前 我国的机床相对来说还不够成熟 很多技术以及零件设备都是从国外引进 缺乏自主产权 国家和地方政府出台各项政策鼓励 工
  • 用Excel做简单的数据分析

    一 使用的数据 1 做月销量的柱状图 按住Ctrl可跨区选中 1 选中数据 2 选择柱状图 一般柱状图都使用2D 3 为图加上数据标签 更加直观 2 以同样的方式做饼图 1 如图 一般饼图都是做3D的 3 求月销售额 单价 X 月销量 1
  • Easy-Es核心功能深度介绍

    背景 近期随着项目开源后热度的不断上涨 越来越多小伙伴开始对框架核心功能感兴趣 今天就让我带大家深入源码和架构 一起探索Easy Es 简称EE 的核心功能是如何被设计和实现的 和众多ORM框架一样 EE最为核心的功能就是CRUD 增删改查
  • 大数据开发教程——ZooKeeper分布式协调组件

    ZooKeeper是什么 ZooKeeper是一个分布式的 开放源码的分布式应用程序协调服务 是Google的Chubby一个开源的实现 是Hadoop和Hbase Flink的重要组件 中文名 动物管理员 它是一个为分布式应用提供一致性服
  • 【精】彻底吃透HDFS写流程(5)-- DataStreamer线程类run方法分析以及如何构建pipeline?

    有关HDFS写流程的系列文章 精 彻底吃透HDFS写流程 1 BlockConstructionStage 精 彻底吃透HDFS写流程 2 Namenode侧create文件 精 彻底吃透HDFS写流程 3 DataStreamer线程和输
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • spark内存模型

    Spark 1 6 开始使用了统一内存管理模块 UnifiedMemoryManager 并引入了堆外内存 Off heap memory 1 6之前的内存管理就不进行介绍了 spark堆内和堆外内存模型的示意图 注意 堆外内存是依赖于wo
  • pycharm如何连接数据库并往数据库插入内容

    1 创建connection对象 2 创建cursor对象 游标对象 主要用于操作数据库 3 执行查询 4 关闭cursor对象 5 关闭connection 首先要先安装pumysql库 pip install pymysql 连接测试
  • Python Pandas导出Hbase数据到dataframe

    Python导出Hbase数据的思路 使用happybase连接Hbase 使用table scan 扫数据 将得到的数据整理为dataframe格式 将从Hbase中得到的byte类型的数据转为str类型的数据 示例代码 import h
  • docker搭建hadoop hdfs完全分布式集群

    1 制作hadoop镜像 参见 https www cnblogs com rmxd p 12051866 html 该博客中只参考制作镜像部分 固定IP及启动集群的部分应该跳过 这里注意 在做好的镜像里 要安装 which 工具 否则在执
  • 华为云,站在数字化背后

    一场新的中国数字化战斗 正在被缓缓拉开帷幕 作者 裴一多 出品 产业家 如果说最近的讨论热点是什么 那无疑是互联网云 在数字化进入纵深的当下 一种市面上的观点是互联网的云业务由于盈利等问题 正在成为 被抛弃 的一方 互联网公司开始重新回归T
  • 利用人工智能技术普及教学应用、拓展教师研训应用、增强教育系统监测能力

    2019年 中国教育现代化2035 指出 以人才培养为核心 通过提升校园智能化水平 探索新型教学形式 创新教育服务业态 推进教育治理方式变革 智能驱动教育创新发展 2021年教育部等六部门发布 关于推进教育新型基础设施建设构建高质量教育支撑
  • 在linux下jdk安装和建立Hadoop集群的过程实验报告(搭建Hadoop集群)。

    1 模板虚拟机环境准备 相关视频 半小时快速搭建Hadoop集群 哔哩哔哩 bilibilihttps www bilibili com video BV1x5411177Y spm id from 333 880 my history p
  • Flume-ng 拖尾文件

    我试图了解如何使用 Flume ng 尾部文件 以便可以将数据推送到 HDFS 中 在第一个实例中 我设置了一个简单的conf文件 tail1 sources source1 tail1 sinks sink1 tail1 channels
  • Cloudera 5.4.2:使用 Flume 和 Twitter 流时 Avro 块大小无效或太大

    当我尝试 Cloudera 5 4 2 时出现了一个小问题 基于这篇文章 Apache Flume 获取 Twitter 数据http www tutorialspoint com apache flume fetching twitter
  • 2023_Spark_实验二十九:Flume配置KafkaSink

    实验目的 掌握Flume采集数据发送到Kafka的方法 实验方法 通过配置Flume的KafkaSink采集数据到Kafka中 实验步骤 一 明确日志采集方式 一般Flume采集日志source有两种方式 1 Exec类型的Source 可
  • 由于 JAR 冲突,无法运行 Flume

    我已经手动安装了 Flume 和 Hadoop 我的意思是 不是 CDH 并且我正在尝试运行 twitterexample https github com cloudera cdh twitter example来自Cloudera In
  • 使用具有正斜杠的密钥连接到 s3 接收器时出现无效主机名错误

    我有一个forward slash在 aws 密钥中 当我尝试连接到 s3 接收器时 Caused by java lang IllegalArgumentException Invalid hostname in URI s3 xxxx

随机推荐

  • found input variables with inconsistene numbers of samples:[] 报错处理

    在用train text spilt进行机器学习的训练时候 出现了以下的报错 代码检查发现错误 train x train y test x test y train test split train x train y的行数不一致 应该改
  • 1分钟教你配置好你的python环境

    欢迎来到我们的系列博客 Python360全景 在这个系列中 我们将带领你从Python的基础知识开始 一步步深入到高级话题 帮助你掌握这门强大而灵活的编程语法 无论你是编程新手 还是有一定基础的开发者 这个系列都将提供你需要的知识和技能
  • 详解移植mjpg_streamer到arm板

    介绍 Mjpg streamer是一个开源软件 用于从webcam摄像头采集图像 把它们以流的形式通过基于ip的网络传输到浏览器如Firefox Cambozola VLC播放器 Windows的移动设备或者其他拥有浏览器的移动设备 mjp
  • 从0到1搭建自己的脚手架(java后端)

    一 脚手架是什么 脚手架是一种基础设施工具 用于快速生成项目的框架代码和文件结构 它是一种标准化的开发工具 使开发人员能够在项目的早期阶段快速搭建出一个具备基本功能和结构的系统 二 脚手架的意义 主流的微服务架构体系下很多公司会将原有的单体
  • SPSS 24安装后怎么打开的问题

    本人安装完spss 24之后打开发现还是需要许可证 再次输入完成就会全部关闭 解决方法 安装的步骤基本不会有问题 主要是针对出现安装完成 也填好许可证了的情况 可以通过下图对应的文件位置 双击打开 就可以使用了 安装包和教程可参考 链接 l
  • 多线程2(同步代码块+同步方法+同步锁+死锁)

    一 多线程同步 多线程的并发执行可以提高程序的效率 但是当多个线程去访问同一个资源时 有时也会引发一些安全性问题 例如 统计一个班上的学生人数时 学生有进有出会影响最终学生人数 为了解决这样的问题 需要实现多线程的同步 即限制某个资源在同一
  • 夯实C++基础之刷题:链表——相交链表

    一点点进步计划 首先要坚持刷题 刷题是一个将思路用代码实现的过程 2要自己看知识点 平时也看看面经 这样才与时俱进 先从每天能做一道题开始把 题目 1 相交链表 2 思路 看问题解析都用到了数学的双指针的方法 我是想不明白 但看解题的意思是
  • 数据仓库系列 - 缓慢渐变维度 (Slowly Changing Dimension) 常见的三种类型及原型设计...

    开篇介绍 在从 OLTP 业务数据库向 DW 数据仓库抽取数据的过程中 特别是第一次导入之后的每一次增量抽取往往会遇到这样的问题 业务数据库中的一些数据发生了更改 到底要不要将这些变化也反映到数据仓库中 在数据仓库中 哪些数据应该随之变化
  • STM32 USB HID 自定义设备 bulk 传输

    ST 意法半导体公司 为STM32系列处理器编写了外设USB的库 并提供了很好的参考例程 本文就是参考ST提供的例程 在STM32F4 discovery板子上实现usb bulk传输 Host端是在linux平台上利用libusb库函数写
  • mysql 临时表权限_MySQL临时表浅析

    一 MySQL如何使用内部临时表 在某些情况下 服务器会在处理query的时候组建内部临时表 这种表有两种存在形式 1 位于内存中 使用的是MEMORY存储引擎 内存临时表 2 位于磁盘上 使用MyISAM存储引擎 硬盘临时表 服务器可能在
  • 再介绍一种低成本的负电源电路

    前面介绍了几种产生负电源的方法 几种常用的产生负电源的方法 今天再来介绍一种低成本的负电源电路 用分离元件搭建 配合程序控制 实现正电源转负电源 先看电路 图中Q1 D1 L2和C1构成最基本的Buck Boost电路 L1 C2为一级LC
  • myeclipse非正常关闭处理办法

    myeclipse正常或非正常关闭后 再次运行 不显示启动时的logo和读条 进入主页面后程序基本就卡死 无法正常运行 解决办法 方法一 修改工作空间在刚启动Myeclipse的时候会有一个选择工作空间的地方 换一个新的工作空间即可 若是原
  • Redis7之介绍(一)

    一 介绍 1 1 基本了解 Remote Dictionary Server 远程字典服务 是完全开源的 使用ANSIC语言编写遵守BSD协议 是一个高性能的Key Value数据库提供了丰富的数据结构 例如String Hash List
  • 面试题: v-if和v-show有什么区别?

    面试题 v if和v show有什么区别 1 v if能够控制是否生成vnode 也就间接控制了是否生成对应的dom 当v if为true时 会生成对应的vnode 并生成对应的dom元素 当其为false时 不会生成对应的vnode 自然
  • openwrt 缺少 libc.so.6 libm.so.6 libpthread.so.0

    在开发openwrt时 编译内核的时候 自己写的代码在openwrt 编译报错 提示缺少依赖库文件 Package Gateway Auto is missing dependencies for the following librari
  • flutter版本号对比

    版本号对比 Future
  • 筛选素数之欧拉筛法 python实现 附带证明

    返回类型 列表 说明 返回小于upperBound的所有素数 def ouLaShai upperBound filter False for i in range upperBound 1 primeNumbers for num in
  • Java学习心得10——多态

    多态 一种类型的变量可以掌管多种类型的对象 这就是多态 说人话 直观理解成多种形态 人类就是多态的 黄种人 白种人 黑种人都是属于人类 人类这一个类可以表示黄种人 白种人 黑种人这三个类 这不就是多态多种形态吗 回到编程 Animal 动物
  • 【华为OD机试真题 python】数字加减游戏【2022 Q4

    题目描述 数字加减游戏 小明在玩一个数字加减游戏 只使用加法或者减法 将一个数字s变成数字t 在每个回合中 小明可以用当前的数字加上或减去一个数字 现在有两种数字可以用来加减 分别为a b a b 其中b没有使用次数限制 请问小明最少可以用
  • 第四章 Flume专题-日志采集工具

    一 Flume专题之组件及架构介绍 1 Flume概述 1 1 Flume定义 Flume是一种分布式的 高可靠的和高可用的服务 用于有效地收集 聚合和移动大量日志数据框架 Flume是一个简单灵活的基于流数据的体系结构 1 2 Flume