Flume-ng 拖尾文件

2023-12-02

我试图了解如何使用 Flume-ng 尾部文件,以便可以将数据推送到 HDFS 中。在第一个实例中,我设置了一个简单的conf文件:

tail1.sources = source1
tail1.sinks = sink1
tail1.channels = channel1

tail1.sources.source1.type = exec
tail1.sources.source1.command = tail -F /var/log/apache2/access.log
tail1.sources.source1.channels = channel1

tail1.sinks.sink1.type = logger

tail1.channels.channel1.type = memory
tail1.channels.channel1.capacity = 1000
tail1.channels.channel1.transactionCapacity = 100

tail1.sources.source1.channels = channel1
tail1.sinks.sink1.channel = channel1

这是一个测试,我的期望是我会在控制台上看到输出。我使用以下命令运行它:

flume-ng agent --conf-file tail1.conf -n tail1 -Dflume.root.logger=DEBUG,INFO,console

我得到以下输出:

12/12/05 11:01:07 信息生命周期.LifecycleSupervisor: 开始 生命周期主管 1 12/12/05 11:01:07 INFO 节点.FlumeNode: Flume 节点开始 - tail1 12/12/05 11:01:07 INFO nodemanager.DefaultLogicalNodeManager:节点管理器从 2005 年 12 月 12 日开始 11:01:07 信息生命周期.LifecycleSupervisor:开始生命周期 主管 8 12/12/05 11:01:07 信息 properties.PropertiesFileConfigurationProvider:配置提供程序 开始于 2005 年 12 月 12 日 11:01:07 信息 properties.PropertiesFileConfigurationProvider:正在重新加载 配置文件:tail1.conf 12/12/05 11:01:07 INFO conf.FlumeConfiguration:处理:sink1 12/12/05 11:01:07 信息 conf.FlumeConfiguration:处理:sink1 12/12/05 11:01:07 信息 conf.FlumeConfiguration:添加接收器:sink1 代理:tail1 12/12/05 11:01:07 INFO conf.FlumeConfiguration:验证后水槽 配置包含代理配置:[tail1] 12/12/05 11:01:07 信息属性.PropertiesFileConfigurationProvider:创建 通道 12/12/05 11:01:08 INFO 仪器。MonitoredCounterGroup: 监控的计数器组类型:CHANNEL,名称:channel1,已注册 成功地。 2005 年 12 月 12 日 11:01:08 信息 properties.PropertiesFileConfigurationProvider:创建的通道 通道1 12/12/05 11:01:08 信息接收器.DefaultSinkFactory:正在创建 接收器实例:sink1,类型:记录器 12/12/05 11:01:08 INFO nodemanager.DefaultLogicalNodeManager:开始新配置:{ sourceRunners:{source1=EventDrivenSourceRunner: { 来源:org.apache.flume.source.ExecSource@1839aa9 }} inkRunners:{sink1=SinkRunner:{ 策略:org.apache.flume.sink.DefaultSinkProcessor@11f0c98 counterGroup:{ name:null 计数器:{} } }} 频道:{channel1=org.apache.flume.channel.MemoryChannel@17​​40f55} } 12/12/05 11:01:08 信息 nodemanager.DefaultLogicalNodeManager:正在启动 频道频道1 12/12/05 11:01:08 信息 Instrumentation.MonitoredCounterGroup:组件类型:CHANNEL,名称: 频道 1 于 2005 年 12 月 12 日 11:01:08 开始 nodemanager.DefaultLogicalNodeManager:启动接收器 sink1 12/12/05 11:01:08 INFO nodemanager.DefaultLogicalNodeManager:启动源 source1 12/12/05 11:01:08 信息 source.ExecSource:Exec 源启动 使用命令:tail -F /var/log/apache2/access.log

然而,没有进一步发生。

我有另一个会话,其中有以下命令:

tail -F /var/log/apache2/access.log

我可以在哪里看到正在写入的文件:

192.168.1.81 - - [05/Dec/2012:10:58:07 +0000] "GET / HTTP/1.1" 200 483 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
192.168.1.81 - - [05/Dec/2012:10:58:07 +0000] "GET /favicon.ico HTTP/1.1" 404 502 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
192.168.1.81 - - [05/Dec/2012:10:58:21 +0000] "GET / HTTP/1.1" 304 209 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
192.168.1.81 - - [05/Dec/2012:10:58:22 +0000] "GET /favicon.ico HTTP/1.1" 404 502 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"

你能帮我吗?我彻底困惑了。


