使用 kafka 进行 Spark 结构化流处理只会导致一批(Pyspark)

2023-12-28

我有以下代码,我想知道为什么它只生成一批:

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "IP").option("subscribe", "Topic").option("startingOffsets","earliest").load()
// groupby on slidings windows
query = slidingWindowsDF.writeStream.queryName("bla").outputMode("complete").format("memory").start()

该应用程序使用以下参数启动:

spark.streaming.backpressure.initialRate 5
spark.streaming.backpressure.enabled True

kafka 主题包含大约 1100 万条消息。由于initialRate参数,我预计它至少应该生成两批,但它只生成一批。谁能告诉我为什么 Spark 仅在一批中处理我的代码?

我正在使用 Spark 2.2.1 和 Kafka 1.0。


那是因为spark.streaming.backpressure.initialRate参数仅由旧的 Spark Streaming 使用,而不由 Structured Streaming 使用。

相反,使用maxOffsetsPerTrigger: http://spark.apache.org/docs/latest/structed-streaming-kafka-integration.html http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

顺便说一句,另请参阅这个答案:Spark 结构化流如何处理背压? https://stackoverflow.com/questions/44871621/how-spark-structured-streaming-handles-backpressure, SSS现在没有完整的背压支持

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 kafka 进行 Spark 结构化流处理只会导致一批(Pyspark) 的相关文章

随机推荐

  • Python:使用 setproctitle 更改进程名称

    我有一个 python 脚本 它启动许多 C 程序 每个程序都会传递一个命令行参数 如下所示 process path test process name test num process 10 for p in range 1 num p
  • Sequelize:如何在使用左外连接的连接表上执行 WHERE 条件

    我的数据库模型如下 员工驾驶一辆或零辆车辆一辆车可以由一名或多名员工驾驶车辆有一个模型类型 可以告诉我们它的燃料类型以及其他信息 我想要续集为我找到所有不开车的员工 或者如果他们开车 那么车辆不是柴油车 因此 其中 VehicleID 为
  • 带有 SharePoint 参数的 VB.Net 命令行(控制台)程序

    我想在 VB net 中创建一个允许参数的控制台程序 我想要做的是在下面的代码中添加参数 以便可以从 运行 菜单创建 Web 部件页面 例如C MyProgram exe Design 这将创建 Design Webpart 页面 我尝试在
  • 删除图中的文本

    我正在使用绘图功能sizetree from library plotrix 版本 3 8 1 这个函数有一个showcount允许括号中的一些计数显示在绘图上的参数 见下图 但我想知道为什么当我使用showcount FALSE 它们周围
  • Python 游戏网络

    我目前在寻找网络游戏编程资源时遇到困难 特别是Python 我不知道任何其他语言 我在 Python 中发现了很多关于通用网络的东西 但我不确定这就是我需要的 因为我相信游戏网络还涉及一些其他因素 我正在尝试创建一个在不同计算机上玩的 2
  • Python。如何使用libxml2获取属性值

    我使用的是 MINIDOM 但它不提供 xpath 方法 我现在尝试使用 libxml2 但在检索属性值时遇到问题 我的 xml 摘录如下
  • jersey 2.3.1 和 spring 集成兼容性问题

    我正在尝试创建将使用球衣和弹簧的宁静服务项目设置 我最初下载了 jersey1 8 依赖的 jar 我还得到了 jersey spring 1 8 并且我使用 com sun jersey spi spring container serv
  • 如何使用一对 FrameLabels 制作绘图网格?

    创建行 列 网格图 整个网格具有单个 FrameLabel 的最简单方法是什么 我需要类似的东西 p ListPlot RandomInteger 10 5 Joined gt True Axes gt False Frame gt Tru
  • Google 地图 API 3 搜索框

    我不知道如何在我的谷歌地图中实现搜索框 我有它 用户可以从表单中选择一些内容 然后在地图上加载标记 现在我想添加他们可以使用谷歌搜索框输入城市和州的位置 例如在maps google com上 这可以通过 API v 3 来完成吗 Goog
  • Eclipse:选择不包含任何可以在服务器上运行的资源

    我无法将 Maven Java Web 应用程序项目运行到 Eclipse IDE 中配置的 Tomcat 最初 我可以右键单击该项目并在 tomcat 服务器上运行它 但自从我将项目共享到存储库后 我无法执行此操作 我从存储库中断开了项目
  • 确定 JS AudioContext.analysisrNode 中的频率

    背景 我的目标是创建一个基于 JavaScript 的 Web 应用程序来分析和显示音频源 包括页内源 中的频率信息
  • 当理论规定使用已检查异常时,我是否应该使用相关的内置未检查异常?

    SO 上有很多关于 检查与非检查异常 主题的帖子 这个答案 https stackoverflow com a 19061110 2520359可能是最全面 信息最丰富的 然而 我仍然对遵循那里提出的逻辑感到矛盾 这是有原因的 我正在围绕一
  • 我应该在我的应用程序中包含命令行模式吗?

    出于学习目的 我正在 C 和 winforms 中开发一个类生成应用程序 我认为包含允许在脚本中使用应用程序的命令行模式可能会很好 在我的应用程序中包含命令行模式是一个很好的做法吗 最好有两个不同的程序 一个带有 GUI 一个用于命令行 实
  • 如何使 Flask/Jinja2 加载可执行 zip 存档中的捆绑模板?

    我已将 Flask Web 应用程序打包成可执行的 Python 压缩存档 zipapp https docs python org 3 6 library zipapp html 我在加载模板时遇到问题 Flask Jinja2 无法找到
  • 如果 ASP.NET 破坏了 DIV 的 ID,如何从 javascript 访问该 DIV?

    我有一个包含 div 元素的网页 在页面上 有 javascript 来引用 div document getElementById divId 在另一位开发人员重新设计该页面以使用 ASP 母版页之前 该方法一直运行良好 Now docu
  • 流复制和逻辑复制的区别

    有人能告诉我更多关于 PostgreSQL 中物理复制和逻辑复制之间的区别吗 TL DR 逻辑复制发送逐行更改 物理复制发送磁盘块更改 逻辑复制对于某些任务更好 而物理复制对于其他任务更好 请注意 在 PostgreSQL 12 更新时的当
  • Rails 购物车 - 未添加到当前订单

    这里是 Rails 菜鸟 我正在构建一个基本的购物车 它之前运行良好 在不更改任何代码的情况下 我 git reset hard 到我以前的提交 它正在工作 它就崩溃了 这是细分 Github 仓库 https github com chr
  • 编译引用的dll

    使用VS2005和VB NET 我有一个项目 它是我创建的数据存储的 API 编译时创建api dll 我在同一解决方案中有第二个项目 它有一个对 API 项目的项目引用 编译时将创建wrapper dll 这基本上是特定于应用程序的 AP
  • 显示对象而不是字符串

    在这里 我附上了我的问题的快照和代码 它只向我显示作为对象的内容 但完美地显示组名 这个问题的快照在下面的链接中给出 只需浏览这张图片 http imageupload org d 4DA941521 快照 gt 我想要特定组名称的子数据
  • 使用 kafka 进行 Spark 结构化流处理只会导致一批(Pyspark)

    我有以下代码 我想知道为什么它只生成一批 df spark readStream format kafka option kafka bootstrap servers IP option subscribe Topic option st