我正在测试 Flume 将数据加载到 hHase 中,并考虑使用 Flume 的选择器和拦截器进行并行数据加载,因为源和接收器之间的速度差距。
所以,我想要用 Flume 做的是
-
使用拦截器 regexp_extract 类型创建事件标头
-
使用选择器的多路复用类型将带有标头的事件多路复用到两个以上通道
在一个源-通道-接收器中。
并尝试如下配置。
agent.sources = tailsrc
agent.channels = mem1 mem2
agent.sinks = std1 std2
agent.sources.tailsrc.type = exec
agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt
agent.sources.tailsrc.batchSize = 1
agent.sources.tailsrc.interceptors = i1
agent.sources.tailsrc.interceptors.i1.type = regex_extractor
agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
agent.sources.tailsrc.interceptors.i1.serializers = t1
agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type
agent.sources.tailsrc.selector.type = multiplexing
agent.sources.tailsrc.selector.header = type
agent.sources.tailsrc.selector.mapping.1 = mem1
agent.sources.tailsrc.selector.mapping.2 = mem2
agent.sinks.std1.type = file_roll
agent.sinks.std1.channel = mem1
agent.sinks.std1.batchSize = 1
agent.sinks.std1.sink.directory = /var/log/flumeout/1
agent.sinks.std1.rollInterval = 0
agent.sinks.std2.type = file_roll
agent.sinks.std2.channel = mem2
agent.sinks.std2.batchSize = 1
agent.sinks.std2.sink.directory = /var/log/flumeout/2
agent.sinks.std2.rollInterval = 0
agent.channels.mem1.type = memory
agent.channels.mem1.capacity = 100
agent.channels.mem2.type = memory
agent.channels.mem2.capacity = 100
但是,这不起作用!
当选择器部分被删除时,flume 的日志中有一些拦截器调试消息。
但是当选择器和拦截器在一起时,就什么都没有了。
有什么表达错误或者漏掉的地方吗?
谢谢阅读。 :)
我找到了。
在flume日志中,有如下警告消息。
2013-10-10 16:34:20,514 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] Removed tailsrc due to Failed to configure component!
所以我附上了下面一行
agent.sources.tailsrc.channels = mem1 mem2
然后就可以了!!!!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)