你的配置文件看起来不错。我在 CDH4 中使用它并按照您的预期工作,我所做的只是更改尾部的日志文件位置。我看到控制台上的输出。就我而言,新的日志数据被连续写入我跟踪的文件中。数据中的时间戳使得您的示例中的情况看起来可能并非如此。

这是一个更完整的conf示例,更符合我认为您想要做的事情。它将跟踪文件并每 10 分钟或 10K 记录写入一个新的 HDFS 文件。改变agent1.sources.source1.命令到你的 tail 命令并改变agent1.sinks.sink1.hdfs.path and agent1.sinks.sink1.hdfs.filePrefix基于您的 HDFS 配置。

# A single-node Flume configuration
# uses exec and tail and will write a file every 10K records or every 10 min
# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/cloudera/LogCreator/fortune_log.log

# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost/flume/logtest/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.sink1.hdfs.rollInterval = 600
# File size to trigger roll, in bytes (0: never roll based on file size) 
agent1.sinks.sink1.hdfs.rollSize = 0
#Number of events written to file before it rolled (0 = never roll based on number of events) 
agent1.sinks.sink1.hdfs.rollCount = 10000
# number of events written to file before it flushed to HDFS 
agent1.sinks.sink1.hdfs.batchSize = 10000 
agent1.sinks.sink1.hdfs.txnEventMax = 40000
# -- Compression codec. one of following : gzip, bzip2, lzo, snappy
# hdfs.codeC = gzip
#format: currently SequenceFile, DataStream or CompressedStream
#(1)DataStream will not compress output file and please don't set codeC
#(2)CompressedStream requires set hdfs.codeC with an available codeC
agent1.sinks.sink1.hdfs.fileType = DataStream 
agent1.sinks.sink1.hdfs.maxOpenFiles=50
# -- "Text" or "Writable"
#hdfs.writeFormat
agent1.sinks.sink1.hdfs.appendTimeout = 10000
agent1.sinks.sink1.hdfs.callTimeout = 10000
# Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
agent1.sinks.sink1.hdfs.threadsPoolSize=100 
# Number of threads per HDFS sink for scheduling timed file rolling
agent1.sinks.sink1.hdfs.rollTimerPoolSize = 1 
# hdfs.kerberosPrin--cipal Kerberos user principal for accessing secure HDFS
# hdfs.kerberosKey--tab Kerberos keytab for accessing secure HDFS
# hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
# hdfs.roundValue1 Rounded down to the highest multiple of this (in the unit configured using
# hdfs.roundUnit), less than current time.
# hdfs.roundUnit second The unit of the round down value - second, minute or hour.
# serializer TEXT Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
# serializer.*


