Spark结构化流:书写不正确

2024-04-20

我将抄表记录以 JSON 形式从 kafka_2.11-0.10.0.1 流式传输到 Spark 2.1 中。我改用结构化流媒体;尽管kafka消费者确认传入数据,但我控制台和writeStream不动。我正在测试使用

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

My code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("interval") \
    .master("local[4]") \
    .getOrCreate()
schema = StructType().add("customer_id", StringType()) 
df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \
      .option("subscribe", "test") \
  .option("startingOffsets", "earliest") \
  .load() \
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

query = df.writeStream \
 .option("checkpointLocation", "/user/XX/checkpoint5") \
 .format("parquet") \
 .start("/user/XX/interval5") 

它使用 388 字节 parquet 文件创建检查点和数据目录。然而,没有写入任何流数据。

$ hdfs dfs -ls interval5
drwxr-xr-x   ... interval5/_spark_metadata
-rw-r--r--   ... interval5/part-00000-0b2eb00a-c361-4dfe-a24e-9589d150a911.snappy.parquet
-rw-r--r--   ... interval5/part-00000-e0cb12d1-9c29-4eb0-92a8-688f468a42ce.snappy.parquet

kafka-consumer 确认数据正在发送:

{"customer_id":"customer_736"}
{"customer_id":"customer_995"}
{"customer_id":"customer_1899"}
{"customer_id":"customer_35"}

kafka-consumer 显示流数据。

我认为我错过了出列和保存流式传输行的重要步骤 - 一天的 stackoverflow 搜索并没有帮助。 (编辑以删除对控制台的引用;因为它不相关)。


With .option("startingOffsets", "latest")您应该只期望在启动流查询后发布的消息。

因此,预期的操作过程是启动流式查询,然后发布消息。

Parquet 文件中未写入任何内容。

