KafkaStreams 同一应用程序中的多个流

2024-04-26

我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策。

假设我想将两个不同的事件放入其中KTables。我有一个制作人将这些消息发送给KStream那就是听那个话题。

据我所知,我不能对消息使用条件转发KafkaStreams,因此,如果流订阅了许多主题(例如,每个主题对应上述每条消息),我只能调用stream.to在单个接收器主题上 - 否则,我将不得不执行诸如 call 之类的操作foreach在流上并使用 a 发送消息KProducer到接收器主题。

上面建议使用单个流。我以为我可以在同一个应用程序中设置多个流,每个流监听一个主题,映射并转发到表接收器,但每次我尝试创建两个实例KafkaStreams,只有第一个初始化的订阅了其主题 - 另一个从客户端收到警告,表明其主题没有订阅。

我可以在同一个应用程序中设置多个流吗?如果可以,有什么特殊要求吗?

    class Stream(topic: String) {
      val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
      val streamsBuilder = new StreamsBuilder
      val topics = new util.ArrayList[String]
      topics.add(props.get("topic"))

      val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

      def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
        builder.stream[String, String](
          topics,
          Consumed.`with`(String(), String())
        )
      }

      def init(): KafkaStreams = {
        val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)

        streams.start()

        streams
      }
    }

    class Streams() {

      val eventStream = new Stream("first_event") //looking good!
      val eventStream2 = new Stream("second_event") // no subscribers
      //if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
      val streams: KafkaStreams = eventStream.init()
      val streams2: KafkaStreams = eventStream2.init()

    }

流配置

    val streamConfig: Properties = {
        val properties = new Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
        properties
    }

我也希望有任何建议的替代方案


创建 KafkaStreams 时,您需要传递具有不同 application.id 的属性,例如:

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP1");
    StreamsBuilder builder = new SteamsBuilder();
    KStream stream1 = builder.stream("topic1");
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

然后你应该创建另一个流:

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP2");
    StreamsBuilder builder = new SteamsBuilder();
    KStream stream2 = builder.stream("topic2");
    KafkaStreams streams2 = new KafkaStreams(builder, props);
    streams2.start();
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