# Use a channel which buffers events to a file
# -- The component type name, needs to be FILE.
agent1.channels.channel1.type = FILE 
# checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored
# dataDirs ~/.flume/file-channel/data The directory where log files will be stored
# The maximum size of transaction supported by the channel
agent1.channels.channel1.transactionCapacity = 1000000 
# Amount of time (in millis) between checkpoints
agent1.channels.channel1.checkpointInterval 30000
# Max size (in bytes) of a single log file 
agent1.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel 
agent1.channels.channel1.capacity 10000000 
#keep-alive 3 Amount of time (in sec) to wait for a put operation
#write-timeout 3 Amount of time (in sec) to wait for a write operation

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flume-ng 拖尾文件 的相关文章

  • 为什么 Dockerized Hadoop 数据节点注册了错误的 IP 地址?

    我有 Hadoop 2 7 1 名称节点和数据节点的单独 Docker 1 9 1 映像 我可以从中创建容器 并让它们通过用户定义的 Docker 网络进行通信 然而 数据节点似乎报告自己拥有网络网关的 IP 地址 而不是它自己的 IP 地
  • 如何在Hadoop中设置数据块大小?改变它有好处吗?

    如果我们可以更改 Hadoop 中的数据块大小 请告诉我如何操作 更改块大小是否有利 如果是 请告诉我为什么以及如何更改 如果没有 请告诉我为什么以及如何 您可以随时更改块大小 除非dfs blocksize参数在 hdfs site xm
  • 无法使用 scala 将字符串写入 hdfs 文件

    我编写了一些代码在 hdfs 中创建一个文件并向其写入字节 这是代码 def write uri String filePath String data String Unit System setProperty HADOOP USER
  • 解析数百万个小 XML 文件

    我有 1000 万个小 XML 文件 300KB 500KB 我在 Mapreduce 中使用 Mahaout 的 XML 输入格式来读取数据 并使用 SAX 解析器进行解析 但处理速度非常慢 使用输入文件的压缩 lzo 有助于提高性能吗
  • 尝试在 h5py 中打开 pandas 创建的 hdf 时缺少列

    这就是我的数据框的样子 第一列是一个整数 第二列是 512 个整数的单个列表 IndexID Ids 1899317 0 47715 1757 9 38994 230 12 241 12228 22861131 0 48156 154 63
  • 无法创建目录 /home/hadoop/hadoopinfra/hdfs/namenode/current

    我收到错误 Cannot create directory home hadoop hadoopinfra hdfs namenode current 尝试在我的本地 Mac 上安装 hadoop 时 这可能是什么原因 仅供参考 我将我的
  • 如何修改 Kotlin 序列的前缀但保留尾部?

    Kotlin 提供take and takeWhile先采取的方法n的项目Sequence
  • HDFS如何计算可用块?

    假设块大小为 128MB 则集群有 10GB 因此大约 80 个可用块 假设我创建了 10 个小文件 这些文件总共占用磁盘上 128MB 块文件 校验和 复制 和 10 个 HDFS 块 如果我想向HDFS添加另一个小文件 那么HDFS使用
  • 如何使用 Head 和 Tail 打印文件的特定行

    我想说输出文件的第 5 10 行 作为传入的参数 我怎样才能使用head and tail去做这个 where firstline 2 and lastline 3 and filename 1 运行它应该如下所示 lines sh fil
  • hadoop 连接在端口 9000 上被拒绝

    我想设置一个伪分布式模式的hadoop集群进行开发 由于端口 9000 上的连接被拒绝 尝试启动 hadoop 集群失败 这些是我的配置 非常标准 站点核心 xml
  • HBase如何实现对HDFS的随机访问?

    鉴于HBase是一个数据库 其文件存储在HDFS中 那么它如何实现对HDFS中单个数据的随机访问呢 这是通过什么方法实现的呢 From Apache HBase 参考指南 http hbase apache org book archite
  • 如何为 HDFS 递归列出子目录?

    我在 HDFS 中递归创建了一组目录 如何列出所有目录 对于普通的 UNIX 文件系统 我可以使用以下命令来做到这一点 find path type d print 但我想为 HDFS 得到类似的东西 递归列出目录内容hadoop dfs
  • Cat 文件与 HDFS 中的模式不匹配?

    我正在尝试 cat 与 hadoop HDFS 中的以下模式不匹配的文件 hdfs dfs cat gz 如何捕获所有不以 gz 结尾的文件 编辑 抱歉 但我需要在 Hadoop 中管理文件 显然 hdfs 附带的命令非常少 编辑2 所有文
  • Curl下载到HDFS

    我有这个代码 curl o fileName csv url xargs hdfs dfs moveFromLocal 1 somePath 当我执行此代码时 curl 将请求中的值放入 fileName csv 中 该文件将移动到 HDF
  • 连接到 Hive 时使用 Spark 进行 Kinit

    我正在尝试从独立的 Spark 连接到 Hive hadoop 集群具有 kerberos 身份验证 有人可以让我知道如何在 Spark 程序中执行 kinit 我可以连接到配置单元吗 更新 我的 Spark 与 Hadoop 位于不同的集
  • 方案中的尾递归幂函数

    我在方案中编写尾递归幂函数时遇到问题 我想使用辅助函数来编写该函数 我知道我需要一个参数来保存累计值 但在那之后我就陷入了困境 我的代码如下 define pow tr a b define pow tr h result if b 0 r
  • 如何获取 linux 实用程序 tail 的源代码?

    这个命令确实非常有用 但是我可以在哪里获取源代码以查看内部发生的情况 thanks tail 实用程序是 Linux 上 coreutils 的一部分 源压缩包 ftp ftp gnu org gnu coreutils coreutils
  • 如何在 Python 中跟踪日志文件?

    我想在 Python 中提供 tail F 或类似内容的输出 而无需阻塞或锁定 我找到了一些非常旧的代码来做到这一点here http code activestate com recipes 436477 filetailpy 但我认为现
  • 如何从hdfs读取文件[重复]

    这个问题在这里已经有答案了 我在 project1目录下的hadoop文件系统中有一个文本文件名mr txt 我需要编写 python 代码来读取文本文件的第一行 而不将 mr txt 文件下载到本地 但我无法从 hdfs 打开 mr tx
  • 当我将文件存储在 HDFS 中时,它们会被复制吗?

    我是 Hadoop 新手 当我使用以下方式存储 Excel 文件时hadoop fs putcommoad 它存储在HDFS中 复制因子为3 我的问题是 是否需要3份并分别存储到3个节点中 这是 HDFS 工作的漫画 https docs

随机推荐