如何将 Spark Streaming 数据转换为 Spark DataFrame

2023-11-26

到目前为止,Spark还没有创建用于流数据的DataFrame,但是当我在进行异常检测时,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分,但是当我尝试使用流数据进行实时异常检测时,出现了问题。我尝试了多种方法,仍然无法将DStream转换为DataFrame,也无法将DStream内部的RDD转换为DataFrame。

这是我最新版本的代码的一部分:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

正如你在 main() 函数中看到的,当我使用 ssc.socketTextStream() 方法读取输入流数据时,它会生成 DStream,然后我尝试将 DStream 中的每个个体转换为 Row,希望可以将数据转换为数据帧稍后。

如果我在这里使用 ppprint() 打印出 features_rdd ,它是有效的,这让我想到,features_rdd 中的每个个体都是一批 RDD,而整个 features_rdd 是一个 DStream。

然后我创建了streamrdd_to_df()方法并希望将每批RDD转换为数据帧,它给了我错误,显示:

错误 StreamingContext:启动上下文时出错,将其标记为已停止 java.lang.IllegalArgumentException:要求失败:未注册输出操作,因此无需执行任何操作

有没有想过如何对 Spark 流数据进行 DataFrame 操作?


Spark为我们提供了结构化流媒体可以解决此类问题。它可以生成流数据帧,即连续附加的数据帧。请检查以下链接

http://spark.apache.org/docs/latest/structed-streaming-programming-guide.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 Spark Streaming 数据转换为 Spark DataFrame 的相关文章

  • 使用 python 进行串行数据记录

    Intro 我需要编写一个小程序来实时读取串行数据并将其写入文本文件 我在读取数据方面取得了一些进展 但尚未成功地将这些信息存储在新文件中 这是我的代码 from future import print function import se
  • 我怎样才能更多地了解Python的内部原理? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我使用Python编程已经有半年多了 我对Python内部更感兴趣 而不是使用Python开发应用程序
  • python 中的代表

    我实现了这个简短的示例来尝试演示一个简单的委托模式 我的问题是 这看起来我已经理解了委托吗 class Handler def init self parent None self parent parent def Handle self
  • 如何使用 Plotly 中的直方图将所有离群值分入一个分箱?

    所以问题是 我可以在 Plotly 中绘制直方图 其中所有大于某个阈值的值都将被分组到一个箱中吗 所需的输出 但使用标准情节Histogram类我只能得到这个输出 import pandas as pd from plotly import
  • Django 模型在模板中不可迭代

    我试图迭代模型以获取列表中的第一个图像 但它给了我错误 即模型不可迭代 以下是我的模型和模板的代码 我只需要获取与单个产品相关的列表中的第一个图像 模型 py class Product models Model title models
  • 从零开始的 numpy 形状意味着什么

    好的 我发现数组的形状中可以包含 0 对于将 0 作为唯一维度的情况 这对我来说是有意义的 它是一个空数组 np zeros 0 但如果你有这样的情况 np zeros 0 100 让我很困惑 为什么这么定义呢 据我所知 这只是表达空数组的
  • 为什么Python的curses中escape键有延迟?

    In the Python curses module I have observed that there is a roughly 1 second delay between pressing the esc key and getc
  • 更改 `base_compiledir` 以将编译后的文件保存在另一个目录中

    theano base compiledir指编译后的文件存放的目录 有没有办法可以永久设置theano base compiledir到不同的位置 也许通过修改一些内部 Theano 文件的内容 http deeplearning net
  • Numpy 过滤器平滑零区域

    我有一个 0 及更大整数的 2D numpy 数组 其中值代表区域标签 例如 array 9 9 9 0 0 0 0 1 1 1 9 9 9 9 0 7 1 1 1 1 9 9 9 9 0 2 2 1 1 1 9 9 9 8 0 2 2 1
  • 首先对列表中最长的项目进行排序

    我正在使用 lambda 来修改排序的行为 sorted list key lambda item item lower len item 对包含元素的列表进行排序A1 A2 A3 A B1 B2 B3 B 结果是A A1 A2 A3 B
  • Seaborn Pairplot 图例不显示颜色

    我一直在学习如何在Python中使用seaborn和pairplot 这里的一切似乎都工作正常 但由于某种原因 图例不会显示相关的颜色 我无法找到解决方案 因此如果有人有任何建议 请告诉我 x sns pairplot stats2 hue
  • 将 matplotlib 颜色图集中在特定值上

    我正在使用 matplotlib 颜色图 seismic 绘制绘图 并且希望白色以 0 为中心 当我在不进行任何更改的情况下运行脚本时 白色从 0 下降到 10 我尝试设置 vmin 50 vmax 50 但在这种情况下我完全失去了白色 关
  • 如何将 UDF 中的结构或类数组返回到数据帧列值中?

    d ID 1 pID 1000 startTime 2018 07 02T03 34 20 endTime 2018 07 03T02 40 20 ID 1 pID 1000 startTime 2018 07 02T03 45 20 en
  • 在 Pandas 中使用正则表达式的多种模式

    我是Python编程的初学者 我正在探索正则表达式 我正在尝试从 描述 列中提取一个单词 数据库名称 我无法给出多个正则表达式模式 请参阅下面的描述和代码 描述 Summary AD1 Low free DATA space in data
  • 无法在 osx-arm64 上安装 Python 3.7

    我正在尝试使用 Conda 创建一个带有 Python 3 7 的新环境 例如 conda create n qnn python 3 7 我收到以下错误 Collecting package metadata current repoda
  • 使用 NumPy 将非均匀数据从文件读取到数组中

    假设我有一个如下所示的文本文件 33 346 1223 10 23 11 23 12 23 13 23 14 23 15 23 16 24 10 24 11 24 12 24 13 24 14 24 15 24 16 25 14 25 15
  • 默认情况下,Keras 自定义层参数是不可训练的吗?

    我在 Keras 中构建了一个简单的自定义层 并惊讶地发现参数默认情况下未设置为可训练 我可以通过显式设置可训练属性来使其工作 我无法通过查看文档或代码来解释为什么会这样 这是应该的样子还是我做错了什么导致默认情况下参数不可训练 代码 im
  • Elastic Beanstalk 中的 enum34 问题

    我正在尝试在 Elastic Beanstalk 中设置 django 环境 当我尝试通过requirements txt 文件安装时 我遇到了python3 6 问题 File opt python run venv bin pip li
  • 列表值的意外更改

    这是我的课 class variable object def init self name name alias parents values table name of the variable self name 这是有问题的函数 f
  • Scrapy Spider不存储状态(持久状态)

    您好 有一个基本的蜘蛛 可以运行以获取给定域上的所有链接 我想确保它保持其状态 以便它可以从离开的位置恢复 我已按照给定的网址进行操作http doc scrapy org en latest topics jobs html http d

