spring cloud Stream kafka:“调度程序没有订阅者”错误

2024-06-14

我正在使用kafka binder测试Spring Cloud Stream,但出现错误

引起原因:org.springframework.messaging.MessageDeliveryException:调度程序没有频道“unknown.channel.name”的订阅者。;

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.0.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>1.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>1.1.2.RELEASE</version>
    </dependency>
</dependencies>

该玩具应用程序是模仿秘书在员工和老板之间传递请求。

员工界面:

public interface SecretaryServingEmployee {
    @Output
    MessageChannel inbox();

    @Input
    SubscribableChannel rejected();

    @Input
    SubscribableChannel approved();

}

boss界面:

public interface SecretaryServingBoss {   
@Input
SubscribableChannel inbox();

@Output
MessageChannel rejected();

@Output
MessageChannel approved();      
}

应用程序属性

server.port=8080
spring.cloud.stream.bindings.inbox.destination=inbox
spring.cloud.stream.bindings.approved.destination=approved
spring.cloud.stream.bindings.rejected.destination=rejected

员工.java

@EnableBinding(SecretaryServingEmployee.class)
@Component
public class Employee {
    private static Logger logger = LoggerFactory.getLogger(Employee.class);

    private SecretaryServingEmployee adminAssistent;

    @Autowired
    public Employee(SecretaryServingEmployee adminAssistent) {
        this.adminAssistent = adminAssistent;
    }

    @InboundChannelAdapter(value = "inbox")
    public String messageSource() {
        return "You are handsome!!";  // This is the message sent to boss
    }

    @ServiceActivator(inputChannel="approved")
    public void checkApproved(String message) {
        logger.info(":-)");
    }

    @ServiceActivator(inputChannel="rejected")
    public void checkRejected(RejectionLetter letter) {
        logger.warn(":-(");
    } 
}

老板.java

@EnableBinding(SecretaryServingBoss.class)
@Component
public class Boss {
    private SecretaryServingBoss adminAssistent;

    @Autowired
    public Boss(SecretaryServingBoss adminAssistent) {
        this.adminAssistent = adminAssistent;
    }

    @ServiceActivator(inputChannel="inbox")
    public void sign(String content) {
        if (content.contains("You are handsome")) {
            adminAssistent.approved().send(message("nice work"));
        }
        else {
            adminAssistent.rejected().send(message("Don't send me shit"));
        }       
    }

    private <T> Message<T> message(T content) {
        return MessageBuilder.withPayload(content).build();
    }   

}

这是跟踪的一部分

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.3.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.3.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:772) [spring-kafka-1.0.5.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_121]
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_121]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_121]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    ... 29 common frames omitted

您的应用程序类似乎未正确进行组件扫描。

如果您将其作为 Spring Boot 应用程序运行,您能否确保要扫描的类是否已正确打包。

例如,默认情况下@SpringBootApplication的组件扫描会查看同一包下的类,其中@SpringBootApplication带注释的类是。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spring cloud Stream kafka:“调度程序没有订阅者”错误 的相关文章

