Spark 设置为从最早的偏移量读取 - 在尝试使用 Kafka 上不再可用的偏移量时抛出错误

2024-01-08

我目前正在 Dataproc 上运行 Spark 作业,在尝试重新加入组并从 kafka 主题读取数据时遇到错误。我做了一些挖掘,但不确定问题是什么。我有auto.offset.reset set to earliest所以它应该从最早可用的非提交偏移量中读取,最初我的火花日志如下所示:

19/04/29 16:30:30 INFO     
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO     
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-7 to offset 5555542.```

但是接下来的下一行我尝试从服务器上不存在的偏移量读取时遇到错误(您可以看到分区的偏移量与上面列出的偏移量不同,所以我不知道为什么它会尝试读取表单该偏移量,这是下一行的错误:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
out of range with no configured reset policy for partitions: 
{demo.topic-11=4544296}

有什么想法可以解释为什么我的 Spark 工作不断回到这个偏移量(4544296),而不是它最初输出的偏移量(5553330)?

这似乎是自相矛盾的 w a) 它所说的实际偏移量和它尝试读取的偏移量 b) 说没有配置重置策略


这个答案迟了一年,但希望能帮助其他面临类似问题的人。

通常,当消费者尝试读取 Kafka 主题中不再存在的偏移量时,就会出现此行为。偏移量不再存在,通常是因为它已被 Kafka Cleaner 删除(例如由于保留或压缩策略)。然而,消费者组仍然是 Kafka 已知的,并且 Kafka 保留了主题“demo.topic”及其所有分区的组“demo-group”的最新消费消息的信息。

因此,auto.offset.reset配置不会有任何影响,因为不需要重置。相反,卡夫卡了解消费者组。

除此之外Fetcher只告诉您主题的每个分区内最新的可用偏移量。确实如此not自动意味着它实际上轮询直到此偏移量的所有消息。 Spark 决定每个分区实际消耗和处理多少消息(基于例如配置maxRatePerPartition).

要解决此问题,您可以更改消费者组(在这种特殊情况下这可能不是您想要的),或者通过使用手动重置消费者组“演示组”的偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group demo-group --topic demo.topic --partition 11 --to-latest

根据您的要求,您可以使用该工具重置主题每个分区的偏移量。帮助功能或文档解释了所有可用选项。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 设置为从最早的偏移量读取 - 在尝试使用 Kafka 上不再可用的偏移量时抛出错误 的相关文章

  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 如何将模型结果保存到文本文件?

    我正在尝试将从模型生成的频繁项集保存到文本文件中 该代码是 Spark ML 库中 FPGrowth 示例的示例 Using saveAsTextFile直接在模型上写入 RDD 位置而不是实际值 import org apache spa
  • Java中的媒体播放器库[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在评估用于在 Java 中播放音频 视频的库 它不需要 100 Java Java 与本机库的绑定
  • 如何使用 PySpark 预处理图像?

    我有一个项目 需要为 1 设置大数据架构 AWS S3 SageMaker 的概念验证使用 PySpark 预处理图像 2 执行 PCA and 3 训练一些机器或深度学习模型 我的问题是了解如何使用 PySpark 操作图像数据 但无法在
  • 如何读取一次流数据集并输出到多个接收器?

    我有 Spark 结构化流作业 它从 S3 读取数据 转换数据 然后将其存储到一个 S3 接收器和一个 Elasticsearch 接收器 目前 我正在做readStream一次然后writeStream format start 两次 这
  • 使用 FastAPI 传输 LangChain OpenAI 响应 [重复]

    这个问题在这里已经有答案了 我想将 OpenAI 的响应直接传输到 FastAPI 的端点 Code 在我的threads handler py 位于单独的文件夹中 中 我有以下函数askQuestion def askQuestion s
  • 如何设置SPARK_HOME变量?

    按照链接中的气泡水步骤进行操作http h2o release s3 amazonaws com sparkling water rel 2 2 0 index html http h2o release s3 amazonaws com
  • Scala Spark 包含与不包含

    我可以使用 contains 过滤 RDD 中的元组 如下所示 但是使用 不包含 来过滤 RDD 又如何呢 val rdd2 rdd1 filter x gt x 1 contains 我找不到这个的语法 假设这是可能的并且我没有使用Dat
  • Spark 有没有办法捕获执行器终止异常?

    在执行我的 Spark 程序期间 有时 其原因对我来说仍然是个谜 yarn 会杀死容器 执行器 并给出超出内存限制的消息 我的程序确实恢复了 但 Spark 通过生成一个新容器重新执行任务 但是 在我的程序中 任务还会在磁盘上创建一些中间文
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • Spark scala 模拟 Spark.implicits 用于单元测试

    当尝试使用 Spark 和 Scala 简化单元测试时 我使用 scala test 和mockito scala 以及mockito Sugar 这只是让你做这样的事情 val sparkSessionMock mock SparkSes
  • Scala 案例类忽略 Spark shell 中的导入

    我希望这个问题有一个明显的答案 我刚刚升级到 Spark v2 0 并且遇到了一个奇怪的问题火花外壳 Scala 2 11 版本 如果我输入以下最小的 Scala import java sql Timestamp case class C
  • 我可以在集群模式下运行 dataproc 作业吗

    刚刚开始熟悉 GCP dataproc 我在使用时注意到gcloud dataproc jobs submit pyspark提交的作业spark submit deployMode client Is spark submit deplo
  • 如何在不从 DataFrame 转换并访问它的情况下向数据集添加列?

    我知道使用以下方法将新列添加到 Spark 数据集的方法 withColumn and a UDF 它返回一个 DataFrame 我还知道 我们可以将生成的 DataFrame 转换为 DataSet 我的问题是 如果我们仍然遵循传统的
  • Spark:查找前 n 个值的高性能方法

    我有一个很大的数据集 我想找到具有 n 个最高值的行 id count id1 10 id2 15 id3 5 我能想到的唯一方法是使用row number没有分区就像 val window Window orderBy desc coun
  • 我可以在没有 Hadoop 的情况下使用 Spark 作为开发环境吗?

    我对大数据和相关领域的概念非常陌生 如果我犯了一些错误或拼写错误 我很抱歉 我想了解阿帕奇火花 http spark apache org 并使用它仅在我的电脑中 在开发 测试环境中 由于Hadoop包含HDFS Hadoop分布式文件系统
  • 如何将文件透明地传输到浏览器?

    受控环境 IE8 IIS 7 ColdFusion 当从 IE 发出指向媒体文件 例如 mp3 mpeg 等 的 GET 请求时 浏览器将启动关联的应用程序 Window Media Player 我猜测 IIS 提供文件的方式允许应用程序
  • 数量重新分配逻辑 - 具有外部数据集的 MapGroups

    我正在研究一种复杂的逻辑 需要将数量从一个数据集重新分配到另一个数据集 在例子中我们有Owner and Invoice 我们需要从数量中减去Invoice准确地Owner匹配 在给定汽车的给定邮政编码处 减去的数量需要重新分配回同一辆车出
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Spark:导入UTF-8编码的文本文件

    我正在尝试处理一个包含很多特殊字符的文件 例如德语变音符号 o 等 如下所示 sc hadoopConfiguration set textinputformat record delimiter r n r n sc textFile f

随机推荐

  • 用户位置的自定义注释视图不移动地图视图

    我们可以在 iOS 中为用户当前位置提供自定义注释视图吗 我需要用我自己的自定义视图 比如一些 ping 引脚 删除蓝点 带圆圈 是否有可能做到这一点 如果我们这样做 当用户位置发生变化时 该图钉是否会移动到新位置 或者我们需要以编程方式处
  • node.js websocket 模块已安装,但无法在脚本中运行

    我刚刚安装了node js microsoft Visual 以便能够安装websocket 它安装得很好 C Users Administrator gt npm install websocket npm http GET https
  • 为 linq groupby 编写自定义比较器

    同样 这个示例是我的实际问题的一个非常简化的版本 涉及 linq 分组的自定义比较器 我做错了什么 下面的代码产生下面的结果 1 2 0 4 1 0 4 1 0 1 1 0 然而我期待以下结果 因为 1 1 和 1 2 之间的距离 clas
  • 改变这是什么

    有没有办法改变 THIS 指向的内容 class foo foo fooinstance new foo foo otherfooinstance new foo void foo bar this otherfooinstance foo
  • 错误号2058无法加载插件authentication_windows_client:找不到指定的模块

    MySQL 有一个插件 允许根据当前用户的 Windows 凭据进行用户身份验证 该插件是 authentication windows dll 我从 SQLyog 收到以下错误消息 错误号 2058 插件authentication wi
  • 更改 Flash 播放器音频输出设备

    有没有办法改变Flash播放器的音频输出设备 如果没有的话 有没有swf播放器有这种可能性 谢谢 直到几分钟前我才遇到一个关于此的问题 我的 XP 盒子有两个音频设备 一个 iMic USB 音频 I O 设备 我已将桌面扬声器永久插入其中
  • 如何从反应应用程序中的公共文件夹导入文件?

    我在 public 文件夹中有一个 javascript 文件 我想将该文件导入到文件夹 src components 中的组件 projectFolder publicFolder index html recorder js srcFo
  • eclipse 从 root 显示 README

    以下项目结构并不罕见 项目A 目录 项目B 目录 ProjectX 目录 变更日志 文件 许可证 文件 自述文件 文件 这种结构 README 位于根目录中 得到了不同在线 Git 解决方案 如 github com bitbucket o
  • MDX - NON EMPTY 函数更快?

    我当时的假设是NON EMPTY必须尽可能避免使用该子句 因此 当我意外地发现它实际上使查询速度更快时 我感到震惊 示例如下 select Measures Count Of Requests on 0 Client Client Numb
  • Laravel - 与软删除数据的隐式路由模型绑定

    我有一个小问题 有两种用户角色 一种是普通成员 一种是管理员 成员可以删除博客 并且在删除 软删除 博客后他们将无法看到该博客 而管理员仍然可以看到该博客 即使它是软删除的 示例代码 Route file Route get blog bl
  • AngularJS 指令链接函数未被调用

    我正在尝试将 Angular http auth 库与引导模式窗口一起使用 模态框工作正常 但我在指令方面遇到问题 这是一个 jsfiddle 链接 http jsfiddle net jCUSh 85 http jsfiddle net
  • Android - 从Webview调用Java

    我想从Webview调用Java I have JavaScriptInterface below class JavaScriptInterface private Activity activity public JavaScriptI
  • Rails 4.0.1 中的新记录“没有将符号显式转换为字符串”(仅限)

    在我升级 Rails 4 后 尝试为我的任何 ActiveRecord 类创建新记录会给出 No explicit conversion of Symbol into String 例如 这是我的 links links params 方法
  • 在 FLASK 中运行 pypupeteer 会出现 ValueError: signal only Works in main thread

    我正在尝试将 pyppeteer 集成到 Flask 应用程序中 我有一个运行 pyppeteer 并截取页面屏幕截图的 python 脚本 如果我单独运行该脚本 这是工作文件 The PROBLEM当我在 FLASK 应用程序中运行它时
  • c++ - 不命名类型

    我有一个问题 当我尝试构建以下代码时 我得到 keywords does not name a type whitespace does not name a type 第 18 19 行和第 22 24 行 有人可以帮忙吗 这是代码 cp
  • 我如何解释这个输入?

    我目前使用 ANTLR 在 Java 中实现了一种可用的 简单的语言 我想做的是将其嵌入纯文本中 与 PHP 类似 例如 Lorem ipsum dolor sit amet Phasellus volutpat dignissim sap
  • Woocommerce/Wordpress - 将用户登录重定向到主页

    我已经搜索了这个问题的答案 使用了插件 但仍然没有任何效果 我希望我的网站的用户在登录 注册后被重定向到主页 目前 用户登录并被重定向到我的帐户页面 Woocommerce 提供了此代码 但它对我不起作用 goes in theme fun
  • 如何减少/消除 Angular 应用程序中的内存泄漏

    我正在优化我的大Angular App 当我发现一个Google DevTools非常好发现问题 由于我刚刚开始学习DevTools 我对内存泄漏很困惑 当我在应用程序中的不同页面之间来回移动时 配置文件堆快照大小一次又一次地增加 因此我认
  • 如何在 Java 中为 Swing 组件设置字体粗细

    我想设置不同字体粗细我的 JFrame 对话框上的组件 我该怎么做呢 在下面的Java语句中 setFont new Font Dialog Font BOLD 12 当我使用 Font BOLD 时 它太粗体 当我使用 Font Plai
  • Spark 设置为从最早的偏移量读取 - 在尝试使用 Kafka 上不再可用的偏移量时抛出错误

    我目前正在 Dataproc 上运行 Spark 作业 在尝试重新加入组并从 kafka 主题读取数据时遇到错误 我做了一些挖掘 但不确定问题是什么 我有auto offset reset set to earliest所以它应该从最早可用