从 Kafka 消费失败迭代器处于失败状态

2023-12-14

我在使用来自 kafka 的消息时遇到异常。

org.springframework.messaging.MessagingException: Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state    

我在应用程序上下文中有一个使用者和一个出站适配器。

应用程序上下文中的消费者配置

<int-kafka:consumer-context id="consumerContext" consumer-timeout="4000" zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="GR1"
            value-decoder="valueDecoder" key-decoder="valueDecoder" max-messages="1000">
        <int-kafka:topic-filter pattern="SOME_TOPIC" streams="13"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>    

和 1 个出站通道适配器

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
    kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="kafka" group-id="GR1">
    <int:poller fixed-delay="100" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>    

我正在 glassfish 3 服务器上部署我的应用程序

当消费者开始工作时,我在应用程序日志中看到以下异常。

2015-01-20 12:51:48.218 UTC || ERROR || [task-scheduler-3 ] || [ErrorHandler] - Error processing message Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state org.springframework.messaging.MessagingException: Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
at org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:110) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.kafka.support.ConsumerConfiguration.receive(ConsumerConfiguration.java:86) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.kafka.support.KafkaConsumerContext.receive(KafkaConsumerContext.java:56) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.kafka.inbound.KafkaHighLevelConsumerMessageSource.receive(KafkaHighLevelConsumerMessageSource.java:41) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:124) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:192) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) [spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.1.1.RELEASE.jar:4.1.1.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) [spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292) [spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-4.1.1.RELEASE.jar:4.1.1.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.1.1.RELEASE.jar:4.1.1.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [?:1.7.0_71]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) [?:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) [?:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) [?:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_71]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_71] Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.7.0_71]
at java.util.concurrent.FutureTask.get(FutureTask.java:188) ~[?:1.7.0_71]
at org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:97) ~[spring-integration-kafka-1.0.0.M1.jar:?]
... 22 more Caused by: java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.utils.IteratorTemplate.next(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerIterator.next(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:67) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:61) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) ~[?:1.7.0_71]
... 3 more 2015-01-20 12:51:48.218 UTC || ERROR || [task-scheduler-3 ] || [LoggingHandler] - org.springframework.messaging.MessagingException: Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
at org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:110)
at org.springframework.integration.kafka.support.ConsumerConfiguration.receive(ConsumerConfiguration.java:86)
at org.springframework.integration.kafka.support.KafkaConsumerContext.receive(KafkaConsumerContext.java:56)
at org.springframework.integration.kafka.inbound.KafkaHighLevelConsumerMessageSource.receive(KafkaHighLevelConsumerMessageSource.java:41)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:124)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:192)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:97)
... 22 more Caused by: java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(Unknown Source)
at kafka.utils.IteratorTemplate.next(Unknown Source)
at kafka.consumer.ConsumerIterator.next(Unknown Source)
at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:67)
at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:61)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more    

我正在使用以下版本的卡夫卡和动物园管理员。

kafka_2.9.2-0.8.1.1

动物园管理员-3.4.6

http://www.springframework.org/schema/integration/kafka/spring-集成-kafka-1.0.xsd

玻璃鱼3

它工作正常,但突然开始持续失败。

其他 Spring XD 应用程序中的其他消费者从同一 kafka 服务器和 Zookeeper 读取数据工作正常。

运行在其他应用程序中的其他消费者组是完全不同的。


看起来这是一个问题:https://jira.spring.io/browse/INTEXT-112

您介意尝试使用您的应用程序吗M2发布?

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Kafka 消费失败迭代器处于失败状态 的相关文章

随机推荐