在接收器发生故障后,如何强制 Flume-NG 处理积压的事件?

2023-12-28

我正在尝试设置 Flume-NG 从一组服务器(主要运行 Tomcat 实例和 Apache Httpd)收集各种日志,并将它们转储到 5 节点 Hadoop 集群上的 HDFS 中。设置如下所示:

每个应用程序服务器将相关日志跟踪到一个执行源(每种日志类型一个:java、httpd、syslog),该源通过 FileChannel 将它们输出到 Avro 接收器。在每台服务器上,不同的源、通道和接收器由一个代理管理。事件由驻留在 Hadoop 集群(还托管 secondaryNameNode 和 Jobtracker 的节点)上的 AvroSource 拾取。对于每种日志类型,都有一个 AvroSource 监听不同的端口。事件通过 FileChannel 进入 HDFS Sink,HDFS Sink 使用 FlumeEventAvro EventSerializer 和 Snappy 压缩来保存事件。

问题:Hadoop 节点上管理 HDFS 接收器(同样,每种日志类型一个)的代理在几个小时后失败,因为我们没有更改 JVM 的堆大小。从那时起,大量事件被收集到该节点上的 FileChannel 中,之后也在应用程序服务器上的 FileChannel 上收集,因为 Hadoop 节点上的 FileChannel 已达到其最大容量。当我修复问题时,我无法让 Hadoop 节点上的代理足够快地处理积压的工作,从而使其恢复正常运行。 FileChannel 在接收事件之前保存事件的 tmp 目录的大小一直在增长。另外,HDFS 写入速度似乎非常慢。 有没有办法强制 Flume 在摄取新事件之前先处理积压的事件?以下配置是最佳配置吗?可能相关:写入 HDFS 的文件非常小,大约 1 - 3 MB 左右。对于 64MB 的 HDFS 默认块大小以及未来的 MR 操作来说,这当然不是最佳选择。我应该使用什么设置来收集对于 HDFS 块大小足够大的文件中的事件? 我感觉 Hadoop 节点上的配置不正确,我怀疑 BatchSize、RollCount 和相关参数的值已关闭,但我不确定最佳值应该是什么。

应用程序服务器上的示例配置:

agent.sources=syslogtail httpdtail javatail
agent.channels=tmpfile-syslog tmpfile-httpd tmpfile-java
agent.sinks=avrosink-syslog avrosink-httpd avrosink-java

agent.sources.syslogtail.type=exec
agent.sources.syslogtail.command=tail -F /var/log/messages
agent.sources.syslogtail.interceptors=ts
agent.sources.syslogtail.interceptors.ts.type=timestamp
agent.sources.syslogtail.channels=tmpfile-syslog
agent.sources.syslogtail.batchSize=1

...

agent.channels.tmpfile-syslog.type=file
agent.channels.tmpfile-syslog.checkpointDir=/tmp/flume/syslog/checkpoint
agent.channels.tmpfile-syslog.dataDirs=/tmp/flume/syslog/data

...

agent.sinks.avrosink-syslog.type=avro
agent.sinks.avrosink-syslog.channel=tmpfile-syslog
agent.sinks.avrosink-syslog.hostname=somehost
agent.sinks.avrosink-syslog.port=XXXXX
agent.sinks.avrosink-syslog.batch-size=1

Hadoop 节点上的示例配置

agent.sources=avrosource-httpd avrosource-syslog avrosource-java
agent.channels=tmpfile-httpd tmpfile-syslog tmpfile-java
agent.sinks=hdfssink-httpd hdfssink-syslog hdfssink-java

agent.sources.avrosource-java.type=avro
agent.sources.avrosource-java.channels=tmpfile-java
agent.sources.avrosource-java.bind=0.0.0.0
agent.sources.avrosource-java.port=XXXXX

...

agent.channels.tmpfile-java.type=file
agent.channels.tmpfile-java.checkpointDir=/tmp/flume/java/checkpoint
agent.channels.tmpfile-java.dataDirs=/tmp/flume/java/data
agent.channels.tmpfile-java.write-timeout=10
agent.channels.tmpfile-java.keepalive=5
agent.channels.tmpfile-java.capacity=2000000

