来自 Kafka 的 pySpark 结构化流不会输出到控制台进行调试

2024-03-23

下面是我的代码。我尝试了许多不同的选择变体,但应用程序运行,但没有显示每秒写入的消息。我有一个 Spark Streaming 示例,它使用 pprint() 确认 kafka 实际上每秒都会收到消息。 Kafka 中的消息采用 JSON 格式,请参阅字段/列标签的架构:

from pyspark.sql.functions import *
from pyspark.sql.types import *
import statistics


KAFKA_TOPIC = "vehicle_events_fast_testdata"
KAFKA_SERVER = "10.2.0.6:2181"

if __name__ == "__main__":
    print("NXB PySpark Structured Streaming with Kafka Demo Started")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar,/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.executor.extraLibrary", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .config("spark.driver.extraClassPath", "/home/cldr/streams-dev/libs/spark-sql-kafka-0-10_2.11-2.4.4.jar:/home/cldr/streams-dev/libs/kafka-clients-2.0.0.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    schema = StructType() \
        .add("WheelAngle", IntegerType()) \
        .add("acceleration", IntegerType()) \
        .add("heading", IntegerType()) \
        .add("reading_time", IntegerType()) \
        .add("tractionForce", IntegerType()) \
        .add("vel_latitudinal", IntegerType()) \
        .add("vel_longitudinal", IntegerType()) \
        .add("velocity", IntegerType()) \
        .add("x_pos", IntegerType()) \
        .add("y_pos", IntegerType()) \
        .add("yawrate", IntegerType())


 # Construct a streaming DataFrame that reads from testtopic
    trans_det_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_SERVER) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")


#(from_json(col("value").cast("string"),schema))

    #Q1 =  trans_det_df.select(from_json(col("value"), schema).alias("parsed_value"), "timestamp")
    #Q2 =  trans_det_d.select("parsed_value*", "timestamp")


    query = trans_det_df.writeStream \
            .format("console") \
            .option("truncate","false") \
            .start() \
            .awaitTermination()

kafka.bootstrap.servers is the 卡夫卡经纪人地址(默认端口 9092),而不是 Zookeeper(端口 2181)

另请注意,您的起始偏移量是最新的,因此您必须在启动流应用程序后生成数据。

如果您想查看现有主题数据,请使用最早的偏移量。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

