到目前为止,Spark还没有创建用于流数据的DataFrame,但是当我在进行异常检测时,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分,但是当我尝试使用流数据进行实时异常检测时,出现了问题。我尝试了多种方法,仍然无法将DStream转换为DataFrame,也无法将DStream内部的RDD转换为DataFrame。
这是我最新版本的代码的一部分:
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator
sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)
model_inputs = sys.argv[1]
def streamrdd_to_df(srdd):
sdf = sqlContext.createDataFrame(srdd)
sdf.show(n=2, truncate=False)
return sdf
def main():
indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
inrdd = indata.map(lambda r: get_tuple(r))
Features = Row('rawFeatures')
features_rdd = inrdd.map(lambda r: Features(r))
features_rdd.pprint(num=3)
streaming_df = features_rdd.flatMap(streamrdd_to_df)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
正如你在 main() 函数中看到的,当我使用 ssc.socketTextStream() 方法读取输入流数据时,它会生成 DStream,然后我尝试将 DStream 中的每个个体转换为 Row,希望可以将数据转换为数据帧稍后。
如果我在这里使用 ppprint() 打印出 features_rdd ,它是有效的,这让我想到,features_rdd 中的每个个体都是一批 RDD,而整个 features_rdd 是一个 DStream。
然后我创建了streamrdd_to_df()方法并希望将每批RDD转换为数据帧,它给了我错误,显示:
错误 StreamingContext:启动上下文时出错,将其标记为已停止
java.lang.IllegalArgumentException:要求失败:未注册输出操作,因此无需执行任何操作
有没有想过如何对 Spark 流数据进行 DataFrame 操作?
Spark为我们提供了结构化流媒体可以解决此类问题。它可以生成流数据帧,即连续附加的数据帧。请检查以下链接
http://spark.apache.org/docs/latest/structed-streaming-programming-guide.html
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)