...

agent.sinks.hdfssink-java.type=hdfs
agent.sinks.hdfssink-java.channel=tmpfile-java
agent.sinks.hdfssink-java.hdfs.path=/logs/java/avro/%Y%m%d/%H
agent.sinks.hdfssink-java.hdfs.filePrefix=java-
agent.sinks.hdfssink-java.hdfs.fileType=DataStream
agent.sinks.hdfssink-java.hdfs.rollInterval=300
agent.sinks.hdfssink-java.hdfs.rollSize=0
agent.sinks.hdfssink-java.hdfs.rollCount=40000
agent.sinks.hdfssink-java.hdfs.batchSize=20000
agent.sinks.hdfssink-java.hdfs.txnEventMax=20000
agent.sinks.hdfssink-java.hdfs.threadsPoolSize=100
agent.sinks.hdfssink-java.hdfs.rollTimerPoolSize=10

我在您的配置中看到一些可能导致问题的内容:

  1. 您的第一个代理似乎有一个批次大小为 1 的 avro 接收器。您应该将其增加到至少 100 或更多。这是因为第二个代理上的 avro 源将以批量大小为 1 提交到通道。每次提交都会导致 fsync,从而导致文件通道性能较差。执行源上的批处理大小也是 1,导致该通道也很慢。您可以增加批处理大小(或使用假脱机目录源 - 稍后会详细介绍)。

  2. 您可以让多个 HDFS 接收器从同一通道读取数据以提高性能。您应该确保每个接收器写入不同的目录或具有不同的“hdfs.filePrefix”,以便多个 HDFS 接收器不会尝试写入相同的文件。

  3. HDFS接收器的批量大小是20000,这是相当高的,并且你的callTimeout是默认的10秒。如果你想保持这么大的批量大小,你应该增加“hdfs.callTimeout”。我建议将批量大小减少到 1000 左右,并设置大约 15-20 秒的超时。 (请注意,在当前批处理大小下,每个文件仅保存 2 个批处理 - 因此减少批处理大小,增加 rollInterval 和 timeOut)

如果您使用 tail -F,我建议尝试新的假脱机目录源。要使用此源,请将日志文件轮换到假脱机目录源处理的目录。该源只会处理不可变的文件,因此您需要将日志文件轮换出来。将 tail -F 与 exec source 一起使用会出现问题,如 Flume 用户指南中所述。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在接收器发生故障后,如何强制 Flume-NG 处理积压的事件? 的相关文章