随机推荐

  • 设置一个值来指示线程已完成安全吗?

    我想将一个耗时的进程委托给我的 C 程序中的一个单独的线程 使用 boost 库 我编写了如下代码 thrd new boost thread boost bind myclass mymethod this finished flag W
  • Ajax.BeginForm 可以重定向到新页面并传递路由值

    此链接显示如何重定向 Ajax BeginForm 但不显示如何将路由值传递到新视图 可以重定向到新页面的 Ajax BeginForm https stackoverflow com questions 9391201 ajax begi
  • 如何检查值 null 并在 NSDictionary 中替换它

    现在 我正在使用 NSUserDefault 和 NSDictionary 我将 NSDictionary 保存在 NSUserDefault 中 不幸的是我不能 因为 NSDictionary 返回 Json 具有空值 我需要检查 NSD
  • 使 Java JScrollpane 仅垂直滚动

    我希望我的整个 JFrame 能够垂直滚动 我添加了以下代码 但它只创建了一个水平滚动条 frame setContentPane new JScrollPane new GradeQuickResource 我想做相反的事情 我只想要一个
  • 任务名称在 AppEngine 中墓碑化的时间有多长?

    将任务添加到 AppEngine 中的队列时 任务名称在一定时间内无法重复使用 称为 逻辑删除 我在文档中找不到墓碑的持续时间或任何修改它的方法 附近唯一引人注目的是TaskOptions Builder etaMillis 它设置任务的预
  • powershell 将文件添加到 zip

    我正在尝试使用 powershell 将文件添加到 zip 文件 我可以创建 zip 文件 但无法弄清楚如何将我的文件添加到其中 我在用着 zipfilename c cwRsync backup zip file c cwRsync ba
  • 子组调用索引是否映射到 gl_LocalInitationIndex?

    我需要计算吗gl SubgroupID gl SubgroupSize gl SubgroupInvocationID 或者我可以使用gl LocalInvocationIndex 单个子组内的调用是否连续gl SubgroupInvoca
  • C:从 char 数组打印会产生错误字符

    K N King s 的解决方案C 编程 现代方法 第二版 第 8 章 编程项目 14 产生不同的输出 包括正确的和错误的 示例如下所示 Reversal of sentence you can t swallow a cage can y
  • 本地 Flask 实例中有多个静态路径[重复]

    这个问题在这里已经有答案了 是否可以为我的本地开发 Flask 实例添加更多静态路径 我想要默认static用于存储站点的 js css images 文件的文件夹和另一个文件夹 例如designs保留我的特定资产 我不想放置designs
  • 随机分配工作地点,每个地点不得超过指定员工人数

    我正在尝试在位置列表中选择唯一的随机发布 招聘员工位置 所有员工都已发布在这些位置 我正在尝试为他们生成一个新的随机发布位置 其 位置 条件为 员工新 随机位置将不等于他们的家乡 并且随机选择的员工及其职称必须小于或等于 地点 表中的 地点
  • 开启TK onRenderFrame和onUpdateFrame的区别?

    我目前正在使用 OpenTK 框架和 OpenGL 用 C 编写 Jump n Run 游戏 Open TK 提供预设功能 例如GameWindow Run or GameWindow onUpdateFrame onRenderFrame
  • gnuplot - 删除行标题

    我尝试搜索 但找不到针对这种特定情况的解决方案 在我的图中 我正在比较两条痕迹 我使用的是折线图 两条迹线都用不同的颜色绘制 plot delay try1 dat using 1 2 title With CloneScale with
  • 唤醒单个线程而不是 pthread 中的忙等待

    我不确定标题是否反映了我在这里提出的问题 但这是我在没有很长的标题的情况下能做的最好的事情 我正在尝试实施一个worker thread模型中pthreads 我想从中产生一组线程main函数以及此后的main线程将工作委托给工作人员 并等
  • ffmpeg 找不到 vcodec h264

    我是 ffmpeg 的新手 正在 Windows 上使用它 我尝试使用 H 264 vcodec h264 转换 avi 文件 收到此错误 未知编码器 h264 使用 acodec 的 mp3 也会出现 未知编码器 错误 有人可以帮我解决这
  • Python 多元简单线性回归

    注意这是not关于多元回归的问题 这是一个关于在 Python NumPy 2 7 中多次进行简单 单变量 回归的问题 我有两个m x n arrays x and y 这些行彼此对应 每对都是用于测量的 x y 点的集合 那是 plt p
  • 如何在 Qt 应用程序中嵌入 Python 解释器?

    有没有一种简单的方法可以将 Python 解释器嵌入到 Qt 应用程序中 如果可能的话 我希望有一个跨平台的解决方案 这就是目的PythonQt http pythonqt sourceforge net 它支持 Windows Linux
  • 文件输出流到文件输入流

    将 FileOutputStream 转换为 FileInputStream 的最简单方法是什么 一段代码就很好 这可能对您有帮助 http ostermiller org convert java outputstream inputst
  • C# HttpClient 自定义标头每个请求

    我注意到使用 HttpClient 是NOT修改时线程安全HttpClient DefaultRequestHeaders但我想提出尽可能多的请求 我每个请求都需要一个自定义标头 其他两个标头始终相同 URL 也发生了一些变化 http e
  • Xcode 未创建 .h 文件

    我使用的是 Xcode 6 0 1 我创建了一个自动生成 h 和 m 文件的新项目 当我尝试添加新的 Objective C 文件时 它仅创建 m 文件 h 文件未生成 我尝试了所有首选项 但没有找到一个显示 生成 h 文件 的选项 出于好
  • spring cloud Stream kafka:“调度程序没有订阅者”错误

    我正在使用kafka binder测试Spring Cloud Stream 但出现错误 引起原因 org springframework messaging MessageDeliveryException 调度程序没有频道 unknow