Spark 结构化流:多个接收器

2023-12-21

  1. 我们使用结构化流从 Kafka 进行消费,并将处理后的数据集写入 s3。

    我们还想将处理后的数据写入 Kafka,是否可以通过同一个流查询来完成此操作? (火花版本2.1.1)

  2. 在日志中,我看到流式查询进度输出,并且我有来自日志的示例持续时间 JSON,有人可以更清楚地说明两者之间的区别吗?addBatch and getBatch?

  3. TriggerExecution - 是处理获取的数据和写入接收器所花费的时间吗?

    "durationMs" : {
        "addBatch" : 2263426,
        "getBatch" : 12,
        "getOffset" : 273,
       "queryPlanning" : 13,
        "triggerExecution" : 2264288,
        "walCommit" : 552
    },
    

  1. Yes.

    在 Spark 2.1.1 中,您可以使用writeStream.foreach将数据写入 Kafka。这个博客中有一个例子:https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structed-streaming.html https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

    或者您可以使用Spark 2.2.0,它添加了Kafka接收器以支持正式写入Kafka。

  2. getBatch测量从源创建 DataFrame 所需的时间。这通常非常快。addBatch测量 DataFrame 在接收器中运行的时间。

  3. triggerExecution测量触发器执行的运行时间,通常几乎与getOffset + getBatch + addBatch.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 结构化流:多个接收器 的相关文章

随机推荐

  • 如何从 Groovy 中的对象字段中提取新列表

    在 Groovy 中 如何从以下内容中提取新列表 def people new Person name Tom yearOfBirth 1985 new Person name Abigail yearOfBirth 1987 new Pe
  • 为什么不能用两层列表初始化器来初始化 2D std::array?

    有人可以帮助我理解为什么我的编译器不能 不能推断出这一点吗 使用 g 7 3 不起作用 include
  • 如何找到串口蓝牙设备的UUID?

    我想从串口蓝牙设备接收数据到Android手机 但我不知道该设备的 UUID 如何找到该设备的 UUID 扩展 pwc 关于 UUID 为 0x1101 的说法 据我所知 这是 UUID 的 16 位版本 我无法弄清楚如何使用 16 位 U
  • 如何从 onDataChange 方法传递数据? [复制]

    这个问题在这里已经有答案了 我设置了一个布尔值来检查应用程序中是否存在重复的用户名 我希望布尔值根据 ValueEventListener 中 onDataChange 的结果返回数据 这是我所拥有的 private boolean isU
  • 如何从父网格和子网格获取复选框元素的引用

    我有这样的要求 比如我需要找到父网格行复选框 如果选中父网格行复选框 那么我需要将父网格行复选框的所有子网格复选框设置为 true 为此我已经这样做了
  • Pod 未找到:在 64 个 gem 中找不到“cocoapods”(>= 0)(Gem::LoadError)

    安装 cocoapods 后sudo gem install cocoapods 尝试运行pod setup返回此错误 Could not find cocoapods gt 0 among 64 total gem s Gem LoadE
  • 如何发送 Bundle 中的自定义对象的 ArrayList

    我有一个应用程序 它使用服务每 x 秒创建一个自定义对象 MyObject 的 ArrayList 然后我希望我的 Activity 获得这个 ArrayList 我目前计划让服务在每次完成数据查询时向活动处理程序发送一条消息 我希望发送给
  • Spring Batch 中的 NonTransientFlatFileException

    我试图读取一个包含 100 条记录的 CSV 文件 并一次性处理一批 10 条记录 一切工作正常 但处理完所有记录后 我得到 org springframework batch item file NonTransientFlatFileE
  • Angular ng build --target=product 给出错误

    我使用 Angular CLI 创建了一个新的 Angular 项目 我使用的版本是 Angular Cli 1 0 2 角度 4 0 0 我在其中添加了很多代码 但是现在 当我使用下面的命令构建我的项目时 我收到了一堆错误 ng buil
  • c 定义多行宏?

    define DEBUG BREAK a if a asm int 3 我已经按照上面定义了一个宏 并尝试使用它 include test define h int main DEBUG BREAK 1 1 return 0 但该示例无法编
  • 登录 Clojure

    对于Java开发 我使用Slf4j和Logback Logger logger LoggerFactory getLogger HelloWorld class logger debug Hello world 如何在 Clojure 程序
  • 使用 GDI+ 旋转图像而不剪裁其边缘的最快方法是什么?

    有一些非常漫长且饥饿的算法可以做到这一点 但到目前为止我还没有想出或发现任何特别快的算法 最快的方法是使用不安全调用直接操作图像内存LockBits 听起来很可怕 但其实很简单 如果您搜索 LockBits 您会发现大量示例 例如here
  • DevPay和Mfa是互斥的授权方式

    我尝试使用以下命令通过 AWS cli 将 MFA 删除添加到我的 S3 存储桶 aws s3api put bucket versioning bucket
  • 2D 软体:凝胶状和可塑性?

    我正在使用 Matter js 物理学来尝试创建软体 我能够创建这样的身体 但我不确定这是否是我想要的 软体 确实 这个物体并不完全是刚性的 并且在碰撞和被拖动时具有弹性的感觉 我一直在寻找与凝胶有相似之处的身体 这张图片可能在视觉上有助于
  • Pandas 分割错误

    由于内存不足 以下代码行未成功执行 import pandas as pd import datetime as dt u cols remote host dummy1 dummy2 date timezone get status by
  • PDO 错误:一般错误:2031 [重复]

    这个问题在这里已经有答案了 当我执行代码时出现此错误 我知道这已经在这里讨论过几次了 但我无法通过阅读那里提供的解决方案来解决我的问题 这是我得到的错误 致命错误 未捕获异常 PDOException 消息为 SQLSTATE HY000
  • 对于软件开发人员来说,学习如何对微控制器进行编程有多难?

    我是一名软件开发人员 我使用高级语言进行编程已有几年了 我想知道如何迈出硬件编程的第一步 不是什么疯狂复杂的东西 但也许是一些普通的 CE 设备 假设我不需要将 PCB 与各种组件放在一起 而只是对微型 cpu 进行编程 我要到多低的级别
  • 该算法的复杂度(Big-O)是多少?

    我对算法分析相当熟悉 并且可以说出我使用的大多数算法的大体 但我已经被困了几个小时 无法为我编写的这段代码想出 Big O 基本上 它是一种生成字符串排列的方法 它的工作原理是使字符串中的每个字符成为第一个字符 并将其与子字符串减去该字符的
  • 我可以在摘要式身份验证中使用已 MD5 编码的密码吗

    我在数据库中有密码的 MD5 哈希值 我想将其用于 HTTP AUTH DIGEST 但在阅读文档时 摘要哈希看起来包含用户名 领域和明文密码的哈希 在这种情况下有什么办法可以使用密码的 MD5 哈希吗 不 如果他们需要的哈希是这样生成的
  • Spark 结构化流:多个接收器

    我们使用结构化流从 Kafka 进行消费 并将处理后的数据集写入 s3 我们还想将处理后的数据写入 Kafka 是否可以通过同一个流查询来完成此操作 火花版本2 1 1 在日志中 我看到流式查询进度输出 并且我有来自日志的示例持续时间 JS