随机推荐

  • 使用陀螺仪滚动图像时遇到问题

    我的 iPad Air 遇到了一个奇怪的问题 我的代码在 iPad 3 iPad 4 iPhone 5S iPod 5th Gen 上运行良好 但在 iPad Air 上 我的图像会自动滚动 无需用户旋转设备 这是我的代码 property
  • 为什么 Symfony 仍然记录到 dev.log 文件,即使我没有在日志处理程序中定义它?

    在执行 Symfony 命令期间 我想将消息记录到不同的文件中 我已经阅读了 Symfony 和 Monolog 文档 它应该像我在这里描述的那样工作 请注意 我知道来自 教义 事件 等通道的消息仍将由主处理程序记录 但这对我来说并不重要
  • 如何在我的 C# 控制器中获取 Ajax 发布的数组?

    我使用 ASP NET MVC 我尝试在 ajax 中发布一个数组 但我不知道如何将它获取到我的控制器中 这是我的代码 Ajax var lines new Array lines push ABC lines push DEF lines
  • 如何获取mp3文件的大小和持续时间?

    我需要计算 mp3 文件的总长度 目前我正在使用我发现的 PHP 类 http www zedwood com article php calculate duration of mp3 如果 mp3 文件位于同一服务器中 则此操作完美 但
  • 用 Python 计算 XIRR

    我需要计算一段时间内进行的金融投资的 XIRR numpy pandas 或普通 python 中是否有任何函数可以执行此操作 参考 什么是 XIRR 原问题中接受的答案不正确 可以改进 创建了一个用于快速 XIRR 计算的包 PyXIRR
  • 最大字符串内容长度配额(8192)

    反序列化操作 CreateTransactionEntity 的回复消息正文时出错 读取 XML 数据时超出了最大字符串内容长度配额 8192 通过更改创建 XML 读取器时使用的 XmlDictionaryReaderQuotas 对象的
  • Rust 中 C++ 的 shared_ptr 的等价物是什么?

    为什么 Rust 不允许这种语法 fn main let a String from ping let b a println a b 当我尝试编译这段代码时 我得到 error E0382 use of moved value a gt
  • 清理基于 Play 框架的项目

    运行新的后Play framework 2 0基于项目 我未能清理它 生成的工作人员仍然在下面 play new myapp gt app name myapp gt template java app myapp app controll
  • swift 代码崩溃,日志:dyld:未加载库:@rpath/libswiftCore.dylib

    我调试了一个简单的swift app 它在模拟器中可以正常运行 但是在真机上运行就立即崩溃 系统版本 ios 8 xcode版本 6 0 1 崩溃日志 dyld 未加载库 rpath libswiftCore dylib 引用自 priva
  • openCV 的 HSV 色轮图像?

    我已经编写了直方图代码 我想将其用于 SVM 训练 但根本问题是 我不明白应该选择多少个最小数量的垃圾箱 以便我可以在不同颜色 红色 绿色 黄色 蓝色 橙色 的垃圾箱之间获得广泛不同的分布 那么 有人可以给我链接 发布图像吗openCV 的
  • 配置 Gunicorn:未指定应用程序模块

    我正在尝试使用 NGINX 和 Gunicorn 部署 django 项目 我不断收到 502 Bad Gateway 过去几天我一直在不停地工作 但似乎无法部署它 我已经阅读了 3 个关于 Digital Ocean 的教程 但显然它们都
  • 如何在 React Context 中传递 State

    我试图通过并更新一个状态useContext App js import Home from components Home const UserContext createContext function App const name s
  • 拒绝:“2.23:应用程序必须遵循 iOS 数据存储指南”,我们没有在文档文件夹中保存任何数据

    我们的App被App拒绝了 原因是 2 23 我们发现您的应用程序不遵循iOS数据存储 指南 这是 App Store 审核指南所要求的 特别是 我们发现在启动和 或内容下载时 您的 应用程序存储超过 2 MB 的数据 要检查您的应用程序存
  • 为什么不能在没有括号的内插字符串中使用条件运算符? [复制]

    这个问题在这里已经有答案了 为什么我不能在 c 6 字符串插值中使用内联条件运算符 而不将其包含在括号内 和错误 正如您所看到的 解析器似乎遇到了困难 这是一个错误 还是字符串插值机制的一个特性 From MSDN 强调我的 person
  • 如何知道 UITableView 中 Tableview 单元格按钮单击的索引路径/行? [复制]

    这个问题在这里已经有答案了 我创建了一个具有自定义 UITableViewCell 的 TableView 一个按钮与表格视图的每一行相关联 现在我想知道单击按钮时的行号 以便我知道单击了哪个行按钮 我尝试了在堆栈上找到的一些东西 但没有任
  • 是否可以更改 Xcode 缩进注释块的方式?

    默认情况下 Xcode 自动将 C 风格注释块中的多行代码缩进一个空格 this is a comment block line 1 line 2 是否可以修改此行为 我希望评论块内没有缩进 这不是一个完整的答案 但 Xcode 在该键下有
  • PHP - 无法打开流:没有这样的文件或目录

    在PHP脚本中 是否调用include require fopen 或它们的衍生物 例如include once require once 甚至 move uploaded file 人们经常遇到错误或警告 无法打开流 没有这样的文件或目录
  • HashMap 缓存中的同步

    我有一个网络应用程序 人们可以在其中请求资源 为了提高效率 这些资源使用同步哈希图进行缓存 这里的问题是 当两个不同的请求同时针对同一未缓存的资源时 检索资源的操作会占用大量内存 因此我想避免对同一资源多次调用它 有人可以告诉我以下代码片段
  • Ansible 如何循环执行一系列任务?

    怎样才能Ansible剧本loop在一系列任务上 我希望实现一个执行任务序列的轮询循环 直到任务成功 当失败时 异常处理程序将尝试修复该条件 然后循环将重复任务序列 考虑以下假想的例子 action block debug msg i ex
  • 如何将 Spark Streaming 数据转换为 Spark DataFrame

    到目前为止 Spark还没有创建用于流数据的DataFrame 但是当我在进行异常检测时 使用DataFrame进行数据分析更加方便快捷 我已经完成了这部分 但是当我尝试使用流数据进行实时异常检测时 出现了问题 我尝试了多种方法 仍然无法将