Flume之:二、企业开发案例

2023-11-19

Flume之:二、企业开发案例



—>Spark知识点总结导航<—


三、企业开发案例

1. 监控端口数据官方案例

(1) 案例需求:首先,Flume监控本机44444端口,然后通过telnet工具向本机44444端口发送消息,最后Flume将监听的数据实时显示在控制台。

(2) 需求分析:

(3) 实现步骤:
  ①. 安装telnet工具
    a) 检查是否已经安装telnet

rpm -qa | grep telnet

  如果什么都没有,就是没有安装接着下一步吧。

    b) 安装telnet及telnet-server,注意,需要root权限来安装。(在线安装,需要网络)

yum install telnet -y
				
yum install telnet-server -y

    c) 因为装好telnet服务之后,默认是不开启服务的,下面我们需要修改文件来开启服务。

vim /etc/xinetd.d/telnet

 修改 disable = yes 为 disable = no

    d) 需要激活xinetd服务

service xinetd restart

 如果已经开启,就是这样:

    e) 需要测试telnet是否成功开启

telnet localhost

 如果前面的操作都没问题,输入用户名密码能登录成功。

  当你使用其他机器远程telnet的时候,如果不成功,那么很有可能是防火墙的问题,下面我们来修改防火墙的设置
  关闭防火墙

  ②. 判断44444端口是否被占用

sudo netstat -tunlp | grep 44444

 被占用:

  功能描述:netstat命令是一个监控TCP/IP网络的非常有用的工具,它可以显示路由表、实际的网络连接以及每一个网络接口设备的状态信息。

  基本语法:netstat [选项]

  选项参数:

				-t或--tcp:显示TCP传输协议的连线状况; 
				-u或--udp:显示UDP传输协议的连线状况;
				-n或--numeric:直接使用ip地址,而不通过域名服务器; 
				-l或--listening:显示监控中的服务器的Socket; 
				-p或--programs:显示正在使用Socket的程序识别码和程序名称;

  ③. 创建Flume Agent配置文件flume-telnet-logger.conf
    在flume目录下创建job文件夹并进入job文件夹。

mkdir jobconf
		
cd jobconf/

 在job文件夹下创建Flume Agent配置文件flume-telnet-logger.conf。

			touch flume-telnet-logger.conf

 在flume-telnet-logger.conf文件中添加如下内容。

			vim flume-telnet-logger.conf

 添加内容如下:

		# Name the components on this agent
		a1.sources = r1
		a1.sinks = k1
		a1.channels = c1
		
		# Describe/configure the source
		a1.sources.r1.type = netcat
		a1.sources.r1.bind = localhost
		a1.sources.r1.port = 44444
		
		# Describe the sink
		a1.sinks.k1.type = logger
		
		# Use a channel which buffers events in memory
		a1.channels.c1.type = memory
		a1.channels.c1.capacity = 1000
		a1.channels.c1.transactionCapacity = 100
		
		# Bind the source and sink to the channel
		a1.sources.r1.channels = c1
		a1.sinks.k1.channel = c1

 注:配置文件来源于官方手册http://flume.apache.org/FlumeUserGuide.html

  ④. 先开启flume监听端口

		bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume-telnet-logger.conf -Dflume.root.logger=INFO,console

 出现问题:(不需要解决)

		Error: Could not find or load main class org.apache.flume.tools.GetJavaProperty

 参数说明:

			--conf conf/  :表示配置文件存储在conf/目录
			--name a1    :表示给agent起名为a1
			--conf-file job/flume-telnet.conf :flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。
			-Dflume.root.logger==INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

  ⑤. 使用telnet工具向本机的44444端口发送内容

			[duck@Cloud00 ~]$ telnet localhost 44444

  ⑥.在Flume监听页面观察接收数据情况


2. 实时读取本地文件到HDFS案例

(1) 案例需求:实时监控Hive日志,并上传到HDFS中

(2) 需求分析:

(3) 实现步骤:
  ①. Flume要想将数据输出到HDFS,必须持有Hadoop相关jar包
    将hadoop-common-2.7.2.jar (~/software/hadoop/share/hadoop/common)、

		commons-configuration-1.6.jar  (~/software/hadoop/share/hadoop/common/lib)、
		commons-io-2.4.jar、
		hadoop-auth-2.7.2.jar、
		htrace-core-3.1.0-incubating.jar、
		hadoop-hdfs-2.7.2.jar (~/software/hadoop/share/hadoop/hdfs)、

 拷贝到/home/duck/software/flume/lib文件夹下。

  ②. 创建flume-file-hdfs.conf文件
      创建文件

		 touch flume-file-hdfs.conf

  注:要想读取Linux系统中的文件,就得按照Linux命令的规则执行命令。由于Hive日志在Linux系统中所以读取文件的类型选择:exec即execute执行的意思。表示执行Linux命令来读取文件。

		vim flume-file-hdfs.conf

		添加如下内容
		# Name the components on this agent
		a2.sources = r2
		a2.sinks = k2
		a2.channels = c2
		 
		# Describe/configure the source
		a2.sources.r2.type = exec
		a2.sources.r2.command = tail -F /home/duck/software/hive/logs/hive.log
		a2.sources.r2.shell = /bin/bash -c
		 
		# Describe the sink
		a2.sinks.k2.type = hdfs
		a2.sinks.k2.hdfs.path = hdfs://Cloud00:9000/flume/%Y%m%d/%H
		#上传文件的前缀
		a2.sinks.k2.hdfs.filePrefix = logs-
		#是否按照时间滚动文件夹
		a2.sinks.k2.hdfs.round = true
		#多少时间单位创建一个新的文件夹
		a2.sinks.k2.hdfs.roundValue = 1
		#重新定义时间单位
		a2.sinks.k2.hdfs.roundUnit = hour
		#是否使用本地时间戳
		a2.sinks.k2.hdfs.useLocalTimeStamp = true
		#积攒多少个Event才flush到HDFS一次
		a2.sinks.k2.hdfs.batchSize = 1000
		#设置文件类型,可支持压缩
		a2.sinks.k2.hdfs.fileType = DataStream
		#多久生成一个新的文件
		a2.sinks.k2.hdfs.rollInterval = 600
		#设置每个文件的滚动大小
		a2.sinks.k2.hdfs.rollSize = 134217700
		#文件的滚动与Event数量无关
		a2.sinks.k2.hdfs.rollCount = 0
		#最小冗余数
		a2.sinks.k2.hdfs.minBlockReplicas = 1
		 
		# Use a channel which buffers events in memory
		a2.channels.c2.type = memory
		a2.channels.c2.capacity = 1000
		a2.channels.c2.transactionCapacity = 100
		 
		# Bind the source and sink to the channel
		a2.sources.r2.channels = c2
		a2.sinks.k2.channel = c2

  ③. 执行监控配置

		bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume-file-hdfs.conf
		
		bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume-file-hdfs.conf -Dflume.root.logger=INFO,console

  ④. 开启Hadoop和Hive并操作Hive产生日志

		sbin/start-dfs.sh
		sbin/start-yarn.sh
		bin/hive

  ⑤. 在HDFS上查看文件。


3. 实时读取目录文件到HDFS案例

(1) 案例需求:使用Flume监听整个目录的文件

(2) 需求分析:

(3) 实现步骤:
  ①. 创建配置文件flume-dir-hdfs.conf
     创建一个文件

				 touch flume-dir-hdfs.conf

   打开文件

				 vim flume-dir-hdfs.conf

   添加如下内容

				a3.sources = r3
				a3.sinks = k3
				a3.channels = c3
				
				# Describe/configure the source
				a3.sources.r3.type = spooldir
				a3.sources.r3.spoolDir = /home/duck/software/flume/upload
				a3.sources.r3.fileSuffix = .COMPLETED
				a3.sources.r3.fileHeader = true
				#忽略所有以.tmp结尾的文件,不上传
				a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
				
				# Describe the sink
				a3.sinks.k3.type = hdfs
				a3.sinks.k3.hdfs.path = hdfs://Cloud00:9000/flume/upload/%Y%m%d/%H
				#上传文件的前缀
				a3.sinks.k3.hdfs.filePrefix = upload-
				#是否按照时间滚动文件夹
				a3.sinks.k3.hdfs.round = true
				#多少时间单位创建一个新的文件夹
				a3.sinks.k3.hdfs.roundValue = 1
				#重新定义时间单位
				a3.sinks.k3.hdfs.roundUnit = hour
				#是否使用本地时间戳
				a3.sinks.k3.hdfs.useLocalTimeStamp = true
				#积攒多少个Event才flush到HDFS一次
				a3.sinks.k3.hdfs.batchSize = 100
				#设置文件类型,可支持压缩
				a3.sinks.k3.hdfs.fileType = DataStream
				#多久生成一个新的文件
				a3.sinks.k3.hdfs.rollInterval = 600
				#设置每个文件的滚动大小大概是128M
				a3.sinks.k3.hdfs.rollSize = 134217700
				#文件的滚动与Event数量无关
				a3.sinks.k3.hdfs.rollCount = 0
				#最小冗余数
				a3.sinks.k3.hdfs.minBlockReplicas = 1
				
				# Use a channel which buffers events in memory
				a3.channels.c3.type = memory
				a3.channels.c3.capacity = 1000
				a3.channels.c3.transactionCapacity = 100
				
				# Bind the source and sink to the channel
				a3.sources.r3.channels = c3
				a3.sinks.k3.channel = c3

  ②. 启动监控文件夹命令

		bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console

   说明: 在使用Spooling Directory Source时
    a) 不要在监控目录中创建并持续修改文件
    b) 上传完成的文件会以.COMPLETED结尾
    c) 被监控文件夹每500毫秒扫描一次文件变动

  ③. 向upload文件夹中添加文件
   在/home/duck/software/flume目录下创建upload文件夹

			mkdir upload
			向upload文件夹中添加文件
			vim text1.txt
			vim text1.tmp
			vim text1.log
			(随便编辑内容)

  ④. 查看HDFS上的数据

  ⑤. 等待1s,再次查询upload文件夹


4. flume监控Kafka

(1)

(2)

(3)
  ①. 开启zookeeper集群、Kafka集群

  ②

. bin/flume-ng agent --conf conf/ --name a4 --conf-file jobconf/flume-exec-kafka.conf -Dflume.root.logger=INFO,console

  ③. 开启消费者

未完待续…


