flume实验

2023-11-14

1、上传flume-ng-1.5.0-cdh5.3.6.tar.gz 至/opt/modules/cdh/ 并解压
2、编辑 /conf/flume-env.sh

export JAVA_HOME=/usr/java/jdk1.7.0_79

3、编辑/etc/profile

export FLUME_HOME=/opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin
export PATH=$PATH:$FLUME_HOME/bin

4、flume业务情景
在这里插入图片描述
flume是日志分析型项目的数据起点
5、下图为事件的处理过程,事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。在这里插入图片描述
当节点出现故障时,日志能够被传送到其他节点上而不会丢失。
6、概念和原理
Client:Client生产数据,运行在一个独立的线程。
Event:一个数据单元,消息头和消息体组成。(日志记录、 avro 对象等)
Flow: Event从源点到达目的点的迁移的抽象。
Agent:一个独立的Flume进程,包含组件Source、 Channel、 Sink。
Source: 数据收集组件。(source从Client收集数据,传递给Channel)
Channel:临时存储,保存由Source组件传递过来的Event队列。
Sink: 从Channel中读取并移除Event,将Event传递到下一个Agent(如果有的话)
在这里插入图片描述

7、Avro二进制序列化数据系统
Avro使用模式来实现数据结构定义。一个存储文件由两部分组成:头信息(Header)和数据块(Data Block)。头信息又由三部分构成:四个字节的前缀(类似于Magic Number),文件Meta-data信息和随机生成的16字节同步标记符。
8、Avro支持两种序列化编码方式:二进制编码和JSON编码。使用二进制编码会高效序列化,并且序列化后得到的结果会比较小。而JSON一般用于调试系统或是基于WEB的应用。
9、消息从客户端发送到服务器端需要经过传输层(Transport Layer),它发送消息并接收服务器端的响应。到达传输层的数据就是二进制数据。通常以HTTP作为传输模型,数据以POST方式发送到对方去。

10、单一流测试示例
通过flume来监控一个目录,当目录中有新文件时,将文件内容输出到控制台
编辑/conf/a1.properties

#配置一个agent,agent的名称可以自定义(如a1)
#指定agent的sources(如s1)、sinks(如k1)、channels(如c1)
#分别指定agent的sources,sinks,channels的名称 名称可以自定义
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  
   
#描述source
#配置目录scource
a1.sources.s1.type =spooldir  
a1.sources.s1.spoolDir =/opt/modules/cdh/hadoop-2.5.0-cdh5.3.6/logs 
a1.sources.s1.fileHeader= true  
a1.sources.s1.channels =c1  
   
#配置sink 
a1.sinks.k1.type = logger  
a1.sinks.k1.channel = c1  
   
#配置channel(内存做缓存)
a1.channels.c1.type = memory

启动命令

flume-ng agent --conf conf --conf-file /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/conf/a1.properties --name a1 -Dflume.root.logger=INFO,console

上传一个lining07.log文件至/opt/modules/cdh/hadoop-2.5.0-cdh5.3.6/logs

监控台将实时显示此日志能容
在这里插入图片描述
11、source 配置总结
a、安装netcat,读取nc发送的数据当做源数据

#类别
a1.sources.r1.type = netcat
#连接名
a1.sources.r1.bind = localhost
#端口
a1.sources.r1.port = 5566

可以通过nc localhost 5566来发送数据

b、监听目录做源数据

a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/logs
a1.sources.r1.fileHeader = true

c、监听文件做源数据

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/test-flume.log
a1.sources.r1.shell = /bin/bash -c

d、监听kafka生产者的数据作为源数据

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = mem_channel
a1.sources.r1.batchSize = 5000
#配置kafka端口号
a1.sources.r1.servers = node02:9092,node03:9092,node04:9092
#设置kafka的topic(主题)
a1.sources.r1.topics = hellotopic
#配置groupid
a1.sources.r1.consumer.group.id = flume_test_id

12、单文件单流 AGENT 写入 HDFS

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type= memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 100000
agent1.channels.ch1.keep-alive = 30
  

#define source monitor a file
agent1.sources.avro-source1.type= exec
agent1.sources.avro-source1.shell = /bin/bash-c
agent1.sources.avro-source1.command= tail-n +0 -F /opt/test/testlog.txt
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.threads = 5
  
# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type= hdfs
agent1.sinks.log-sink1.hdfs.path = hdfs://192.168.198.131:8020/flumeTest
agent1.sinks.log-sink1.hdfs.writeFormat = Text
agent1.sinks.log-sink1.hdfs.fileType = DataStream
agent1.sinks.log-sink1.hdfs.rollInterval = 0
agent1.sinks.log-sink1.hdfs.rollSize = 1000000
agent1.sinks.log-sink1.hdfs.rollCount = 0
agent1.sinks.log-sink1.hdfs.batchSize = 1000
agent1.sinks.log-sink1.hdfs.txnEventMax = 1000
agent1.sinks.log-sink1.hdfs.callTimeout = 60000
agent1.sinks.log-sink1.hdfs.appendTimeout = 60000
  
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

执行命令

flume-ng agent --conf /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/conf/a1.properties -n agent1 -Dflume.root.logger=INFO,console

13、多 agent 汇聚写入 HDFS

因环境所限,未进行此实验。

14、flume将数据导入到Hbase
建表

hbase(main):001:0> create 't2','f2'

flume agent 配置

a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /opt/test/hbase.txt 
a1.sources.r1.channels = c1 

# Describe the sink 
a2.sources = r2 
a2.sinks = k2 
a2.channels = c2 

# Describe/configure the source 
a2.sources.r2.type = exec 
a2.sources.r2.command = tail -F /opt/test/hbase.txt 
a2.sources.r2.channels = c2 

# Describe the sink 
a2.sinks.k2.type = logger 
a2.sinks.k2.type = hbase 
a2.sinks.k2.table = t2  # 与hbase中创建的表名相同
a2.sinks.k2.columnFamily = f2  # 与hbase中创建的表的列簇相同
a2.sinks.k2.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer 
a2.sinks.k2.channel = memoryChannel 

# Use a channel which buffers events in memory 
a2.channels.c2.type = memory 
a2.channels.c2.capacity = 1000 
a2.channels.c2.transactionCapacity = 100 

# Bind the source and sink to the channel 
a2.sources.r2.channels = c2 
a2.sinks.k2.channel = c2

