记flume部署过程中遇到的问题以及解决方法(持续更新)

2023-05-16

项目需求是将线上服务器生成的日志信息实时导入kafka,采用agent和collector分层传输,app的数据通过thrift传给agent,agent通过avro sink将数据发给collector,collector将数据汇集后,发送给kafka,拓扑结构如下:




现将调试过程中遇到的问题以及解决方法记录如下:

1、 [ERROR - org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:484)] Unexpected throwable while invoking!

java.lang.OutOfMemoryError: Java heap space

原因:flume启动时的默认最大的堆内存大小是20M,实际环境中数据量较大时,很容易出现OOM问题,在flume的基础配置文件conf下的flume-env.sh中添加

export JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

并且在flume启动脚本flume-ng中,修改JAVA_OPTS="-Xmx20m"JAVA_OPTS="-Xmx2048m"

此处我们将堆内存的阈值跳转到了2G,实际生产环境中可以根据具体的硬件情况作出调整


2、  [ERROR - org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:544)] run() exiting due to uncaught error
  java.lang.OutOfMemoryError: unable to create new native thread

  原因:如果App给flume的thrift source发送数据时,采用短连接,会无限地创建线程,使用命令 pstree 时发现java的线程数随着发送数据量的增长在不停增长,最终达到了65500多个,超过了linux系统对线程的限制,解决方法是在thrift source配置项中增加一个线程数的限制。

agent.sources.r1.threads = 50

重新启动agent发现java的线程数达到70多就不再增长了


3、 Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count

原因:这是memory channel被占满导致的错误,memory channel默认最多只缓存100条数据,在生产环境中明显不够,需要将capacity参数加大


4、warn:"Thrift source %s could not append events to the channel."。

原因:查看flume的配置文档可以发现,各种类型的sink(thrift、avro、kafka等)的默认batch-size都是100,file channel、memory channel的transactioncapacity默认也都是100,如果修改了sink的batch-size,需要将batch-size设置为小于等于channel的transactioncapacity的值,否则就会出现上面的warn导致数据无法正常发送


5、agent处报

(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 10.200.197.82, port: 5150 }: Failed to send batch
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:315)
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376)
        ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 10.200.197.82, port: 5150 }: Exception thrown from remote handler
        at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:397)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:374)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:303)
        ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Connection reset by peer
        at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
        at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:389)
        ... 6 more
Caused by: java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:59)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        ... 1 more


collector报

2017-08-21 16:36:43,010 (New I/O  worker #12) [WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)] Unexpected exception from downstream.
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 349070535 items! Connection closed.
        at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
        at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:422)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:478)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:366)
        at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:399)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:721)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:111)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:66)
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
        at org.jboss.netty.channel.Channels.close(Channels.java:820)
        at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:202)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:378)
        at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:533)
        at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

原因:当agent到collector的数据在agent的avro sink处进行压缩时,在collector的avro source处必须解压,否则数据无法发送



6、org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {ssp_package-0=388595} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size, or decrease the maximum message size the broker will allow.
2017-10-11 01:30:10,000 (PollableSourceRunner-KafkaSource-r1) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}

原因:配置kafka source时,flume作为kafka的consumer,在consumer消费kafka数据时,默认最大文件大小是1m,如果文件大小超过1m,需要手动在配置里面调整参数,

但是在flume官网的配置说明-kakka source中,并没有找到配置fetch size的地方,但是在配置的最后一行有一个

Other Kafka Consumer Properties--These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset

此处配置用的是kafka的配置方法,在kafka官网的配置文档-consumer configs-max.partition.fetch.bytes有相关说明

agent.sources.r1.kafka.consumer.max.partition.fetch.bytes = 10240000

此处将consumer的fetch.byte加到10m


7、2017-10-13 01:19:47,991 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:240)] Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2606058 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2606058 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

原因:与上一点类似,此处是kafka sink时,flume作为producer,也要设置文件的fetch大小,同样是参考kafka官网的配置

agent.sinks.k1.kafka.producer.max.request.size = 10240000


8、java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at org.mortbay.jetty.nio.SelectChannelConnector$1.acceptChannel(SelectChannelConnector.java:75)
        at org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:686)
        at org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192)
        at org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124)
        at org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:708)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

原因:文件句柄占用太多,首先查看flume占用句柄个数

lsof -p pid | wc -l 

pid是flume进程号,

vim /etc/security/limits.conf 

在最后加入  
* soft nofile 4096  
* hard nofile 4096  

最前的 * 表示所有用户,改完后重启下flume服务


9、(kafka-producer-network-thread | producer-1) [ERROR - org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:130)] Uncaught 
error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:744)

原因:kafka集群版本较老,flume版本较新,此处kafka使用的版本是较老的0.8.2, flume使用1.7则会报上述错误,只能将flume降为1.6版本


9、sink到kafka上的数据没有均匀的分布在各个partition上,而是全部放在了同一个partition上

原因:这是老版本flume遗留下的一个bug,需要在event中构造一个包含key为 key 的header 键值对就能达到目的

a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
真正没有随机的原因本文并没有直接去找到,是借助另一种方式解决了问题
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

记flume部署过程中遇到的问题以及解决方法(持续更新) 的相关文章