--->有问题请联系QQ1436281495^_^

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flume之:二、企业开发案例 的相关文章

  • pyspark 连接远程hive集群配置

    今天本地spark连接远程hive集群 直接把配置导入进去 本地直接应用远程环境 1 安装spark 设置spark环境变量 2 拿到远程集群配置文件 将配置文件放在spark conf 目录下 xml 一共五个文件 3 将mysql co
  • 学习大数据spark——心得体会

    总结与体会 1 项目总结 本次项目实现了Spark 单机模式Python版的安装 介绍了与Spark编程有关的一些基本概念 特别对RDD的创建 转换和行动操作做了比较详细的说明 对从RDD 到DataFrame的实现进 行了案例训练 包括
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • 大数据相关常用软件下载地址集锦

    文章目录 每日一句正能量 前言 一 软件下载地址如下 二 文档地址如下 结语 每日一句正能量 生命中有一些人与我们擦肩了 却来不及遇见 遇见了 却来不及相识 相识了 却来不及熟悉 熟悉了 却还是要说再见 前言 由于大数据开发中经常需要用到Z
  • dolphinschedule使用shell任务结束状态研究

    背景 配置的dolphin任务 使用的是shell shell里包含了spark submit 如下截图 dolphin shell 介绍完毕 开始说明现象 有天有人调整了集群的cdp配置 executor cores max 1 我之前这
  • spark报Got an error when resolving hostNames. Falling back to /default-rack for all

    一 报错代码如下 21 06 01 20 13 36 INFO yarn SparkRackResolver Got an error when resolving hostNames Falling back to default rac
  • Spark Job写文件个数的控制以及小文件合并的一个优化

    文章目录 背景说明 通过引入额外Shuffle对写入数据进行合并 EnsureRepartitionForWriting Rule CoalesceShufflePartitions Rule OptimizeShuffleWithLoca
  • 大数据spark开发入门教程

    大数据是互联网发展的方向 大数据人才是未来的高薪贵族 随着大数据人才的供不应求 大数据人才的薪资待遇也在不断提升 如果你也想进入大数据行业 也想学习大数据技术 大数据讲师认为 可以先从spark技术开始 一 Spark是什么 Spark是一
  • spark中repartition和coalesce的区别

    总的来讲 两者 对是否允许shuffle 不同 coalesce numPartitions shuffle false repartition numPartitions repartition 其实是 coalesce 中参数shuff
  • spark_hadoop集群搭建自动化脚本

    bin bash 脚本使用说明 1 使用脚本前需要弄好服务器的基础环境 2 在hadoop的每个节点需要手动创建如下目录 data hdfs tmp 3 修改下面的配置参数 4 脚本执行完备后需要收到格式化namenode
  • 学习笔记-Spark环境搭建与使用

    一 20 04 Ubuntu安装 清华源ISO源 https mirrors tuna tsinghua edu cn ubuntu releases 20 04 下载链接 https mirrors tuna tsinghua edu c
  • 使用Flink1.16.0的SQLGateway迁移Hive SQL任务

    使用Flink的SQL Gateway迁移Hive SQL任务 前言 我们有数万个离线任务 主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务 当然也有PySpark 打Jar包的Spark和打Jar包的Fl
  • 大数据手册(Spark)--Spark基本概念

    文章目录 Spark 基本概念 Hadoop 生态 Spark 生态 Spark 基本架构 Spark运行基本流程 弹性分布式数据集 RDD Spark安装配置 Spark基本概念 Spark基础知识 PySpark版 Spark机器学习
  • Spark 任务调度机制

    1 Spark任务提交流程 Spark YARN Cluster模式下的任务提交流程 如下图所示 图YARN Cluster任务提交流程 下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程 图Spark任务提交时序图 提交
  • spark-3.1.2兼容多版本hive

    2 3 9版本Hive的支持 直接在实例化SparkSession时 启用hive支持即可 例如 val spark SparkSession builder appName Spark Hive Example config spark
  • 通过yarn提交作业到spark,运行一段时间后报错。

    加粗样式
  • spark hadoop环境及运行

    hadoop配置 在Ubuntu20 04里安装Hadoop详细步骤 图文 亲测成功 ubuntu20 04安装hadoop 菜鸡的学习之路的博客 CSDN博客 启动hadoop root ubuntu usr local hadoop s
  • Spark常用参数解释

    Spark的默认配置文件位于堡垒机上的这个位置 SPARK CONF DIR spark defaults conf 用户可以自行查看和理解 需要注意的是 默认值优先级最低 用户如果提交任务时或者代码里明确指定配置 则以用户配置为先 用户再
  • Cloudera 5.4.2:使用 Flume 和 Twitter 流时 Avro 块大小无效或太大

    当我尝试 Cloudera 5 4 2 时出现了一个小问题 基于这篇文章 Apache Flume 获取 Twitter 数据http www tutorialspoint com apache flume fetching twitter
  • Spark 中 BroadCast 导致的内存溢出(SparkFatalException)

    背景 本文基于 Spark 3 1 1 open jdk 1 8 0 352 目前在排查 Spark 任务的时候 遇到了一个很奇怪的问题 在此记录一下 现象描述 一个 Spark Application Driver端的内存为 5GB 一直