来自 Kafka 的 pySpark 结构化流不会输出到控制台进行调试 的相关文章

  • Python 的“platform.mac_ver()”报告不正确的 MacOS 版本

    我正在使用Pythonplatform module https docs python org 3 library platform html要识别 MacOS 版本 如下所示 import platform print platform
  • API网关+Lambda+Python:处理异常

    我正在非代理模式下从 API Gateway 调用基于 Python 的 AWS Lambda 方法 我应该如何正确处理异常 以便使用部分异常设置适当的 HTTP 状态代码以及 JSON 正文 作为示例 我有以下处理程序 def my ha
  • 如何使用 QWebView 显示 html。 Python?

    如何在控制台中显示 HTML 格式的网页 import sys from PyQt4 QtGui import QApplication from PyQt4 QtCore import QUrl from PyQt4 QtWebKit i
  • 使用记事本打开文本文件作为python中的帮助文件?

    我想为我的简单程序的用户提供打开帮助文件的机会 以指导他们如何充分利用我的程序 理想情况下 我希望在 GUI 上有一个蓝色的小帮助链接 可以随时单击该链接 从而在本机文本编辑器 例如记事本 中打开 txt 文件 有没有一种简单的方法可以做到
  • 如何创建毫秒粒度的 Python 时间戳?

    我需要一个自纪元以来的毫秒 ms 时间戳 这应该不难 我确信我只是缺少一些方法datetime或类似的东西 实际上微秒 s 粒度也很好 我只需要亚 1 10 秒的计时 例子 我有一个每 750 毫秒发生一次的事件 假设它检查灯是否打开或关闭
  • Python 排列(包括子字符串)

    我遇到过这个帖子 如何在Python中生成列表的所有排列 https stackoverflow com questions 104420 how to generate all permutations of a list in pyth
  • 配置 Flask 以正确加载 Bootstrap js 和 css 文件

    如何使用 Flask 中的 url for 指令来正确设置 以便使用 Bootstrap 和 RGraph 的 html 页面可以正常工作 假设我的 html 页面看起来像这样 部分片段
  • 有效地写入 pandas 中的多个相邻列

    使用 numpy ndarray 可以一次写入多个列 而无需先进行复制 只要它们相邻 如果我想写入数组的前三列 我会写 a 0 0 3 1 2 3 this is very fast a is a numpy ndarray 我希望在 pa
  • 如何避免在matplotlib中调用latex(输出到pgf)

    我使用 matplotlib 及其 pgf 后端来生成包含在 LaTeX 投影仪文档中的绘图 当我使用未定义的乳胶命令时 我遇到了麻烦 但对于我的应用程序 我不需要 matplotlib 来使用 Latex 生成标签或注释 我只想要正确的
  • Python 上每个系数具有特定约束的多元线性回归

    我目前正在数据集上运行多元线性回归 起初 我没有意识到我需要限制自己的体重 事实上 我需要有特定的正权重和负权重 更准确地说 我正在做一个评分系统 这就是为什么我的一些变量应该对音符产生积极或消极的影响 然而 当运行我的模型时 结果不符合我
  • 如何在Python中获取套接字的外部IP?

    当我打电话时socket getsockname 在套接字对象上 它返回我的机器的内部 IP 和端口的元组 但是 我想找回我的外部IP 最便宜 最有效的方式是什么 如果没有外部服务器的配合 这是不可能的 因为您和另一台计算机之间可能存在任意
  • Python 用静态图像将 mp3 转换为 mp4

    我有x文件包含一个列表mp3我想转换的文件mp3文件至mp4文件带有static png photo 似乎这里唯一的方法是使用ffmpeg但我不知道如何实现它 我编写了脚本来接受输入mp3文件夹和一个 png photo 然后它将创建新文件
  • 检测反射 DLL 注入

    在过去的几年中 恶意软件 以及一些渗透测试工具 如 Metasploit 的 meterpreter 负载 已经开始使用反射 DLL 注入 PDF http www harmonysecurity com files HS P005 Ref
  • 如何使用 SymPy 求给定一阶导数的 n 阶导数?

    Given some f and the differential equation x t f x t how do I compute x n t in terms of x t For example given f x t sin
  • 访问 Scrapy 内的 django 模型

    是否可以在 Scrapy 管道内访问我的 django 模型 以便我可以将抓取的数据直接保存到我的模型中 我见过this https scrapy readthedocs org en latest topics djangoitem ht
  • 使用 pyspark 计算所有可能的单词对

    我有一个文本文档 我需要找到整个文档中重复单词对的可能数量 例如 我有下面的word文档 该文档有两行 每行用 分隔 文档 My name is Sam My name is Sam My name is Sam My name is Sa
  • 通过过滤对 Pyspark Dataframe 进行分组

    我有一个数据框如下 cust id req req met 1 r1 1 1 r2 0 1 r2 1 2 r1 1 3 r1 1 3 r2 1 4 r1 0 5 r1 1 5 r2 0 5 r1 1 我必须观察客户 看看他们有多少要求 看看
  • 如何在Python中显示坐标网格线的变换?

    假设我有常规的笛卡尔坐标系 x y 并且我考虑一个矩形网格区域 D 分成小方块 我想看看域 D 如何在 Python 中的坐标变换 T x y gt u x y v x y 下映射 我正在寻找这样的东西 See here https mat
  • matplotlib imshow() 和像素强度

    我试图了解矩阵的值是如何输入到 matplotlib 的imshow 函数确定灰度模式下像素的强度 考虑示例代码 import random import matplotlib pyplot as plt import matplotlib
  • Pandas:如何删除以 nan 作为列名的多个列?

    根据标题 这是一个可重现的示例 raw data x this that this that this np nan np nan np nan np nan np nan np nan y np nan np nan np nan np