随机推荐

  • Pandas - Python - 如何减去两个不同的日期列

    尝试用今天的日期减去created date列来填充列 但出现以下错误 TypeError unsupported operand type s for str and str import datetime now datetime da
  • 连接 Haskell 和 C++

    我想在 C 程序中调用一些 Haskell 函数 为此 我已申请these https github com jarrett cpphs说明并将其调整为我的代码和系统 我目前所掌握的内容如下 主程序 cpp 共享头文件和cpp文件 make
  • 在 setup.cfg 中嵌套或组合 setuptools 的 extras_require

    是否可以重用已在中指定的依赖项 options extras require 对于其他条目 options extras require 举例来说 一个开发团队正在使用mypy在开发过程中检查它们的类型注释 以及black自动格式化他们的代
  • 我在哪里可以获得 VB6 IDE [重复]

    这个问题在这里已经有答案了 可能的重复 如何编译旧版 VB6 代码 https stackoverflow com questions 229868 how to compile legacy vb6 code 在哪里可以找到 VB6 的
  • 生成拼写错误的单词(打字错误)

    我已经实现了模糊匹配算法 我想使用一些带有测试数据的示例查询来评估其召回率 假设我有一个包含文本的文档 text The quick brown fox jumps over the lazy dog 我想看看是否可以通过测试诸如 sox
  • 如何在 CentOS 7 中安装 pip?

    CentOS 7 EPEL 现在包含 Python 3 4 yum install python34 然而 当我尝试这样做时 即使 Python 3 4 安装成功 它似乎也没有安装 pip 这很奇怪 因为pipPython 3 4 应该默认
  • 谷歌财经 API 从 2017 年 9 月 6 日起不再工作

    我使用谷歌金融 API 来获取股票报价并在我的网站上显示内容 从 2017 年 9 月 6 日起 此功能突然停止工作 我用来获取股票报价的网址是 以前 我使用的是雅虎财经 API 它不一致 所以 我切换到了谷歌金融API 你能帮我解决这个问
  • 在功能组件中使用回调来响应 setState

    我在类组件中编写了一个非常简单的示例 setErrorMessage msg this setState error message msg gt setTimeout gt this setState error message 5000
  • 更改 valueBoxes 的字体大小

    我想更改值的字体大小和副标题valueBoxes 以下是我的尝试 但对于如何以类似于默认外观的方式更改它的任何建议 我将不胜感激 下面是我的可重现的例子 require shinydashboard valueBox2 lt functio
  • 将表达式树解析为嵌套列表

    我对 F 比较陌生 在解析包含嵌套列表的表达式树时确实遇到了困难 根据网上的零碎资料 我拼凑了以下内容 我的标准类型定义为 type Return Real of float Func of string Return list 我对外部应
  • 将 bcrypt 添加到 package.json 时,如何解决使用 docker node alpine Image 时“找不到任何可使用的 Python 安装”的问题?

    在我将 bcrypt 添加到 package json 之前 一切正常 现在 我收到以下错误消息 这是我的 package json 的摘录 dependencies bcrypt 3 0 6 express 4 17 1 mongodb
  • 自定义属性:必须是明显我做错的事情

    我一直在添加不同实体的部分类 以毫无问题地添加各种有用的方法 根据我见过的示例 尝试添加属性似乎很简单 但我的失败得很惨 Updated例子 public List
  • 在javascript中识别firefox浏览器的选定选项卡URL

    我正在开发一个网络应用程序 我有兴趣获取选定的选项卡 URL 我正在使用火狐浏览器 我通过执行以下 javascript 语句实现了这一点 Get the URL of page which is currently loaded in a
  • 带 CASE 条件和 SUM() 的 SELECT 查询

    我目前正在使用这些sql语句 我的表有 CPaymentType 字段 其中包含 现金 或 支票 我可以通过执行 2 个 SQL 语句来汇总付款金额 如下所示 在这种情况下 用户甚至不会注意到执行2条sql语句或仅执行1条sql语句时的速度
  • JS原型与继承

    在业余时间我尝试学习一点 JS 但我还是坚持主题中的主题 var person new Person Bob Smith 52 var teacher new Teacher Adam Greff 209 function Humans f
  • 在 Unity C# WWW 中显示进度条

    我有这段代码可以从服务器下载视频 但我需要显示进度条 这可能吗 我知道我不能有 WriteAllBytes 的进度条 private IEnumerator DownloadStreamingVideoAndLoad string strU
  • App.config:用户与应用程序范围

    我已经在我的项目中添加了 App config 文件 我从 项目 gt 属性 gt 设置 面板创建了两个设置 我注意到 当我添加设置时 我可以将范围定义为User or Application User 应用 如果我将设置定义为User它去
  • 在派生类中扩展枚举[重复]

    这个问题在这里已经有答案了 我有一个类层次结构 其中的每个类都有一个异常类 在并行层次结构中派生 因此 class Base class Derived public Base class BaseException public std
  • C# 用韩文编码保存文件

    具有以下代码块 用于使用所选编码保存文件 当文件在文本编辑器中打开时 它显示编码为 ASCII StringBuilder sb new StringBuilder sb Append Lots of korean text here En
  • 在接收器发生故障后,如何强制 Flume-NG 处理积压的事件?

    我正在尝试设置 Flume NG 从一组服务器 主要运行 Tomcat 实例和 Apache Httpd 收集各种日志 并将它们转储到 5 节点 Hadoop 集群上的 HDFS 中 设置如下所示 每个应用程序服务器将相关日志跟踪到一个执行