随机推荐

  • 控制理论个人学习笔记-非线性系统理论

    文章目录 非线性系统理论 非线性系统的一般概念 相平面基础 非线性系统的相平面分析 描述函数法基础 非线性系统的描述函数法分析 非线性系统理论 非线性系统的一般概念 典型非线性 死区 饱和 间隙 摩擦 继电特性 继电特性使得系统产生振荡 死
  • 利用Java访问WEB Service

    最近在学习Web Service 发现了一个国内的Web Service提供站点 其中最简单的是查询QQ在线状态服务 我通过Java直接发送SOAP请求文件访问Web Service成功 这种方式实现比较简单 不需要第三方的软件包 impo
  • STEP_7计数器相关

    计数器的使用
  • 阿里云服务器租用费用清单表(CPU内存带宽磁盘)

    阿里云服务器租用费用包括CPU内存 公网带宽和系统盘三部分 云服务器购买可以选择活动机型也可以选择自定义购买 活动机型配置固定选择不自由 自定义购买配置自由选择但是费用贵的一批 阿里云百科来详细说下云服务器1核2G 2核4G 4核8G 8核
  • VMware vSphere 6.7先睹为快

    vSphere是老朋友了 还用再多介绍吗 最新的好消息是 VMware vSphere推出了最新版本6 7 相较两年前推出的VMware vSphere 6 5版本 新增了很多强大的功能 作为业内领先的虚拟化和云平台 vSphere的一举一
  • nginx root&alias文件路径配置

    nginx指定文件路径有两种方式root和alias 这两者的用法区别 使用方法总结了下 方便大家在应用过程中 快速响应 root与alias主要区别在于nginx如何解释location后面的uri 这会使两者分别以不同的方式将请求映射到
  • 第4章 用GPT-2生成文本

    BERT 是基于双向 Transformer 结构构建 而 GPT 2 是基于单向 Transformer 这里的双向与单向 是指在进行注意力计算时 BERT会同时考虑被遮蔽词左右的词对其的影响 融合了双向上下文信息 它比较适合于文本生成类
  • IO流介绍和异常处理

    IO流 1 1IO的分类 根据数据的流向分为 输入流和输出流 输入流 把数据从其他设备上读取到内存中的流 输出流 把数据从内存中写到其他设备上的流 根据功能类型分为 字节流和字符流 字节流 以字节为单位 读写数据的流 字符流 以字符为单位
  • tomcat无法启动,也没找到错误日志

    最近做项目的时候 遇到一个问题 项目启动不了 并且没有任何错误日志 1 bug描述 在做项目的时候 启动Tomcat时报错 2 bug信息 Connected to server 2017 11 16 09 28 36 551 Artifa
  • Python:用tkinter制做一个音乐下载小软件

    人生苦短 我用Python 平常我们下载的歌曲 都是各种妖魔鬼怪的格式横行 想下载下来用一下都不行 还只能在它的播放器内听 这谁受得了 学Python是用来干嘛的 当然是解决问题咯 于是我直接写了一手音乐下载软件 强制全部保存mp3 这样就
  • netty服务端的代码

    client code 客户端的ChannelHandler集合 由子类实现 这样做的好处 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers 获取到handlers之后方便ChannelPipelin
  • Android应用开发(35)SufaceView基本用法

    Android应用开发学习笔记 目录索引 参考Android官网 https developer android com reference android view SurfaceView 一 SurfaceView简介 SurfaceV
  • c语言答案计算鸡兔同笼,鸡兔同笼-题解(C语言代码,思路清晰,简单易懂)

    解题思路 设鸡和兔子的数量为x y 则有x y n 2x 4y m 即可得x 4n m 2 y m 2n 2 只有x y为分数 或者为负数时 即为无解情况 详细代码如下 include int main double n m chicken
  • solc安装指定版本

    1 系统linux ubuntu20 04 2 solc安装指定版本 在编译的时候报错 Error Data location must be storage or memory for constructor parameter but
  • 残差神经网络(ResNet)

    残差神经网络的主要贡献是发现了退化现象 并针对退化现象发明了快捷连接 shortcut connection 极大的消除了深度过大的神经网络训练困难问题 1 神经网络越深准确率越高 假设一个层数较少的神经网络已经达到了较高准确率 可以在这个
  • TB-RK3399pro(Fedora28)图形界面与字符界面的切换

    TB RK3399pro Fedora28 使用的是LXDE图形界面 使用时默认打开7个屏幕 分别是tty1到tty6 加上一个没名字的tty7 LXDE为tty1号屏幕 若要切换至字符界面 使用快捷键 Ctrl Alt F2 F2也可以为
  • Wps ppt中无法打开超链接外部文件的解决办法。

    今天突然发现 在原来的Wps ppt中的所有超链接视频或照片都无法打开了 以下是解决办法 供参考 主要原因是Windows10系统升级出现的冲突问题 请卸载这两个补丁 KB5015807和KB5016066 或者卸载其中之一即可打开
  • Oracle 设定允许访问的IP地址

    开启按ip地址访问 修改 oracle10 app db network admin sqlnet ora 在文件最后加下列2行 vim sqlnet ora tcp validnode checking yes tcp invited n
  • 滑雪(记忆化搜索)

    题目 题解 记忆化搜索模板题 记忆化搜索的核心 本质是带剪枝的深搜 当某点的dp已赋值时 返回该值 其他情况进行深度搜索 模板 dfs u点 if u点的 dp 已经有值了 return u点的 dp 值 else 说明第一次到达u 则为u
  • Flume之:二、企业开发案例

    Flume之 二 企业开发案例 文章目录 Flume之 二 企业开发案例 三 企业开发案例 1 监控端口数据官方案例 2 实时读取本地文件到HDFS案例 3 实时读取目录文件到HDFS案例 4 flume监控Kafka gt Spark知识