你会看见nothing自您使用以来保存到镶木地板文件中.format("console")。你必须将其更改为parquet并重新开始查询。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark结构化流:书写不正确 的相关文章

  • 有没有办法离线将多个 Plotly HTML 文件合并/嵌入到一个页面/HTML 文件中?

    我正在尝试将多个图表合并成一个 HTML 报告来发送 问题是我真的不认为子图是最好的主意 因为图表相对不相关 不同的 X Y 轴 我所需要做的只是将图表附加到 1 个 HTML 文件中 有一个指南解释了如何使用绘图 URL 来完成此操作 但
  • Python:我可以修改元组吗?

    我有一个 2 D 元组 实际上我以为 它是一个列表 但错误说它是一个元组 但无论如何 该元组的形式为 浮点数 val prod id 现在我有一个字典 其中包含 key gt prod id 和 value prod name 现在 我想将
  • 将 python scikit learn 模型导出到 pmml

    我想将 python scikit learn 模型导出到 PMML 中 什么 python 包最适合 我读到Augustus https github com opendatagroup augustus 但我找不到任何使用 scikit
  • 是否可以在 SQLAlchemy 中创建一个可以创建父记录的事件侦听器?

    有两个表 父表和子表 我想创建一个事件监听器 触发器 如果孩子没有父母 它可以创建父母 这就是我试图做的 class parent db Model tablename parent id db Column db Integer prim
  • 在 python 中返回 self [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一个代表对象的类 我有很多方法可以修改这个对象状态 没有明显的返回或显然没有任何返回 在 C 中 我会将所有这些方法声明为void
  • AWS Lambda - 在区域之间自动复制 EC2 快照?

    我想创建一个 Lambda 函数 python 它将自动将已创建的快照复制到另一个区域 我已联系 AWS Support 他们只向我发送了用于 RDS 数据库的 GitHub 脚本 没有 EC2 快照复制脚本 任何帮助都会很棒 谢谢 是的
  • 为什么我在 Python 中收到“连接被拒绝”错误? (插座)

    我是套接字新手 请原谅我完全缺乏理解 我有一个服务器脚本 server py usr bin python import socket import the socket module s socket socket Create a so
  • 在 AWS Elastic Beanstalk 中部署 Flask 应用程序

    当我部署 Flask 应用程序时 它显示成功 但是当我检索日志时 我看到错误 找不到 Flask 我的需求文件中有烧瓶 任何帮助 Sat Jan 11 06 51 50 503908 2020 error pid 3393 remote 1
  • 多级QTreeView

    我很难理解如何使用 QTreeView 和 QStandardItemModel 设置多级 QTree 这是我所拥有的 from PySide QtGui import import sys class MainFrame QWidget
  • scipy 的 curve_fit 函数的尺寸问题

    我对 python 中的曲线拟合以及一般的 python 都很陌生 目前 我正在尝试使用 scipy 中的 curve fit 模块来拟合 4 个光谱峰 简而言之 我的文本文件中有两列数据 所以我的第一步是将数据导入到两个数组中 一个包含
  • Python lmfit:拟合 2D 模型

    我正在尝试将二维高斯拟合到一些灰度图像数据 该数据由一个二维数组给出 lmfit 库实现了一个易于使用的模型类 它应该能够做到这一点 不幸的是文档 http lmfit github io lmfit py model html http
  • OpenCV 在使用 anaconda 的 Linux 上无法与 python 正常工作。收到 cv2.imshow() 未实现的错误

    这就是我得到的确切错误 我的操作系统是 Ubuntu 16 10 OpenCV 错误 未指定错误 该功能未实现 使用 Windows GTK 2 x 或 Carbon 支持重新构建库 如果您使用的是 Ubuntu 或 Debian 请安装
  • Spark parquet 分区:大量文件

    我正在尝试利用 Spark 分区 我试图做类似的事情 data write partitionBy key parquet location 这里的问题是每个分区都会创建大量镶木地板文件 如果我尝试从根目录读取 则会导致读取速度变慢 为了避
  • 有没有更快的方法将数字转换为名称?

    以下代码定义了映射到数字的名称序列 它的设计目的是获取一个号码并检索一个特定的名称 该类通过确保名称存在于其缓存中来进行操作 然后通过索引到其缓存中来返回名称 问题在这 如何在不存储缓存的情况下根据数字计算出名称 该名称可以被认为是一个以
  • 结束一天(日期时间)的最优雅的方式是什么?

    我目前正在编写一些报告代码 允许用户选择指定日期范围 它的工作方式 简化 是 用户 可选 指定年份 用户 可选 指定月份 用户 可选 指定一天 这是一个代码片段 以及描述我想要的内容的注释like to do from datetime i
  • 为什么删除 DataFrame 的列或部分会增加内存使用量,以及如何确保对未使用的 DataFrame 切片进行垃圾回收

    处理大型 DataFrame 时 您需要小心内存使用情况 例如 您可能想要分块下载大数据 处理这些块 然后从内存中删除所有不必要的部分 我找不到任何有关处理垃圾收集的最佳程序的资源pandas 但我尝试了以下方法并得到了令人惊讶的结果 im
  • numpy.polyfit 没有关键字“cov”

    我试图使用 polyfit 来找到一组数据的最佳拟合直线 但我还需要知道参数的不确定性 所以我也想要协方差矩阵 在线文档建议我写 polyfit x y 2 cov True 但这给出了错误 类型错误 polyfit 得到了意外的关键字参数
  • 对二进制数的字符串表示进行按位运算 python 2.7

    我想对二进制数的两个字符串表示执行按位或 但我不知道如何将字符串转换为原始二进制 a 010110 b 100000 a b 应该产生 110110 然后我想计算 on 位的数量 这应该返回 4 您可以使用内置的将字符串转换为二进制int
  • Mac OS 上的 pybluez 安装错误

    我尝试安装pybluez使用以下命令 pip install pybluez sudo easy install pybluez 但对于这两个命令我最终都会出错 环境 Mac OSX 10 9 1 Python 2 7 点日志 cc fno
  • Snakemake根据字典输入和输出

    我正在尝试重命名 Snakemake 管道中的一些文件 假设我有三个文件 FileA txt FileB txt FileC txt 我希望根据字典重新命名它们dict A 0 B 1 C 2 to get RenamedFile0 txt

随机推荐

  • 如何构建在 Android 上运行的本机(命令行)可执行文件?

    我已经成功构建了一个使用本机 JNI 库的 Android 应用程序 GUI 但是 现在我想创建一个从命令行 root 权限 运行并且根本不使用 GUI 的可执行文件 我如何构建这样的东西 从 NDK r8d 开始 这个问题可以通过更简单的
  • 在 sqlalchemy 中,有没有一种方法可以进行排序,使空单元格位于末尾,无论排序顺序如何?

    我有一个非常标准的设置 想按列排序 someselect order by asc table1 mycol 但是 我想要行 or NULL for mycol显示在结果的末尾 有没有办法做到这一点 SQLAlchemy 有一个NULLS
  • 无法将 UIViewController 类型的值转换为 PatternDetailViewController

    我试图将视图控制器向下转换为详细视图控制器 但不能 我正在使用 Core Data 第一次 错误出现在prepareForSegue方法中 内容如下 无法将 UIViewController 类型的值 0x1b81cdc 转换为 Patte
  • 定义用于执行程序的工作目录 (C#)

    我目前正在尝试从特定文件夹启动可执行文件 我下面的代码使应用程序崩溃得很奇怪 Process p new Process p StartInfo WorkingDirectory dump p StartInfo FileName s p
  • 如何使用Python检查url是否重定向到另一个url

    我想检查目标url访问后是否会重定向 我想我可以做这样的事情 req urllib2 Request url url headers headers resp urllib2 urlopen req timeout 3 code resp
  • 具有特定长度的字符串的哈希值

    有没有一种方法可以生成字符串的哈希值 以便哈希值本身具有特定的长度 我有一个生成 41 字节哈希值 SHA 1 的函数 但我需要它最大为 33 字节 由于某些硬件限制 如果我将 41 字节哈希截断为 33 我可能 当然 失去了唯一性 或者实
  • Google Webmasters API for Java 返回空网站列表

    我编写了一个简单的站点列表查询代码 它使用 Oauth 与服务帐户基于谷歌的文档 https developers google com identity protocols OAuth2ServiceAccount 所使用的身份验证密钥文
  • AJAX 中的 GET 与 POST?

    为什么 AJAX 中有 GET 和 POST 请求 因为它无论如何都不影响页面 URL 由于数据未反映到页面 URL 因此通过 AJAX 中的 GET 传递敏感数据有何区别 您应该根据您的 Web 服务要求使用正确的 HTTP 动词 当处理
  • $(this) 在函数中不起作用

    以下代码从文件加载 html 内容 我使用这个线程 https stackoverflow com questions 168963 stop jquery load response from being cached
  • 删除所有 kubernetes 命名空间中所有 pod 的命令

    查看文档后 有一个 API 调用可以删除单个 pod 但是有没有办法删除all所有命名空间中的 Pod 没有命令可以完全按照您的要求进行操作 以下是一些势均力敌的比赛 运行任何这些命令之前请务必小心 如果您使用多个集群 请确保您连接到正确的
  • 如何使用 C# 在 Selenium WebDriver 中等待警报?

    我如何将 Selenium WebDriver 设置为在接受警报而不是 Thread Sleep 之前等待警报 作为网站 有时加载速度非常慢 有时加载速度很快 Thanks 您应该应用 webdriver 等待警报正确出现 var wait
  • azure 以编程方式设置环境变量以禁用 azure 功能

    我有一个 Azure API 和一个 Azure 函数 当某些逻辑传递到 API 时 我希望它禁用 Azure 功能 这个帖子 https stackoverflow com questions 36368786 programmatica
  • 如何检查 IP 是否位于这些子网之一

    我有大约 12600 个子网 例如 123 123 208 0 20 和一个IP 我可以使用 SQLite 数据库或数组或其他任何东西 大约一个月前有人问过一个类似的问题 但是我不是在寻找针对一个子网检查一个IP 而是针对一堆子网 显然是最
  • 我无法理解“找不到模块:错误:无法解析'fs''”

    有一点空闲时间 我尝试用我喜欢的新网络技术 Typescript Pug 和 React 重新设计我的一个旧项目 一切工作正常 直到我尝试将 pug js 添加到混合中babel plugin transform react pug 无论我
  • 在 xml 中存储 url 的正确方法?

    我将数据存储在 xml 文件中 在其中一个节点中 我必须存储一个由特殊字符 如 组成的 url 我使用 amp 而不是 并且 xml 显示没有错误 但是当我进行 SAX 解析时 节点内返回的字符串值是 之后的字符串 我猜我存储网址的方式不正
  • 在本地主机上的 laravel 中将 slack 连接到 botman

    这是我在 laravel 中的路线文件 我将任何 url 与调用闭包的 botman 相匹配 该闭包为 botman 注册一个 slack 驱动程序并监听消息 hello 在闲暇时我试图设置Request URL在使用此事件订阅下http
  • 查找或创建竞争条件

    我正在尝试使用 ActiveRecordfind or create by column 但我从 Postgres 收到错误 让我知道它有时无法找到模型 并尝试插入一个模型 保持这张表的独特性非常重要 所以我添加了一个 unique gt
  • LuaJIT FFI 回调性能

    The LuaJIT FFI 文档 http luajit org ext ffi semantics html提到从 C 调用回 Lua 代码相对较慢 建议尽可能避免使用 不要将回调用于性能敏感的工作 例如考虑一个数值积分例程 它需要用户
  • 在 Windows 上绘制主题组合框

    我尝试模仿主题不可编辑组合框的外观 CBS DROPDOWNLIST using DrawThemeBackground https msdn microsoft com library windows desktop bb773306 v
  • Spark结构化流:书写不正确

    我将抄表记录以 JSON 形式从 kafka 2 11 0 10 0 1 流式传输到 Spark 2 1 中 我改用结构化流媒体 尽管kafka消费者确认传入数据 但我控制台和writeStream不动 我正在测试使用 pyspark pa