KafkaStreams 同一应用程序中的多个流 的相关文章

  • 从主题读取后立即异步提交消息

    我正在尝试在阅读主题后立即提交一条消息 我已点击此链接 https www confluence io blog apache kafka spring boot application https www confluent io blo
  • 在 Kafka 生产者上启用幂等性是否会降低吞吐量

    我有卡夫卡制作人启用幂等性 没有启用一次语义或事务 在休息端点调用中 我启用它的原因是因为我不希望卡夫卡重试导致任何重复 我担心以下几点 幂等性会减慢我的端点速度吗 这个端点需要非常快 我读了 kafka api 文档 启用幂等性将使重试无
  • 卡夫卡消费者陷入(重新)加入组

    如果 kafka 版本 0 10 消费者尝试重新加入消费者组 其默认行为是什么 我正在将单个消费者用于消费者组 但似乎它在重新加入时受到了打击 每 10 分钟后 它会在消费者日志中打印以下行 2016 08 11 13 54 53 803
  • RocksDb sst 文件的 GUI 查看器

    我正在与 Kafka 合作 将数据保存到rocksdb 中 现在我想看看 Kafka 创建的数据库键和值 我下载了 FastNoSQL 并尝试但失败了 该文件夹包含 sst 文件 日志文件 当前文件 身份文件 锁定文件 日志文件 清单文件
  • 如何在Golang中创建kafka消费者组?

    可用的库是sarama https github com Shopify sarama 或其扩展萨拉玛簇 https github com bsm sarama cluster 但是没有提供消费者组示例 不在sarama https god
  • Spring Cloud Stream动态通道

    我正在使用 Spring Cloud Stream 想要以编程方式创建和绑定通道 我的用例是 在应用程序启动期间 我收到要订阅的 Kafka 主题的动态列表 如何为每个主题创建一个频道 我最近遇到了类似的场景 下面是我动态创建 Subscr
  • 卡夫卡高级消费者 error_code=15

    当尝试使用高级消费者 使用全新的消费者组 从 Kafka 进行消费时 消费者永远不会开始运行 当我将日志记录级别切换为调试时 我可以看到以下两行一遍又一遍地重复 DEBUG AbstractCoordinator 09 43 51 192
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • GCP Dataproc 作业未找到存储在存储桶中的 SSL pem 证书

    我有一个 GCP Dataproc 集群 我正在尝试部署一个 pyspark 作业 该作业使用 SSL 生成一个主题 pem 文件存储在存储桶 gs dataproc kafka code code 中 我正在使用下面所示的代码访问 pem
  • 批量插入成功后更新 Kafka 提交偏移量

    我有一个 spring kafka 消费者 它读取记录并将其移交给缓存 计划任务会定期清除缓存中的记录 我想仅在批次成功保存到数据库后更新 COMMIT OFFSET 我尝试将确认对象传递给缓存服务以调用确认方法 如下所示 public c
  • 带有 Kafka 消费者的 Spring Boot 作业调度程序

    我正在开发一个 POC 我想使用来自 Kafka 主题 用户 的消息 尝试实现消费者应该从 Kafka 主题读取消息 一旦 spring boot 调度程序在预定时间或 cron 时间触发 那么我们应该开始从 kafka 主题中一一消费现有
  • Kafka的消息键有什么特别的地方吗?

    我没有看到任何提及消息键 org apache kafka clients producer ProducerRecord key 除了它们可以用于主题分区 我可以自由地将我喜欢的任何数据放入密钥中 还是有一些我应该遵守的特殊语义 该密钥似
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • Kafka 一遍又一遍地重放消息 - 心跳会话已过期 - 标记协调器已死亡

    使用 python kafka api 从只有少量消息的主题中读取消息 Kafka 不断地一遍又一遍地重放队列中的消息 它从我的主题接收一条消息 返回每条消息内容 然后抛出ERROR Heartbeat session expired ma
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

    我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3 2 x Kafka 0 10 x 上的 Kafka Connect 接收器 我想升级到 Confluence 4 1 Kafka 1 1
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我

