flume自定义拦截器实现定制收集日志需求

2023-05-16

    flume默认提供了timestamp,host,static,regex等几种类型的拦截器,timestamp,host,static等拦截器,其实就是在消息头中增加了时间戳,主机名,键值对信息,这些信息可以作用于信宿中。比如有时间戳的话,我们可以存储消息的时候,按照日期文件夹的形式来存放每天的日志。这些拦截器功能有时候,不一定能够满足所有用户需求,因此flume支持用户自定义拦截器,来更加精确的收集日志信息。

    flume自定义拦截器,需要继承Interceptor接口,并实现相关方法,同时还需要自定义Builder,返回我们的自定义拦截器。下面看具体的代码,因为自定义flume拦截器,需要加入flume依赖,我们加入flume-ng-core(org.apache.flume)依赖,如果是maven工程,我们可以在pom.xml配置文件中加入如下配置:

<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-core</artifactId>
  <version>1.9.0</version>
</dependency>

我这里使用的是flume版本是1.9.0,所以对应加入的依赖也是1.9.0版本。自定义拦截器代码如下:

package com.xxx.flume;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
/***
 * 拦截器:判断消息体是否以"aa"开头,如果是,返回为新的消息体,如果不是,返回空。
 * 新的消息体:aaxxx	timestamp=15xxxxxxxxxxx	host=localhost	type=type
 */
public class ZoneInterceptor implements Interceptor{

	@Override
	public void close() {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void initialize() {
		// TODO Auto-generated method stub
		
	}

	@Override
	public Event intercept(Event event) {
		// TODO Auto-generated method stub
		try {
			Map<String, String> header = event.getHeaders();
			String str = new String(event.getBody(),"UTF-8");
			String timestamp = "";
			String host = "";
			String type="";
			if(header.containsKey("timestamp")){
				timestamp = "timestamp="+header.get("timestamp");
			}
			if(header.containsKey("host")){
				host = "host="+header.get("host");
			}
			
			if(header.containsKey("type")){
				type="type="+header.get("type");
			}
			if(str.startsWith("aa")){
				ByteArrayOutputStream output = new ByteArrayOutputStream();
				output.write(event.getBody());
				output.write("\t".getBytes());
				output.write(timestamp.getBytes());
				output.write("\t".getBytes());
				output.write(host.getBytes());
				output.write("\t".getBytes());
				output.write(type.getBytes());
				event.setBody(output.toByteArray());
				return event;
			}
		} catch (Exception e) {
			// TODO: handle exception
		}
		return null;
	}

	@Override
	public List<Event> intercept(List<Event> list) {
		// TODO Auto-generated method stub
		List<Event> result = new ArrayList<Event>();
		for(Event event:list){
			Event e = intercept(event);
			if(e!=null){
				result.add(e);
			}
		}
		return result;
	}

	public static class Builder implements Interceptor.Builder{

		@Override
		public void configure(Context context) {
			// TODO Auto-generated method stub
			
		}

		@Override
		public Interceptor build() {
			// TODO Auto-generated method stub
			return new ZoneInterceptor();
		}
		
	}
}

    简单介绍一下这个拦截器的功能,我们假定收集来自网络消息,判断消息是否以aa开头,如果是,则将消息收集,否则放弃。收集的消息,我们再进行一次改变,在消息体后面通过制表符隔开的方式,新增时间戳,主机,type=flume内容。时间戳,主机和type这些信息来源于其他的拦截器,timestamp,host,static这三个拦截器。我们从消息头部的映射中得到这些变量的值,然后添加到消息体中。

    自定义拦截器的思路,就是实现intercept(Event event)方法和intercept(List<Event> list)方法。我们可以在intercept(Event event) 方法中做判断,然后拼接消息体。