随机推荐

  • 解析串口-接收完整数据帧

    在linux下编写串口通讯程序 xff0c 采用select监听串口的可读事件 xff0c 一旦可读 xff0c 调用read 但是我们会发现 xff0c read一次得到的数据通常不是完整的一个数据帧 比如完整数据帧为 但是实际上需要re
  • STL 基本容器 优缺点比较

    总结在先 xff1a xff11 如果需要高效的随机存取 xff0c 不在乎插入和删除的效率 xff0c 使用vector xff1b 2 如果需要大量的插入和删除元素 xff0c 不关心随机存取的效率 xff0c 使用list xff1b
  • STL源码剖析--vector容器

    写在前面 vector是我们在STL中最常用的容器 xff0c 我们对它的各种操作也都了然于胸 然而我们在使用vector的时候总会有一种很虚的感觉 xff0c 因为我们不清楚接口内部是如何实现的 在我们眼里宛如一个黑箱 xff0c 既危险
  • TCP/UDP调试工具的使用

    TCP UDP调试工具下载链接 前文 当我们写好一个TCP UDP的程序时 但是无法通信时 光看代码又找不出原因时 我们可以借助调试工具来检查是服务端还是客户端出现了问题 这样就很大的减少了错误的排查范围 再次感叹一下 这个工具真的很好用
  • 关于利用结构体和联合体数据收发的两种方法

    关于利用结构体和联合体数据收发的两种方法 关于最近接手的小项目 xff0c 有了一些经验 xff0c 所以进行一下记录 文章目录 关于利用结构体和联合体数据收发的两种方法前言一 联合体法二 结构体法小tips 前言 在我们利用自己的板子进行
  • RESTful初探之四(Restlets)

    size 61 large Restlets Restlet项目为 建立REST概念与Java类之间的映射 提供了一个轻量级而全面的框架 它可用于实现任何种类的REST式系统 xff0c 而不仅仅是REST式Web服务 color 61 r
  • FreeRTOS中的堆栈设置”与“系统启动文件中堆栈”的关系

    FreeRTOS中的堆栈设置 与 系统启动文件中堆栈 的关系 在STM32CubeMX生成工程时发现 xff0c 在FreeRTOS的配置中同样有TOTAL HEAP SIZE堆的大小配置 xff0c 这个堆与之前系统的堆空间有什么区别呢
  • nodejs 实现http账号密码Digest登录认证

    const http 61 require 39 http 39 const qs 61 require 39 querystring 39 const md5 61 require 39 md5 node 39 第一步 xff1a 获取n
  • PCB Layout各层含义与分层原则

    内容包括 PCB绘图软件各层含义的详细介绍以及一些在实际工作中的应用 xff0c Layout时多层板分层原则与阻抗匹配 紫色文字是超链接 xff0c 点击自动跳转至相关博文 持续更新 xff0c 原创不易 xff01 目录 xff1a 一
  • 2021-01-18

    求助 xff0c 关于Ubuntu20 04安装网络调试助手打不开的问题 我在虚拟机上安装了Ubuntu20 04并安装了网络调试助手 xff0c 但却打不开 xff0c 运用了sudo apt get libqtgui4 amd64也没用
  • 笔记:VSCode C/C++ 格式化修改设置

    VSCode安装C C 43 43 插件后 xff0c 就有了格式化代码的能力 xff0c 这个功能很好用 xff0c 一般来说不用改就可以用的很嗨皮 为啥要改默认设置 xff0c 只因不喜欢函数内大括号独占一行 xff0c 如此一屏显示的
  • NEMA 协议:GPRMC数据格式

    NEMA协议的由来 NMEA协议是为了在不同的GPS xff08 全球定位系统 xff09 导航设备中建立统一的BTCM xff08 海事无线电技术委员会 xff09 标准 xff0c 由美国国家海洋电子协会 xff08 NMEA The
  • 大小端(网络字节序)等概念

    1 大小端定义 大端存储模式 xff1a 是指数据的低位字节序保存在内存的高地址中 xff0c 而数据的高位字节序保存在内存的低地址中 小端存储模式 xff1a 是指数据的低位字节序保存在内存的低地址中 xff0c 而数据的高位字节序保存在
  • [李景山php]RHEL\CentOS 7\ubuntu16.04 下 MySQL 连接数被限制为214个

    问题 项目中 xff0c 由于连接数过多 xff0c 提示 Too many connections xff0c 需要增加连接数 我在 etc my cnf中修改了 max connections 61 2000 但是 xff0c 实际连接
  • 关系型数据库和非关系型数据库的特性以及各自的优缺点

    数据库 类型特性优点缺点关系型数据库 SQLite Oracle mysql1 关系型数据库 xff0c 是指采用了关系模型来组织 数据的数据库 xff1b 2 关系型数据库的最大特点就是事务的一致性 xff1b 3 简单来说 xff0c
  • 采用libuv的epoll方式实现的异步高性能libcurl发送数据的方法

    Libcurl较为基本的用法是easyinterface xff0c 它是最简单的同步接口 xff0c 容易理解 xff0c 实现代码简单 xff0c 但是性能低下 curl multi perform 43 select xff1a 可以
  • 各种浏览器语言包、国际化如何配置

    size 61 large 如果web项目使用了国际化多语言包 xff0c 切换浏览器语言包可以切换语言 xff1a color 61 red Firefox如何中英文切换 color 首先得下载语言包 xff0c 网址 xff1a url
  • libcurl采用curl_multi_perform() + curl_multi_wait()方式实现异步高性能l发送数据的方法

    前两篇文章 c c 43 43 调用libcurl库发送http请求的两种基本用法 采用libuv的epoll方式实现的异步高性能libcurl发送数据的方法 讲述了采用libcurl发送数据的基础方法和高性能方法 xff0c 基础方法较为
  • 网络库libevent、libev、libuv对比

    Libevent libev libuv 三个网络库 xff0c 都是c 语言实现的异步事件库Asynchronousevent library xff09 异步事件库本质上是提供异步事件通知 xff08 Asynchronous Even
  • 记flume部署过程中遇到的问题以及解决方法(持续更新)

    项目需求是将线上服务器生成的日志信息实时导入kafka xff0c 采用agent和collector分层传输 xff0c app的数据通过thrift传给agent xff0c agent通过avro sink将数据发给collector