Python
Java
PHP
IOS
Android
Nodejs
JavaScript
Html5
Windows
Ubuntu
Linux
如何配置 Spring Boot Kafka 客户端使其不尝试连接
这与Spring Boot Kafka 客户端有 断路器 吗 https stackoverflow com q 69914621 2886891 但我仍然认为这是一个不同的问题 我们需要配置 Spring Boot Kafka 客户端 以
Java
springboot
configuration
apachekafkastreams
springKafka
如何将 Kafka 承诺的消费者偏移量更改为所需的偏移量
我有卡夫卡流应用程序 我的应用程序正在成功处理事件 如何使用所需的偏移量更改 Kafka 提交的消费者偏移量以重新处理 跳过事件 我试过如何更改主题的起始偏移量 https stackoverflow com questions 29791
ApacheKafka
apachezookeeper
kafkaconsumerapi
apachekafkastreams
卡夫卡消费者重新平衡时间太长
我有一个 Kafka Streams 应用程序 它从几个主题获取数据并连接数据并将其放入另一个主题中 卡夫卡配置 5 kafka brokers Kafka Topics 15 partitions and 3 replication fa
ApacheKafka
apachekafkastreams
如果一个代理关闭,流应用程序中的 KafkaStream EXACTLY_ONCE 会导致重新平衡失败
我有一个 Kafka 流应用程序 其中 kafka streams 和 kafka clients 均为 2 4 0 具有以下配置 properties put StreamsConfig BOOTSTRAP SERVERS CONFIG
Java
ApacheKafka
apachekafkastreams
升级到 kafka-streams:5.5.0-css (Apache Kafka 2.5.0) 后获取 GlobalKTable 的存储崩溃 [已解决]
我有一个使用 GlobalKTable 的 Spring Boot 应用程序 它工作正常 直到从 5 3 2 css 更新到 kafka streams 5 5 0 css 与 Apache Kafka 2 5 0 兼容的 Confluen
Java
Spring
ApacheKafka
apachekafkastreams
springKafka
从 JSON 到 Avro 的 Kafka 流
我尝试使用 Kafka Stream 将带有 String JSON 消息的主题转换为另一个主题 作为 Avro 消息 流主要方法 streamsConfiguration put StreamsConfig KEY SERDE CLASS
json
ApacheKafka
Avro
apachekafkastreams
具有状态存储的 Kafka Streams - 应用程序重新启动时重新处理消息
我们有以下带有两个变压器的拓扑 每个变压器都使用持久状态存储 kStreamBuilder stream inboundTopicName transform gt new FirstTransformer FIRST STATE STOR
ApacheKafka
apachekafkastreams
InvalidStateStoreException:状态存储未在 Kafka 流中打开
StreamsBuilder builder new StreamsBuilder Map
ApacheKafka
apachekafkastreams
了解 kafka 流分区分配器
我有两个主题 一个有 3 个分区 一个有 48 个分区 最初我使用默认分配器 但是当消费者 kubernetes 中的 pod 崩溃时我遇到了一些问题 发生的情况是 当 Pod 再次启动时 它从具有 3 个分区的主题重新分配分区 并从具有
Java
springboot
ApacheKafka
apachekafkastreams
了解 Kafka Streams 中处理器实现中的事务
在使用 Kafka Streams 的处理器 API 时 我使用如下内容 context forward key value context commit 实际上 我在这里所做的就是每分钟从状态存储发送一个状态到接收器 在 init 方法中
ApacheKafka
apachekafkastreams
如何实现通用 Kafka Streams 反序列化器
我喜欢 Kafka 但讨厌编写大量序列化器 反序列化器 所以我尝试创建一个GenericDeserializer
genericprogramming
apachekafkastreams
通过查找数据丰富 KStream 的理想方法
我的流有一个名为 类别 的列 并且我在不同商店中为每个 类别 提供了额外的静态元数据 它每隔几天更新一次 进行此查找的正确方法是什么 Kafka 流有两种选择 在 Kafka Streams 之外加载静态数据并使用KStreams map
apachekafkastreams
新建的 KTable 不返回任何内容
我正在尝试使用 KTable 来消费来自 Kafka 主题的事件 但是 它什么也没返回 当我使用 KStream 时 它返回并打印对象 这实在是太奇怪了 生产者和消费者可以在这里找到 https github com pavankjadda
apachekafkastreams
Kafka Spring Cloud Stream 的多个 @EnableBinding
我正在尝试设置一个侦听 Kafka 的 Spring Boot 应用程序 我正在使用 Kafka Streams Binder 用一个简单的 EnableBinding EnableBinding StreamExample StreamP
springboot
ApacheKafka
apachekafkastreams
springcloudstream
处理 Kafka 流中的异常
已经浏览了多个帖子 但其中大多数与处理错误消息相关 而不是处理它们时的异常处理 我想知道如何处理流应用程序收到的消息并且处理消息时出现异常 异常可能是由于多种原因造成的 例如网络故障 RuntimeException 等 有人可以建议什么是
Java
ApacheKafka
apachekafkastreams
Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:“监听器”不能为空
我是 Kafka Streams 和 Spring Cloud Stream 的新手 但在将集成相关代码移至属性文件中方面阅读过有关它的好文章 以便开发人员可以主要关注事物的业务逻辑方面 这里我有我的简单应用程序类 package com
Spring
springboot
ApacheKafka
apachekafkastreams
springcloudstream
Kafka Stream与KTable一对多关系Join
我有一个卡夫卡流 比如说博客和一个卡夫卡表 比如说与这些博客相关的评论 来自 kafka 流的键可以映射到 Kafka 表中的多个值 即一个博客可以有多个评论 我想将这两个连接起来并创建一个带有评论 id 数组的新对象 但是当我进行连接时
ApacheKafka
apachekafkastreams
Kafka Streams 可以配置为等待 KTable 加载吗?
我正在使用物化 KTable 与我的 KStream 进行左连接 而流位于左侧 但是 它似乎立即处理 无需等待当前版本的 KTable 加载 我的 KTable 源主题中有很多值 当我启动应用程序时 很多连接失败 好吧 不是真的 因为它是左
ApacheKafka
apachekafkastreams
Kafka Stream groupBy 行为:聚合的许多中间输出/更新
我正在尝试使用 Kafka Stream 来聚合人们的某些属性 我有一个像这样的卡夫卡流测试 new ConsumerRecordFactory Array Byte Character input new ByteArraySeriali
ApacheKafka
apachekafkastreams
状态存储可能已迁移到另一个实例
当我尝试从流访问状态存储时 出现以下错误 状态存储 计数存储可能已迁移到另一个实例 当我尝试从商店访问 ReadOnlyKeyValueStore 时 收到迁移到其他服务器时的错误消息 但我只有一个经纪人正在运行 package com m
ApacheKafka
apachekafkastreams
«
1
2
3
4
5
»