    最重要的是,我们还需要定义一个静态Builder类实现Interceptor.Builder接口,然后在它的方法build()中返回我们自定义的拦截器。

打包自定义拦截器代码,然后上传到flume安装目录下的lib目录,然后我们来做一个配置文件:

aa.sources=netsource
aa.channels=c
aa.sinks=k

aa.sources.netsource.type=netcat
aa.sources.netsource.bind=0.0.0.0
aa.sources.netsource.port=30000
aa.sources.netsource.interceptors=timestamp host static zi
aa.sources.netsource.interceptors.timestamp.type=timestamp
aa.sources.netsource.interceptors.host.type=host
aa.sources.netsource.interceptors.static.type=static
aa.sources.netsource.interceptors.static.key=type
aa.sources.netsource.interceptors.static.value=flume
aa.sources.netsource.interceptors.zi.type=com.xxx.flume.ZoneInterceptor$Builder

aa.channels.c.type=memory
aa.channels.c.capacity=1000
aa.channels.c.transactionCapacity=100
aa.sinks.k.type=FILE_ROLL
aa.sinks.k.sink.directory=/home/software/flume/files
aa.sinks.k.sink.rollInterval=0
aa.sinks.k.sink.rollCount=3

aa.sources.netsource.channels=c
aa.sinks.k.channel=c

在自定义拦截器前面,我们增加了三个拦截器分别是timestamp,host,static,这些拦截器会将timestamp,host,type三个变量加入消息头的map中,我们在自定义拦截器中就正好可以使用这些变量。

启动flume,加载配置文件:

bin/flume-ng agent -c conf -f conf/custom_interceptors.conf -n aa -Dflume.root.logger=INFO,console

启动成功,通过telnet连接127.0.0.1 30000,发送5条消息,其中3条是满足拦截器的条件的,即以"aa"开头的消息:

发送成功,我们通过查看保存的文件内容,看看拦截器的效果,保存的文件中,只有三条以aa开头的消息,并且后面拼接了timestamp,host,type信息:

至此,一个简单的flume自定义拦截器就实现了,并且通过测试,验证了拦截器的正确性。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flume自定义拦截器实现定制收集日志需求 的相关文章

  • flume日志收集系统常见配置

    前面介绍了flume入门实例 xff0c 介绍了配置netcat信源 xff0c 以及memory信道 xff0c logger信宿 xff0c 其实flume常见的信源信道信宿有很多 xff0c 这里介绍flume常用信源的三种方式 xf
  • Flume实战

    前言 在一个完整的大数据处理系统中 xff0c 除了hdfs 43 mapreduce 43 hive组成分析系统的核心之外 xff0c 还需要数据采集 结果数据导出 任务调度等不可或缺的辅助系统 xff0c 而这些辅助工具在hadoop生
  • 大数据技术面试-Flume、kafka

    大数据技术面试 Flume kafka 1 Flume组成有哪些 2 Flume拦截器有哪些知识点 3 Flume采集数据会丢失吗 4 FileChannel如何优化 5 如何控制Kafka丢不丢数据 6 Kafka分区分配策略默认哪两种
  • day01(Flume)

    简介 一 概述 Flume是Apache提供的一套用于进行日志收集 汇聚和传输的框架 2 Flume的版本 Flume ng 和Flume og 不兼容 a Flume1 x Flume ng b Flume0 X Flume og htt
  • Flume 数据流监控——Ganglia的安装与部署

    1 Ganglia的安装 1 安装 dhttpd 服务与 php yasin hadoop102 flume sudo yum y install httpd php 2 安装其他依赖 atguigu hadoop102 flume sud
  • 基于Flume日志收集系统架构和设计(一)

    问题导读 1 Flume NG与Scribe对比 Flume NG的优势在什么地方 2 架构设计考虑需要考虑什么问题 3 Agent死机该如何解决 4 Collector死机是否会有影响 5 Flume NG可靠性 reliability
  • Flink Table API 与 Flink SQL 实现Kafka To Kafka 版本1.12

    Table API版本 0 前提 1 创建流和表执行环境 2 连接Source并创建Table 3 筛选Table对象中的数据 4 连接Sink并创建临时表 5 将Table对象写入临时表 测试 杠精打住 SQL 版本 最近有铁汁问我 一闪
  • hadoop学习——flume的简单介绍

    flume介绍 概述 Flume最早是Cloudera提供的日志收集系统 后贡献给Apache 所以目前是Apache下的项目 Flume支持在日志系统中定制各类数据发送方 用于收集数据 Flume是一个高可用的 高可靠的 鲁棒性 robu
  • 【大数据入门核心技术-Impala】(一)Impala简介

    目录 一 Impala介绍 二 Impala优势 三 Impala主要功能 一 Impala介绍 Impala是Cloudera公司主导开发的新型查询系统 它提供SQL语义 能查询存储在Hadoop的HDFS和HBase中的PB级大数据 已
  • flume实验

    1 上传flume ng 1 5 0 cdh5 3 6 tar gz 至 opt modules cdh 并解压 2 编辑 conf flume env sh export JAVA HOME usr java jdk1 7 0 79 3
  • flume使用(二):采集远程日志数据到MySql数据库

    本文内容可查看目录 本文内容包含单节点 单agent 和多节点 多agent 采集远程日志 说明 一 环境 linux系统 Centos7 Jdk 1 7 Flume 1 7 0 二 安装 linux中jdk mysql的安装不多赘述 fl
  • Flume之:二、企业开发案例

