Python
Java
PHP
IOS
Android
Nodejs
JavaScript
Html5
Windows
Ubuntu
Linux
Kafka Stream:KTable 物化
如何确定主题的 KTable 实现何时完成 例如假设 KTable 有几百万行 伪代码如下 KTable
ApacheKafka
apachekafkastreams
Kafka Streams 重新平衡高吞吐量 kafka-streams 服务上的延迟峰值
我们开始使用 Kafka 流 我们的服务是一个非常简单的无状态消费者 我们对延迟的要求很严格 当消费者组重新平衡时 我们面临着过高的延迟问题 在我们的场景中 重新平衡会相对频繁地发生 滚动更新代码 扩大 缩小服务 容器被集群调度程序洗牌 容
Java
ApacheKafka
apachekafkastreams
如何删除/清除 Kafka Streams 中的状态存储?
我有一个习惯Transformer在我的 kafka streams DSL 的末尾实现 并带有持久的变更日志KeyValueStore绑定到它 几周以来 我在商店里放了太多的数据 现在 每当我加载应用程序时 它就会消耗太多的内存 然而 应
ApacheKafka
apachekafkastreams
AVRO 原始类型的 Serde 类
我正在用 Java 编写一个 Kafka 流应用程序 它接受由连接器创建的输入主题 该连接器使用架构注册表和 avro 作为键和值转换器 连接器生成以下模式 key schema int value schema type record n
Java
ApacheKafka
Avro
apachekafkastreams
confluentplatform
与 KafkaStreams 的窗口结束外连接
我有一个 Kafka 主题 我希望消息具有两种不同的密钥类型 旧的和新的 IE 1 new 1 old 2 new 2 old 密钥是唯一的 但有些可能会丢失 现在 使用 Kotlin 和 KafkaStreams API 我可以记录具有相
ApacheKafka
outerjoin
apachekafkastreams
卡夫卡流 RoundRobinPartitioner
我编写了一个kafka流代码 使用kafka 2 4 kafka客户端版本和kafka 2 2服务器版本 我的主题和内部主题有 50 个分区 我的 kafka 流代码具有 selectKey DSL 操作 并且我有 200 万条使用相同 K
Java
ApacheKafka
apachekafkastreams
Kafka Streams:如何使用 persistenceKeyValueStore 从磁盘重新加载现有消息?
我的代码当前使用 InMemoryKeyValueStore 这避免了对磁盘或 kafka 的任何持久化 我想使用rocksdb Stores persistentKeyValueStore 以便应用程序将从磁盘重新加载状态 我正在尝试实现
apachekafkastreams
rocksdb
KafkaStreams serde异常
我正在使用 Kafka 和流技术 我为 KStream 创建了一个自定义序列化器和反序列化器 我将使用它来接收来自给定主题的消息 现在的问题是我正在以这种方式创建一个 serde JsonSerializer
Java
ApacheKafka
apachekafkastreams
为什么 Kafka Streams 强制对 GlobalKTable 状态存储禁用日志记录?
为什么全局表不能在 kafka 中启用日志记录 code if loggingEnabled throw new TopologyException StateStore storeName for global table must no
apachekafkastreams
Kafka流处理器线程安全吗?
我知道这个问题之前在这里被问过 卡夫卡流并发 但这对我来说很奇怪 根据文档 或者也许我遗漏了一些东西 每个分区都有一个任务 意味着不同的处理器实例 并且每个任务都由不同的线程执行 但是当我测试它时 我发现不同的线程可以获得不同的处理器实例
Java
Multithreading
apachekafkastreams
在 Kafka 流作业中进行同步数据库查询或静态调用是一个好习惯吗?
我使用Kafka Streams来处理实时数据 在Kafka Streams任务中 我需要访问MySQL来查询数据 并且需要调用另一个Restful服务 所有操作都是同步的 恐怕同步调用会降低流任务的处理能力 这是一个好的做法吗 或者有什么
ApacheKafka
Bigdata
Streaming
apachekafkastreams
嵌入式Kafka:KTable+KTable leftJoin产生重复记录
我来寻求神秘的知识 首先 我有两对主题 每对中的一个主题融入另一个主题 后面的主题形成两个KTable 用于KTable KTable leftJoin 问题是 当我向任一 KTable 生成一条记录时 leftJoin 会生成三个记录 我
Kafka 比较键的连续值
我们正在构建一个应用程序来从传感器获取数据 数据被传输到 Kafka 消费者将其发布到不同的数据存储 每个数据点将具有代表传感器状态的多个属性 在其中一个消费者中 我们希望仅当值发生变化时才将数据发布到数据存储 例如如果有温度传感器每 10
ApacheKafka
apachekafkastreams
使用 Apache Kafka Streaming 解析 JSON 数据
我有一个从 Kafka 主题读取 JSON 数据的场景 通过使用 Kafka 0 11 版本 我需要编写 Java 代码来流式传输 Kafka 主题中存在的 JSON 数据 我的输入是包含字典数组的 Json 数据 现在我的要求是获取 文本
Parsing
jsonschema
apachekafkastreams
我可以依靠 Kafka 流中的内存 Java 集合通过微调标点和提交间隔来缓冲事件吗?
一个自定义处理器 以简单的方式缓冲事件java util List in process 该缓冲区不是状态存储 每 30 秒 WALL CLOCK TIME punctuate 对此列表进行排序并刷新到接收器 假设只有单个分区源和接收器 需
ApacheKafka
apachekafkastreams
exactlyonce
punctuator
Kafka Streams stateStores 容错一次?
我们正在尝试使用 Kafka Streams 实现重复数据删除服务 总体而言 它将使用它的rocksDB状态存储来在处理过程中检查现有的密钥 如果我错了 请纠正我 但为了使这些 stateStore 也具有容错能力 Kafka Stream
ApacheKafka
apachekafkastreams
faulttolerance
为什么 Spark 应用程序会失败并出现“线程“main”java.lang.NoClassDefFoundError: ...StringDeserializer 中的异常”?
我正在开发一个 Spark 应用程序 该应用程序使用 Spark 和 Java 监听 Kafka 流 我使用kafka 2 10 0 10 2 1 我为 Kafka 属性设置了各种参数 bootstrap servers key deser
Java
Maven
apachespark
SparkStreaming
apachekafkastreams
变更日志/重新分区主题的复制因子应该是多少
我知道可以为 kafka 流配置复制因子这些内部主题 我们的应用程序用于复制因子为 3 的普通应用程序主题 但到目前为止我还没有为变更日志 重新分区主题配置复制因子 而我的假设是如果一个经纪人死亡 或由于某种原因领导者发生变化 kafka
ApacheKafka
apachekafkastreams
KStream 和 KTable 之间的时间语义
我正在尝试构建以下拓扑 使用 Debezium 连接器 我拉出 2 个表 我们称它们为表 A 和 DA 根据 DBZ 存储表行的主题具有结构 before after 我的拓扑中的第一步是根据这两个 表 主题创建 干净 的 KStream
apachekafkastreams
Kafka 流过滤:代理端还是消费者端?
我正在研究卡夫卡流 我想使用选择性非常低 几千分之一 的过滤器来过滤我的流 我正在看这个方法 https kafka apache org 0100 javadoc org apache kafka streams kstream KStr
ApacheKafka
kafkaconsumerapi
apachekafkastreams
«
1
2
3
4
5
»