随机推荐

  • 编写自定义重构脚本的最佳 Java 库是什么? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • matplotlib 图例无法与句柄正常工作

    Matplotlib 可以自动显示图例 也可以通过为其提供绘图句柄来手动显示图例 但不知怎的 后者对我来说不能正常工作 举个例子 legend handles lgh plt plot 0 1 0 1 r lgh plt plot 0 1
  • 使用 searchview 的特定标题(从 api 获取)?

    我想要这样的东西 所以问题是 我真正想要的是当用户在搜索视图中输入特定主题名称 如果存在于应用程序中 时 它应该能够给出建议 如果找到它应该打开该主题活动 就像 Facebook Instagram 等搜索一样 这些标题来自 API 我已在
  • 在Linux中测量C程序缓存命中/未命中和CPU时间的最简单工具?

    我正在用 C 编写一个小程序 我想测量它的性能 我想看看它在处理器中运行了多少时间以及它进行了多少次缓存命中 未命中 有关上下文切换和内存使用情况的信息也很好 该程序的执行时间不到一秒钟 我喜欢 proc pid stat 的信息 但我不知
  • jQuery 不支持选择顶级节点文本?

    每当我需要顶级文本节点时 我必须写这个长故事吗 hello clone children remove end text 为什么没有原生函数支持呢 我不确定为什么没有本地支持 我想你可以把这行笨重的代码变成一个插件 一定有一个比我选择的更好
  • Spark:连接拒绝纱线上的 webapp 代理

    我在 docker 容器上使用 Spark 和 hadoop 我有 3 个容器主容器和 2 个从容器 一切工作正常 但我在运行任务时遇到 Spark 代理 Web 应用程序问题 我可以连接到yarn webapp 但nhttp 172 20
  • 如何将express添加到角度启动器中?

    我一直在使用 webpack 和 Angular 使用 webpack dev server 开发我的 angular2 应用程序 https github com AngularClass angular starter https gi
  • 使用变量定义数组的大小

    这在C语言中有效吗 include
  • Pony ORM 报告记录“在当前事务之外更新”,而没有其他事务

    代码很简单 如下 from pony orm import Required Set Optional PrimaryKey from pony orm import Database db session import time db D
  • 在C#中,将字符串解析为单个字符

    在 C 中 如何将字符串解析为单个字符 Given 词 太棒了 期望的结果 字母 0 W 字母 1 o 字母 2 n 字母 3 d 字母 4 e 字母 5 r 字母 6 f 字母 7 u 字母 8 l Char letters word T
  • TeraData SQL 中“TOP”和“SAMPLE”之间的区别

    TeraData SQL 中的 TOP 和 SAMPLE 有什么区别 它们是一样的吗 From 顶部与样本 http datawarehouse ittoolbox com groups technical functional terad
  • 带有 C++ 代码的 R 包安装失败,未创建 DLL

    我目前正在开发一个 R 包 它使用 C 代码并包含外部库 dlib boost 和小组开发的优化库 我们使用 Rcpp 来集成 R 和 C 但问题是该包总是无法编译 而且我发现的类似问题都对我不起作用 R CMD 检查生成的报告为 inst
  • 带有前缀的 DynamoDB 和 TableNameOverride

    我正在测试 DynamoDB 表 并希望使用前缀 dev 为生产和开发环境设置不同的表名称以进行开发 我做了这个测试来打印表名称 import com amazonaws services dynamodbv2 datamodeling D
  • SQLite数据库更新一行android

    我正在开发一个android应用程序 其中我需要根据某个where子句更新表中的列 下面是代码 public void updatethekeyofweeklycolumn String profilename String keystem
  • 失败的测试是否会导致持续构建失败?

    如果一个项目的测试作为构建计算机上构建过程的一部分执行 那么如果一组测试失败 整个构建是否应该失败 回答这个问题时应该考虑哪些因素 哪些测试失败重要吗 提出这个问题的背景信息 目前我正在开发一个项目NUnit http www nunit
  • C#中使用Open Xml SDK导出DataTable到Excel时指定编码格式

    我只是想使用 open xml 将数据表导出为 excel 参考了下面的方法 在 C 中使用 Open Xml SDK 将数据表导出到 Excel https stackoverflow com questions 11811143 exp
  • 如何在python中声明零数组(或特定大小的数组)[重复]

    这个问题在这里已经有答案了 我正在尝试构建计数直方图 所以我创建了桶 我知道我可以遍历并附加一堆零 即 buckets for i in xrange 0 100 buckets append 0 有更优雅的方法吗 我觉得应该有一种方法来声
  • 使用 GWT 平台将参数从一个演示者传递到另一个演示者

    我正在尝试将已加载到演示者上的参数传递给另一个演示者 例如来自某个客户端的汽车 最好的方法是什么 使用网守 有什么例子吗 PS 我将 DI 与 gin 和 GWT Platform 框架一起使用 如果应该在事件触发时加载演示者 您可以使用P
  • paypal 沙箱不工作[重复]

    这个问题在这里已经有答案了 我正在使用 PayPal 沙箱进行测试付款 自过去 3 4 个月以来 它运行良好 但我看到他们正在沙箱设计 管理测试电子邮件帐户的方式等方面进行更改 他们禁用了我的实际沙箱帐户 并要求我使用真实的 PayPal
  • 来自 Kafka 的 pySpark 结构化流不会输出到控制台进行调试

    下面是我的代码 我尝试了许多不同的选择变体 但应用程序运行 但没有显示每秒写入的消息 我有一个 Spark Streaming 示例 它使用 pprint 确认 kafka 实际上每秒都会收到消息 Kafka 中的消息采用 JSON 格式