    Flume之 二 企业开发案例 文章目录 Flume之 二 企业开发案例 三 企业开发案例 1 监控端口数据官方案例 2 实时读取本地文件到HDFS案例 3 实时读取目录文件到HDFS案例 4 flume监控Kafka gt Spark知识
  • 第四章 Flume专题-日志采集工具

    一 Flume专题之组件及架构介绍 1 Flume概述 1 1 Flume定义 Flume是一种分布式的 高可靠的和高可用的服务 用于有效地收集 聚合和移动大量日志数据框架 Flume是一个简单灵活的基于流数据的体系结构 1 2 Flume
  • 大数据案例--电信日志分析系统

    目录 一 项目概述 1 概述 二 字段解释分析 1 数据字段 2 应用大类 3 应用小类 三 项目架构 四 数据收集清洗 1 数据收集 2 数据清洗 五 Sqoop使用 1 简介 2 Sqoop安装步骤 3 Sqoop的基本命令 六 数据导
  • Flume-ng 拖尾文件

    我试图了解如何使用 Flume ng 尾部文件 以便可以将数据推送到 HDFS 中 在第一个实例中 我设置了一个简单的conf文件 tail1 sources source1 tail1 sinks sink1 tail1 channels
  • 在接收器发生故障后,如何强制 Flume-NG 处理积压的事件?

    我正在尝试设置 Flume NG 从一组服务器 主要运行 Tomcat 实例和 Apache Httpd 收集各种日志 并将它们转储到 5 节点 Hadoop 集群上的 HDFS 中 设置如下所示 每个应用程序服务器将相关日志跟踪到一个执行
  • Flume - 整个文件可以被视为 Flume 中的一个事件吗?

    我有一个用例 需要将目录中的文件提取到 HDFS 中 作为 POC 我在 Flume 中使用了简单的目录假脱机 其中我指定了源 接收器和通道 并且它工作得很好 缺点是我必须为进入不同文件夹的多种文件类型维护多个目录 以便更好地控制文件大小和
  • 运行 fatjar 时无法加载 log4j2

    我正在开发一个使用 log4j2 日志记录的项目 在 intellij 中开发时 一切正常 并且日志记录按预期完成 log4j2 xml 通过在启动时通过 intellij 设置传递给 jvm 的 java 属性进行链接 但是一旦我尝试运行
  • java.io.IOException:无法获取 LocationBlock 的块长度

    我正在使用 HDP 2 1 对于集群 我遇到了以下异常 并且 MapReduce 作业因此失败 实际上 我们定期使用 Flume 版本的数据创建表 1 4 我检查了映射器尝试读取的数据文件 但我找不到任何内容 2014 11 28 00 0
  • 为 Flume-ng 编写自定义 HTTPSource 处理程序

    是否有任何资源可以帮助我为 Flume ng 的 HTTPSource 编写自定义处理程序 我阅读了文档 其中有一个 Json 示例处理程序 但我想知道是否有人需要编写一个处理程序来从 XML 消息正文创建 Flume 事件 HttpSou

随机推荐

  • mailto 参数说明

    mailto 可以调用系统内置软件发送电子邮件 参数说明 mailto xff1a 收件人地址 xff0c 可多个 xff0c 用 分隔 cc xff1a 抄送人地址 xff0c 可多个 xff0c 用 分隔 bcc xff1a 密件抄送人
  • mysql 导入导出数据库

    mysql 导入导出数据库 1 导出数据 导出test 数据库 R 表示导出函数和存储过程 xff0c 加上使导出更完整 mysqldump u root p R test gt test sql 导出test数据库中user表 mysql
  • php 广告加载类

    php 广告加载类 xff0c 支持异步与同步加载 需要使用Jquery ADLoader class php lt php 广告加载管理类 Date 2013 08 04 Author fdipzone Ver 1 0 Func publ
  • 使用<img>标签加载php文件,记录页面访问讯息

    原理 xff1a 通过 lt img gt 标标签加载php文件 xff0c php文件会使用gd库生成一张1x1px的空白透明图片返回 xff0c 并记录传递的参数写入log文件 lt img src 61 34 sitestat php
  • tput 命令行使用说明

    什么是 tput xff1f tput 命令将通过 terminfo 数据库对您的终端会话进行初始化和操作 通过使用 tput xff0c 您可以更改几项终端功能 xff0c 如移动或更改光标 更改文本属性 xff0c 以及清除终端屏幕的特
  • ROS2学习笔记(二)-- 多机通讯原理简介及配置方法

