Python
Java
PHP
IOS
Android
Nodejs
JavaScript
Html5
Windows
Ubuntu
Linux
具有替代方案的重载方法值表
我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
Java
scala
ApacheKafka
apachekafkastreams
KafkaStreams 同一应用程序中的多个流
我正在尝试根据 KafkaStreams 的惯例和合理性做出实用的设计决策 假设我想将两个不同的事件放入其中KTables 我有一个制作人将这些消息发送给KStream那就是听那个话题 据我所知 我不能对消息使用条件转发KafkaStrea
ApacheKafka
apachekafkastreams
如何处理Kafka流中的不同时区?
因此 我正在评估 Kafka Streams 及其功能 看看它是否适合我的用例 因为我需要每 15 分钟 每小时 每天聚合传感器数据 并发现它由于其窗口功能而很有用 因为我可以通过应用创建窗口windowedBy on KGroupedSt
Java
ApacheKafka
apachekafkastreams
Kafka Streams - 减少大型状态存储的内存占用
我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
ApacheKafka
apachekafkastreams
使用 Kafka Streams 在输出中设置时间戳无法进行转换
假设我们有一个变压器 用 Scala 编写 new Transformer String V String V var context ProcessorContext override def init context Processor
scala
ApacheKafka
apachekafkastreams
即使没有消费者,消费者群体仍陷入“再平衡”
我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
ApacheKafka
apachekafkastreams
如何评估kafka流应用程序的消耗时间
我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
ApacheKafka
apachekafkastreams
使用kafka lib反序列化PRIMITIVE AVRO KEY
我目前无能力反序列化 avro PRIMITIVE 密钥在 KSTREAM 应用程序中 使用 avro 模式编码的密钥 在模式注册表中注册 当我使用 kafka avro console consumer 时 我可以看到密钥已正确反序列化
Java
ApacheKafka
Avro
apachekafkastreams
confluentschemaregistry
Kafka Streams 在 HDFS 上查找数据
我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
使用 Kafka Streams 进行 OpenTracing - 如何?
我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
Java
ApacheKafka
apachekafkastreams
jaeger
如何在KafkaStream应用程序中获取partitionId和TopicName
我们如何从 KafkaStream 获取主题名称和分区 id 对于任何其他 Kafka 消费者 我们可以获得主题名称和分区 ID 如下所示 ConsumerRecords
ApacheKafka
apachekafkastreams
RocksDb sst 文件的 GUI 查看器
我正在与 Kafka 合作 将数据保存到rocksdb 中 现在我想看看 Kafka 创建的数据库键和值 我下载了 FastNoSQL 并尝试但失败了 该文件夹包含 sst 文件 日志文件 当前文件 身份文件 锁定文件 日志文件 清单文件
ApacheKafka
apachekafkastreams
rocksdb
rocks
KTable 应该发出的事件
我正在尝试测试一个拓扑 该拓扑作为最后一个节点 具有 KTable 我的测试是使用成熟的 Kafka 集群 通过 confluence 的 Docker 镜像 所以我not使用TopologyTestDriver 我的拓扑有键值类型的输入S
scala
ApacheKafka
apachekafkastreams
Kafka Streams - SerializationException:未知的魔术字节
我正在尝试创建一个处理 Avro 记录的 Kafka Streams 应用程序 但出现以下错误 Exception in thread streams application c8031218 8de9 4d55 a5d0 81c30051
Java
ApacheKafka
Avro
apachekafkastreams
confluentschemaregistry
如何以自定义方式从主题恢复全局存储?
假设我在从主题获取数据后将数据存储在 Globalstore 中时正在进行一些自定义处理 即我正在根据 message 的值创建自定义键 在本地删除状态后 它会以相同的方式再次恢复 Globalstore 吗 override def pr
ApacheKafka
apachekafkastreams
Kafka Streams - 低级处理器 API - RocksDB TimeToLive(TTL)
我正在尝试使用低级处理器 API 我正在使用处理器 API 对传入记录进行数据聚合 并将聚合记录写入 RocksDB 但是 我想保留在rocksdb中添加的记录仅在24小时内处于活动状态 24 小时后 记录应被删除 这可以通过更改 ttl
ApacheKafka
apachekafkastreams
rocksdb
Kafka Streams 应用程序无尽的重新平衡
我们正在运行一个卡夫卡流应用程序并遇到一个奇怪的问题 我们正在使用全局状态存储和多个其他状态存储 我们的应用程序已加载所有数据 状态存储中现在有大量信息 现在 当我们尝试关闭应用程序并再次将其恢复 一些配置更改 时 它会进入无休止的重新平衡
Java
ApacheKafka
apachekafkastreams
有什么办法可以让kafka流暂停一段时间然后再恢复吗?
我们有一个要求 即使用 Kafka Streams 从 Kafka 主题读取数据 然后通过会话池通过网络发送数据 然而 有时 网络调用有点慢 我们需要经常暂停流 以确保网络不会过载 目前 我们将数据捕获到流中并将其加载到执行器服务 然后通过
Java
ApacheKafka
apachekafkastreams
我可以在 Kafka Broker 所在的同一台机器上运行 Kafka Streams 应用程序吗?
我有一个 Kafka Streams 应用程序 它从几个主题获取数据并连接数据并将其放入另一个主题中 卡夫卡配置 5 kafka brokers Kafka Topics 15 partitions and 3 replication fa
ApacheKafka
kafkaconsumerapi
kafkaproducerapi
apachekafkastreams
事件计数的窗口聚合
我对我的卡夫卡事件进行了分组 private static void createImportStream final StreamsBuilder builder final Collection
Java
ApacheKafka
kafkaconsumerapi
apachekafkastreams
1
2
3
4
5
»