Kafka 流过滤:代理端还是消费者端?

2023-12-02

我正在研究卡夫卡流。我想使用选择性非常低(几千分之一)的过滤器来过滤我的流。我正在看这个方法:https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter(org.apache.kafka.streams.kstream.Predicate)

但我找不到任何证据,如果过滤器将由消费者评估(我真的不想将大量 GB 转移给消费者,只是将它们扔掉),或者在经纪人内部(耶!)。

如果在消费者方面进行评估,有什么办法,如何在经纪人中做到这一点?

Thanks!


Kafka不支持broker端过滤。如果您使用 Streams API,过滤将在您的应用程序中完成(谓词不会由KafkaConsumer但在拓扑的“处理器节点”内——即在 Streams API 运行时代码内)。

这可能有帮助:https://docs.confluence.io/current/streams/architecture.html

不支持代理端过滤的原因是,代理仅使用(1)字节数组作为键和值数据类型,并使用(2)零复制机制来实现高吞吐量。需要代理端过滤,以在代理端反序列化数据,这将严重影响性能(反序列化成本和无零复制优化)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 流过滤:代理端还是消费者端? 的相关文章

  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab
  • 是否可以使用Kafka传输文件?

    我每天都会生成数千个文件 我想使用 Kafka 进行流式传输 当我尝试读取该文件时 每一行都被视为一条单独的消息 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息 以及消费者如何将 Kafka 主题中的每条消息写入单独的文件中
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • Kafka 主题删除不起作用

    我使用的是 Kafka 0 8 2 版本 在开发过程中 我想我可能需要删除一个主题 所以我所做的是将以下行放入服务器配置文件中并启动两个 kafka 服务器 delete topic enable true 当我需要删除一个主题并运行以下命
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • Apache kafka - 消费者延迟选项

    我想在 Kafka 中为特定主题稍稍延迟启动一个消费者 具体来说 我希望消费者在从生成消息的时间起经过特定的时间延迟后开始使用该主题的消息 Kafka 中有任何属性或选项可以启用它吗 我们对火花流做了同样的事情 我希望 这种方法也适合您 这
  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • kafka消费者群体正在重新平衡

    我正在使用 Kafka 9 和新的 java 消费者 我正在循环内进行轮询 当代码尝试执行 Consumer commitSycn 时 由于组重新平衡 我收到 commitfailedexcption 请注意 我将 session time
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 卡夫卡流:RocksDB TTL

    据我了解 默认 TTL 设置为无穷大 非正数 但是 如果我们需要在存储中保留数据最多 2 天 我们可以使用 RocksDBConfigSetter 接口实现 即 options setWalTtlSeconds 172800 进行覆盖吗 或
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Spring Boot 和 Kafka,Producer 抛出 key='null' 异常

    我正在尝试使用Spring Boot with Kafka and ZooKeeper with Docker docker compose yml version 2 services zookeeper image wurstmeist
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk

随机推荐

  • 使用 Java 在 GUI 编程中混合 awt 和 swing

    我在 SO 上读到混合 awt 和 swing 对于 Java 中的 GUI 编程来说并不是一个好的方法 但我无法找到任何在使用 swing 时不使用某些 awt 组件的示例 例如 即使使用 swing 我遇到的大多数示例也会使用 awt
  • MySQL:如何查询父子关系?

    假设有如下表记录 TABLE foo foo id foo parent id 1 NULL 2 NULL 3 1 4 2 5 1 6 1 7 2 8 1 9 NULL
  • Log4J 2 查找值在加载/呈现之前在配置中使用

    我正在使用 SystemPropertiesLookup 查找来配置我的 Log4J2 配置 系统属性被设置为我的主要方法中的第一行 问题是 当 Log4J 加载配置时 尚未调用 main 方法 因此系统属性尚未填充 这是我的 log4j2
  • 模板类 - 无法解析的外部符号[重复]

    这个问题在这里已经有答案了 我经常收到这个错误 但我不知道为什么 有人可以帮我找到原因吗 编辑 删除代码 将实现 您的方法定义 与类声明一起放入标头中 请参阅这在 C 常见问题解答中 一些编译器支持 export 关键字来按照您的方式执行此
  • 获取上次打开的 MS Word 文档对象

    我有一个从 MS Word 2003 模板 dot 中的 VBA AutoNew 子函数调用的 python 脚本 因此每次从此 Word 模板创建文档时它都会运行 第三方应用程序从此模板创建文档 第三方应用程序如何设置文档存在许多格式问题
  • 多态类中的虚拟析构函数

    我知道只要有一个多态基类 该基类就应该定义一个虚拟析构函数 这样当一个指向派生类对象的基类指针被删除时 它会先调用派生类的析构函数 如果我在这里错了 请纠正我 另外 如果基类析构函数是非虚拟的 则删除指向派生对象的基类指针将是未定义的行为
  • 如何在 .NET MAUI Flyout Shell 应用程序中自定义标题?

    如何缩小标题间距 当我更改汉堡图标时 其图标颜色始终为白色 怎么了 我可以设置标题字体大小和字体系列吗 在 Android 上可以设置contentInsetLeft contentInsetStart contentInsetStartW
  • 如何在 Python 中对文本文件中的数字求和

    我有一个代码依赖于我读取一个文本文件 在有数字的地方打印数字 在有字符串而不是数字的地方打印特定的错误消息 然后将所有数字相加并打印它们的总和 然后只保存编号到新的文本文件 我已经尝试这个问题几个小时了 我得到了下面写的内容 我不知道为什么
  • PyQt/PySide中连接点击信号时lambda和partial的区别

    当将一组按钮中的多个单击信号连接到带有参数的单个槽函数时 我遇到了信号槽问题 lambda and functools partial可以使用如下 user user button clicked connect lambda callus
  • 如何在 IntelliJ 中调试多线程应用程序?

    我在 IntelliJ IDEA 14 0 2 中遇到了一个关于多线程和断点的奇怪问题 断点之后的代码会在断点处停止之前执行 import java util concurrent atomic AtomicInteger public c
  • 使用 jQuery、JSON 和 AJAX 填充下拉列表

    就像标题所说 我正在尝试使用 jQuery JSON 和 AJAX 创建一个下拉菜单 尽管我熟悉理论但尚未将其付诸实践 任何建议 演示代码片段或教程将不胜感激 因为我希望有一个最好的开始 提前致谢 您需要执行 getJSON 调用以在 do
  • 无法使用 Flask 从下拉列表中获取在 python 中选择的值

    这个问题可能是重复的 但我已经检查了此类相关问题的所有答案 但无法解决 我试图从由数字组成的下拉菜单中获取值 然后我想将数字与值进行比较并根据比较显示文本 Eg if value selected from dropdown gt 3 di
  • 在设定的时间从睡眠中唤醒应用程序

    我想让我的应用程序进入睡眠状态 然后在设定的时间唤醒它 我让它睡觉但不会醒来 这设置了唤醒锁 private void setWakeLock System out println wakelock PowerManager pm Powe
  • ReactJS、event.currentTarget 与 Vanilla Javascript 的行为不同

    我想知道是否有类似的事情event currentTargetReactJS中存 在问题 使用event targetonclick 是我得到的childDiv而不是parentDiv 普通 JavaScript 示例 document g
  • 如何使用 bash 脚本计算单词中最常出现的 3 个字母序列

    我有一个示例文件 例如 XYZAcc ABCAccounting Accounting firm Accounting Aco Accounting Acompany Acoustical consultant 这里我需要 grep 一个单
  • 根据一个向量对多个向量进行排序[重复]

    这个问题在这里已经有答案了 我有四个向量 其中包含圆心的 x y 半径和重量信息 我想按重量顺序对它们进行排序 从最高到最低 但我真的不知道如何或从哪里开始 我可以把所有的向量放在一个Eigen Tensor如果有帮助的话 将收集的数据保存
  • Jqgrid 许可证(具有 MIT/GPL v2)与 Guriddo jqGrid JS(知识共享许可证)

    我在网站应用程序的开发环境中使用 jqGrid 4 5 4 版本 并希望在商业网站上发布应用程序时使用相同的版本 我的疑问是 最近我看到了 Guriddo Jqgrid 4 7 1 的最新版本及其新许可证 知识共享许可 请告诉我现在可以在商
  • 如何使Code Runner在外部终端(命令提示符)中运行?

    所以 基本上Visual Studio Code中的Code Runner可以在集成终端中运行 我怎样才能让它在外部终端中运行 这是命令提示符 因为我需要向我的同学展示我的程序的输出 所以通过集成的终端显示它并不方便 我知道有一个像 Dev
  • C++ 中奇怪的 double 到 int 转换行为

    以下程序显示了我在 C 中看到的奇怪的 double 到 int 转换行为 include
  • Kafka 流过滤:代理端还是消费者端?

    我正在研究卡夫卡流 我想使用选择性非常低 几千分之一 的过滤器来过滤我的流 我正在看这个方法 https kafka apache org 0100 javadoc org apache kafka streams kstream KStr