拷贝jar包

 cp /opt/modules/cdh/hbase-0.98.6-cdh5.3.6/lib/* /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/lib

启动flume a2

flume-ng agent --conf conf --conf-file /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/conf/filetohbase.conf --name a2 -Dflume.root.logger=INFO,console

向文件写入数据

echo 'Iamzhangli'>>/opt/test/hbase.txt

在hbase 中查看

scan "t2"
hbase(main):054:0> scan "t2"
ROW                                          COLUMN+CELL
 1595656365661-eFw9ub1SBT-0                  column=f2:payload, timestamp=1595656368932, value=lkjglsjfdsflkjglsjfdsf
 1595656393887-eFw9ub1SBT-1                  column=f2:payload, timestamp=1595656396890, value=gsdfasdfasglkjglsjfdsf
 1595656399523-eFw9ub1SBT-2                  column=f2:payload, timestamp=1595656402523, value=gsdfasdfasg
 1595656448582-eFw9ub1SBT-3                  column=f2:payload, timestamp=1595656451582, value=gdsfasdgsddlkjglsjfdsf
 1595656453566-eFw9ub1SBT-4                  column=f2:payload, timestamp=1595656456559, value=gsdfasdfasg
 1595656453566-eFw9ub1SBT-5                  column=f2:payload, timestamp=1595656456559, value=gdsfasdgsdd
 1595656520483-eFw9ub1SBT-6                  column=f2:payload, timestamp=1595656523485, value=dgdfsafdfdlkjglsjfdsf
 1595656525633-eFw9ub1SBT-7                  column=f2:payload, timestamp=1595656528630, value=gsdfasdfasg
 1595656525634-eFw9ub1SBT-8                  column=f2:payload, timestamp=1595656528630, value=gdsfasdgsdd
 1595656525635-eFw9ub1SBT-9                  column=f2:payload, timestamp=1595656528630, value=dgdfsafdfd
 1595656662762-eFw9ub1SBT-10                 column=f2:payload, timestamp=1595656665761, value=oiuyiuijiiussdgsadfsadfssdgsadfsadfsaIamzhangli
 1595656695816-eFw9ub1SBT-11                 column=f2:payload, timestamp=1595656698814, value=mmmmmjngli
 1595656746553-eFw9ub1SBT-12                 column=f2:payload, timestamp=1595656749549, value=gdgdgdgdgdgd>>/opt/test/hbase.txt
 1595656750865-eFw9ub1SBT-13                 column=f2:payload, timestamp=1595656753855, value=echo gdgdgdgdgdgd
 1595656762901-eFw9ub1SBT-14                 column=f2:payload, timestamp=1595656765894, value=gdgdgdgdgdgd
 1595656795947-eFw9ub1SBT-15                 column=f2:payload, timestamp=1595656798939, value=ooooooooooooooo
16 row(s) in 0.0340 seconds

hbase(main):055:0>

日志文件的数据成功导入hbase
结束监听服务 ctrl + c

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flume实验 的相关文章

  • 读取文件并获取 key=value 而不使用 java.util.Properties

    我正在构建一个 RMI 游戏 客户端将加载一个包含一些键和值的文件 这些键和值将用于多个不同的对象 它是一个保存游戏文件 但我不能为此使用 java util Properties 它符合规范 我必须读取整个文件并忽略注释行和与某些类不相关
  • 如何用Java写入OS系统日志?

    Mac OS 有一个名为 Console 的应用程序 其中包含记录的消息 错误和故障 我相信 Windows 中的等效项是事件查看器 我想 Linux 上也有一个 但我不知道它是什么 也不知道它在哪里 是否可以像这样从 Java 输出获取消
  • 正确配置JDK环境变量后仍然找不到java命令

    我在 Windows 虚拟机启动时安装 JDK 使用 cloudinit 用户数据将 PowerShell 脚本传输到 Windows 计算机 然后运行该脚本来安装 JDK softwares Get ItemProperty HKLM S
  • 如何在java中压缩/解压tar.gz文件

    谁能告诉我在java中压缩和解压缩tar gzip文件的正确方法我一直在搜索 但我能找到的最多的是zip或gzip 单独 我写了一个包装器公共压缩 http commons apache org compress called jarchi
  • Java Sqlite Gradle

    我对 gradle 和 java 还很陌生 我有一个使用 sqlite 的项目 它通过 intellij idea 运行良好 但我无法从终端运行它 它会抛出异常 java lang ClassNotFoundException org sq
  • 如何防止在 CXF Web 服务客户端中生成 JAXBElement

    我正在尝试使用 CXF 创建一个 Web 服务客户端来使用 WCF Web 服务 当我使用 wsdl2java 时 它生成具有 JAXBElement 类型而不是 String 的对象 我读到有关使用 jaxb bindings xml 文
  • 在 Tomcat 上部署 Java Web 项目,无需 WAR 或 EAR

    我有一个 Java Web 项目 Struts Spring 在我的本地主机上完美运行 我必须将其部署在我的网站上 但虚拟主机提供的 Tomcat Manager 界面显示 由于安全原因 它无法上传 WAR 文件 当联系技术支持时 我被告知
  • H264 字节流到图像文件

    第一次来这里所以要温柔 我已经在给定的 H 264 字节流上工作了几个星期 一般注意事项 字节流不是来自文件 它是从外部源实时提供给我的 字节流使用 Android 的媒体编解码器进行编码 当将流写入扩展名为 H264的文件时 VLC能够正
  • JBoss AS 5 中的共享库应该放在哪里?

    我是 Jboss 新手 但我有多个 Web 应用程序 每个应用程序都使用 spring hibernate 和其他开源库和 portlet 所以基本上现在每个 war 文件都包含这些 jar 文件 如何将这些 jar 移动到一个公共位置 以
  • 如何在Mac上使用eclipse安装jetty

    我是一个新手 jetty 和 RESTful API 我想使用 Jetty 创建 REST 服务 并希望将嵌入式 jetty 与 eclipse 一起使用 任何人都可以建议我在 Mac OS 中使用 Eclipse 安装 Jetty Jet
  • 从剪贴板获取图像 Awt 与 FX

    最近 我们的 Java FX 应用程序无法再从剪贴板读取图像 例如 用户在 Microsofts Paint 中选择图像的一部分并按复制 我不是在谈论复制的图像文件 它们工作得很好 我很确定它过去已经有效 但我仍然需要验证这一点 尽管如此
  • FFmpeg 不适用于 android 10,直接进入 onFailure(String message) 并显示空消息

    我在我的一个项目中使用 FFmpeg 进行视频压缩 在 Android 10 Google Pixel 3a 上 对于发送执行的任何命令 它会直接进入 onFailure String message 并显示空消息 所以我在我的应用程序 g
  • 更改 JTextPane 的大小

    我是Java新手 刚刚在StackOverflow中找到了这段代码 ResizeTextArea https stackoverflow com questions 9370561 enabling scroll bars when jte
  • 如何在命令提示符中检查 JAVA_OPTS 值?

    我们的应用程序部署 JBoss 服务器然后抛出错误 PermGen space 然后在 jboss bat 和配置文件中设置 permgen 变量中的 java OPTS JAVA OPTs 中是否有值 assige 如何检查 如何在命令提
  • XSLT:我们可以使用abs值吗?

    我想知道在 XSLT 中我们是否可以使用 math abs 我在某处看到过这个 但它不起作用 我有类似的东西
  • 为什么 RMI 注册表忽略 java.rmi.server.codebase 属性

    我正在运行 java RMI 的 Hello World 示例 1 我在空文件夹中运行注册表 motta motta laptop tmp rmiregistry 2 我启动 HTTP 服务器以在运行时检索类 下载文件夹包含客户端 服务器的
  • 当我在 Java 中输入 IP 时无法连接到我的服务器

    好的 我正在尝试学习 Java 客户端 服务器的内容 并且正在浏览教程代码 如下所示 当我将 localhost 更改为我的 IP 时 它会停止工作 请帮忙 编辑 127 0 0 1 似乎也可以工作 但不是我的真实IP Copyright
  • 条件查询:按计数排序

    我正在尝试执行一个标准查询 该查询返回 stackoverflow 中回答最多的问题 例如常见问题解答 一个问题包含多个答案 我正在尝试使用标准查询返回按每个问题的答案数排序的回答最多的问题 任何人都知道我应该在 hibernate cri
  • C/C++ 通过 Android NDK 在 JNI 中看不到 Java 方法

    我正在尝试从使用 NDK 构建的 C 类文件调用 Java 方法 它不断抛出常见的 未找到非静态方法 错误并导致整个 Android 应用程序崩溃 下面的代码片段 有些东西可能不需要 但我按原样保留它们 因为焦点 问题在于refreshJN
  • Java:基于 Web 的应用程序中的单例类实例

    我在 Web Application 中有这个 Singleton 类 public class MyDAO private static MyDAO instance private MyDAO public static MyDAO g

随机推荐

  • Java的Properties属性集、获取项目路径的3种方式(干货满满)

    属性集介绍 集合家族中有个成员java util Properties 它继承于Hashtable Properties是使用键值结构存储数据的 但它最大的特点是具有持久化功能 持久化 内存 gt 硬盘 持久化的过程必须依赖于IO流 对IO
  • MyBatis执行器与新增返回主键问题

    前提 在写需求时碰到一个问题 在新增加一条数据时需要返回主键并进行后续操作 发现当前项目并不能返回主键 正常返回主键代码 1
  • PTA C 7-3 计算职工工资

    给定N个职员的信息 包括姓名 基本工资 浮动工资和支出 要求编写程序顺序输出每位职员的姓名和实发工资 实发工资 基本工资 浮动工资 支出 输入格式 输入在一行中给出正整数N 随后N行 每行给出一位职员的信息 格式为 姓名 基本工资 浮动工资
  • C++继承

    继承的概念 继承 inheritance 机制是面向对象程序设计使代码可以复用的重要的手段 它允许程序员在保持原有类特性的基础上进行扩展 增加功能 这样产生新的类 称为派生类 继承呈现了面向对象程序设计的层次结构 体现了由简单到复杂的认知过
  • maven 报错Failed to execute goal org.apache.maven.pluginsmaven-archetype-plugin3.2

    新手走过各种各样的坑 idea中maven基础配置中总是出现各种各样的错误 在网上找了一些资料 发现并没有找到切入主题的解决方法 走过的坑总是记忆尤新 idea第一次配置maven 提示如下所示错误 仔细检查了一个maven的配置文件 发现
  • 正态分布函数_从微积分角度证明“正态分布密度函数”

    本篇我们来证明一个常见的优美的积分等式 聪明你是否看出如下等式曾在哪里出现过呢 没错如下和正态分布中概率密度函数很像 但我们仅从积分学的角度来分析正面它 证明它灵活的数学技巧 你准备好了吗 因为e x 2是关于x的偶函数 所以我们明显可以想
  • 安装SAS可能遇到的各种问题

    近日 为了提升数据分析的效率 准备开始学习SAS相关内容 结合自身已经掌握的Python 希望在数据分析 挖掘方向走的越来越远 下面 来分享下我安装SAS过程中遇到的各种问题 真是一个一个坑走过来的 系统环境 Windows 10 安装版本
  • 在对话框中实现预览图形文件的功能

    一 使用 acdbDisplayPreviewFromDwg 函数 1 引用说明 此功能获取由指定的图形的预览图像 如果有 pszDwgfilename 将其显示在由HWND参数pPreviewWnd标识的窗口中 图像尺寸最大变化不超过25
  • Anaconda3最新换国内源教程,中科大源或者清华源

    环境 ubuntu16 04 anaconda python 3 7 中科大源 conda config add channels https mirrors ustc edu cn anaconda pkgs main conda con
  • esp32 完整开发指南_【安信可ESP32语音开发板专题①】ESP32-A1S音频开发板之离线语音识别控制LED灯

    本博客学习由 安信可开源团队 潜心编写 做ESP32 A1S离线语音初步入门技术交流分享 如有不完善之处 请留言 本团队及时更改 一 前言 离线语音 顾名思义 在不连网络的状态下 产品能识别语音指令并执行相应的控制输出 安信可基于乐鑫ESP
  • @SpringQueryMap注解 feign的get传参方式

    SpringQueryMap注解 feign的get传参方式 问题 启动服务 传入参数测试 发现feign远程调用的方法入参失败 排查发现是feign接口调用controller方法的时候就没进来参 原因 spring cloud项目使用f
  • armeabi-v7a、arm64-v8a、armeabi、x86、x86_64的区别

    1 armeabi v7a 第七代及以上的ARM处理器 2011年以后生产的大部分Android设备都使用 2 arm64 v8a 第8代 64位ARM处理器 很少设备 三星GalaxyS6是其中之一 3 armeabi 第5代 第6代的A
  • go-zero 基础 -- 进阶指南

    版本 1 4 0 1 目录拆分 1 1 系统结构分析 在上文提到的商城系统中 每个系统在对外 http 提供服务的同时 也会提供数据给其他子系统进行数据访问的接口 rpc 因此每个子系统可以拆分成一个服务 而且对外提供了两种访问该系统的方式
  • FreeRTOS之软件定时器

    FreeRTOS之软件定时器 声明 本人按照正点原子的FreeRTOS例程进行学习的 欢迎各位大佬指责和批评 谢谢 include sys h include delay h include usart h include led h in
  • WIN7打开或关闭Windows功能后空白问题解决

    问题描述 打开或关闭Windows功能界面 一片空白 问题如下 解决方法 参考百度出来的几个办法 都无法解决 可能在下的系统的注册表问题比较严重 参考另一个方法 完美解决 windows7打开或关闭Windows功能后空白的问题 下载win
  • Python指南——类

    http blog csdn net ccat article details 8364 译者 至此Python指南的正文部分就全部译完了 感谢Clover姐姐 Sickkid 尹伟铭 面面 珂珂等朋友在翻译过程中给我提供的帮助和支持 特别
  • 用nodejs到底做什么?

    如何解决学了之后无法解决问题的状态 前端的内容很多 有html css javascript三个大模块 但是如何能去解决问题 核心还是根据你的兴趣 或者你根据一个你能看到的实际项目好好研究一下代码 了解其中运作的机制 然后尝试着修改一下代码
  • EduCoder_web实训作业--CSS样式规则

    由于时间关系 我只写第四题啦 2020 12 31 已将缺失关卡补全 第一关 B D C A B 第二关 h1 style font family 楷体 text align center line height 2 静夜思 h1 h2 s
  • Pandas数据处理与分析

    文章目录 前言 1 导入数据 2 审阅数据 3 数据预处理 4 数据分析 5 pandas数据可视化 这里不再过多的讲解pandas可视化 因为pandas中的数据可视化已经可以满足我们大部分的要求了 也就省下了我们很多自己使用 如 mat
  • flume实验

    1 上传flume ng 1 5 0 cdh5 3 6 tar gz 至 opt modules cdh 并解压 2 编辑 conf flume env sh export JAVA HOME usr java jdk1 7 0 79 3