    在ROS1中由主节点 master 负责其它从节点的通信 xff0c 在同一局域网内通过设置主节点地址也可以实现多机通讯 xff0c 但是这种多机通讯网络存在一个严重的问题 xff0c 那就是所有从节点强依赖于主节点 xff0c 一旦运行主
  • 使用shell实现阿里云动态DNS

    https github com timwai aliyunDDNS shell 脚本全部使用基础的命令实现 xff0c 支持在openwrt中使用 修改以下参数为你自己的参数 ACCESS KEY ID 61 你的AccessKeyId
  • Java-两个较大的List快速取交集、差集

    工作中经常遇到需要取两个集合之间的交集 差集情况 xff0c 但是普通的retainAll 和removeAll 无法满足数据量大的情况 xff0c 由此就自己尝试运用其他的方法解决 注 xff1a 如果数据量小的情况下 xff0c 还是使
  • Xubuntu15.04更新系统源时出现错误提示W: GPG 错误:http://archive.ubuntukylin.com:10006 xenial InRelease: 由于没有公钥,无法验证

    在更新系统源后 xff0c 输入sudo apt get update之后出现提示 xff1a W GPG 错误 xff1a http archive ubuntukylin com 10006 xenial InRelease 由于没有公
  • ubuntu开启SSH服务远程登录

    ssh secure shell xff0c 提供安全的远程登录 从事嵌入式开发搭建linux开发环境中 xff0c ssh的服务的安装是其中必不可少的一步 ssh方便一个开发小组中人员登录一台服务器 xff0c 从事代码的编写 编译 运行
  • Python实现让视频自动打码,再也不怕出现少儿不宜的画面了

    人生苦短 我用Python 序言准备工作代码解析完整代码 序言 我们在观看视频的时候 xff0c 有时候会出现一些奇怪的马赛克 xff0c 影响我们的观影体验 xff0c 那么这些马赛克是如何精确的加上去的呢 xff1f 本次我们就来用Py
  • Docker安装nextcloud实验

    Docker安装nextcloud实验 修改验证方式 xff1a 从密钥到密码 sudo passwd root su root vi etc ssh sshd config 去掉下面前的 或修改yes no port 22 Address
  • Tesseract-OCR 字符识别---样本训练

    Tesseract是一个开源的OCR xff08 Optical Character Recognition xff0c 光学字符识别 xff09 引擎 xff0c 可以识别多种格式的图像文件并将其转换成文本 xff0c 目前已支持60多种
  • FPGA与OPENCV的联合仿真

    对于初学者来说 xff0c 图像处理行业 xff0c 最佳仿真方式 xff1a FPGA 43 OPENCV xff0c 因为OPENCV适合商业化 xff0c 适合自己写算法 1 xff09 中间交互数据介质 txt文档 2 xff09
  • 华硕P8Z77-V LX老主板转换卡升级NVMe M2硬盘经验,老主机的福音,质的飞跃

    每年双十一都是淘货升级老家伙的时候 xff0c 今年也不例外 xff0c 随着日子长久 xff0c 软件的增多 xff0c 虽然已经尽量装在系统盘以外的盘 xff0c 但C盘还是日渐不够用 xff0c 从以前的30G系统盘升到60G xff
  • linux 更换 软件源后 GPG错误

    linux 更换 软件源后 GPG错误 linux 软件源 GPG 签名 密钥 linux 更换 软件源后 GPG错误 http my oschina net emptytimespace blog 83633 如文章 1 中提到 xff1
  • ROS2学习笔记(四)-- 用方向键控制小车行走

    简介 xff1a 在上一节的内容中 xff0c 我们通过ROS2的话题发布功能将小车实时视频信息发布了出来 xff0c 同时使用GUI工具进行查看 xff0c 在这一节内容中 xff0c 我们学习一下如何订阅话题并处理话题消息 xff0c
  • flume大数据框架数据采集系统

    flume是cloudera开源的数据采集系统 xff0c 现在是apache基金会下的子项目 xff0c 他是hadoop生态系统的日志采集系统 xff0c 用途广泛 xff0c 可以将日志 网络数据 kafka消息收集并存储在大数据hd
  • flume日志收集系统常见配置

    前面介绍了flume入门实例 xff0c 介绍了配置netcat信源 xff0c 以及memory信道 xff0c logger信宿 xff0c 其实flume常见的信源信道信宿有很多 xff0c 这里介绍flume常用信源的三种方式 xf
  • flume自定义拦截器实现定制收集日志需求

    flume默认提供了timestamp host static regex等几种类型的拦截器 xff0c timestamp host static等拦截器 xff0c 其实就是在消息头中增加了时间戳 xff0c 主机名 xff0c 键值对