随机推荐

  • CLOB 与 VARCHAR2 以及还有其他替代方案吗?

    我正在为我的应用程序使用 DevArt 的 dotConnect 和 Entity Developer 我已经使用实体优先功能创建了表 我注意到许多列类型都设置为 CLOB 我只有 MySQL 和 Microsoft SQL Server
  • 如何在按下/单击 TextField 时使用 TextFieldValue (带 FocusRequester)将光标设置到文本的任何部分

    我试图环顾四周 但找不到办法 force a focus set the cursor到文本末尾 并且仍然能够设置cursor到文本的任何部分时 按下 点击 With FocusRequester光标设置在文本的开头 但是TextField
  • Spring框架中Service和DAO接口的主要用途是什么?

    我是 Spring 框架的新手 我在谷歌上搜索了很多关于 spring mvc 的网站 我注意到为每个服务和 dao 创建了接口 但我有一些疑问如下 1 为每个service和dao创建接口的主要目的是什么 2 spring mvc的项目结
  • SQL Server VSS 编写器拒绝启动

    我已在 Windows 7 64 位计算机上安装了 Visual Studio 2012 我正在尝试安装 SQL Server Express LocalDB 但当安装程序尝试启动服务时出现错误 这是确切的错误消息 服务 SQL Serve
  • 日期时间类型转换器

    我有下面的代码将字符串转换为 T 类型 它适用于所有其他类型 但当 T 为 DateTime 类型时会出现错误 TypeConverter c TypeDescriptor GetConverter typeof T return T c
  • Avro 架构和生成的文件中的十进制数据类型支持

    这个问题涉及 Avro 版本 1 8 1 我们的 AVRO 模式中有以下字段 name sale price type bytes null logicalType decimal precision 18 scale 17 如您所见 该字
  • scalaz 中的 Store 是什么

    我试图理解Lenses in scalaz 令人惊讶的是没有找到类似的东西cats core 我遇到了所谓的Store这是一个类型别名 type StoreT F A B IndexedStoreT F A A B type Indexed
  • 如何在 Cython 中传递指向 c 函数的指针?

    我正在尝试打电话qsort在 Cython 中使用自定义比较函数 但我不明白如何传递函数引用 首先 我有一个结构 cdef struct Pair int i j float h 比较函数排序依据h cdef int compare con
  • javascript旋转数组元素[重复]

    这个问题在这里已经有答案了 大家好 我有一个任务 我有一个数组 4 7 3 6 9 我必须创建一个像这样的数组 4 7 3 6 9 9 4 7 3 6 6 9 4 7 3 3 6 9 4 7 7 3 6 9 4 我必须编写一个程序 其中数组
  • AspectJ 加载时间编织不适用于 Spring beans

    我正在开发一个项目 该项目使用 Spring 配置的 Java 而不是 xml 风格来连接依赖项 它还具有分析逻辑 应通过 AspectJ 将其编织到所需的方法上 通过注释 设置正在运行 我可以看到我想要的包中的类正在编织 并且分析信息已从
  • 一个目录下可以有两个oozieworkflow.xml文件吗?

    一个目录下可以有两个oozieworkflow xml文件吗 如果是这样 我如何指示 oozie runner 运行哪一个 您可以有两个工作流程文件 只需为它们指定唯一的名称 然后您可以通过设置oozie wf application pa
  • Qt for Android:无法签署应用程序的发布版本

    我正在使用 Qt 5 13 和 Qt Creator 4 9 2 我可以成功构建 Android 应用程序的调试版本 但是当我尝试编译发布版本时 我得到 16 57 35 过程 opt Qt 5 13 0 android armv7 bin
  • iOS Voice Over 和 Android 无法播报 Span 标签中的文本

    我们希望屏幕阅读器在节点关闭后宣布 项目已关闭 有趣的是 Chrome 上的 NVDA 正确地播报了该消息 而 Android 和 iOS Voice Over 则未能播报此消息 这是打字稿代码 HostListener keydown t
  • 在 Scala / Spark 中将纪元转换为日期时间

    我使用以下方法将表示 DateTime 的 String 转换为 unix time 纪元 def strToTime x String Long DateTimeFormat forPattern YYYY MM dd HH mm ss
  • Spacy 中的自定义句子分割

    I want spaCy使用我提供的句子分割边界而不是它自己的处理 例如 get sentences Bob meets Alice SentBoundary They play together gt Bob meets Alice Th
  • JDK 1.6.x G1 的经验(“垃圾优先”)

    我想知道最新JDK中G1垃圾收集器的体验如何 我懂了NullPointerException尽管代码没有改变并且在早期的 JDK 中表现正常 但我的程序中抛出了这个问题 垃圾收集器只会影响表现您的应用程序 而不是它的正确性 我一直在 Ecl
  • Python中的非阻塞套接字?

    是我 还是我找不到关于Python中非阻塞套接字的好教程 我不确定如何准确地工作 recv和 send在里面 根据 python 文档 至少是我的理解 recv ed or send ed 数据可能只是部分数据 那么这是否意味着我必须以某种
  • 数组对象内相同值的重复分组

    id year 2017 month 4 Confirm 0 id year 2017 month 4 Expired 25 id year 2017 month 4 Pending 390 id year 2017 month 5 Pen
  • 构建启用 COM 互操作的项目,而无需在构建过程中注册它

    在 Visual Studio 2010 中 我尝试构建一个启用 COM 互操作的 C 项目 但在构建过程中不注册它 但我DO需要程序集的类型库 tlb 文件 因此我可以从解决方案中的另一个 C 项目导入它 我还没有找到一种方法来做到这一点
  • KafkaStreams 同一应用程序中的多个流

    我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策 假设我想将两个不同的事件放入其中KTables 我有一个制作人将这些消息发送给KStream那就是听那个话题 据我所知 我不能对消息使用条件转发KafkaStrea