我有一个客户端将 CSV 文件放置在嵌套目录中,如下所示,我需要实时读取这些文件。我正在尝试使用 Spark 结构化流来做到这一点。
Data:
/user/data/1.csv
/user/data/2.csv
/user/data/3.csv
/user/data/sub1/1_1.csv
/user/data/sub1/1_2.csv
/user/data/sub1/sub2/2_1.csv
/user/data/sub1/sub2/2_2.csv
Code:
val csvDF = spark
.readStream
.option("sep", ",")
.schema(userSchema) // Schema of the csv files
.csv("/user/data/")
添加任何配置以允许 Spark 从结构化流中的嵌套目录读取。
我可以使用 glob 路径流式传输子目录中的文件。
在这里发帖是为了其他人。
inputPath = "/spark_structured_input/*?*"
inputDF = spark.readStream.option("header", "true").schema(userSchema).csv(inputPath)
query = inputDF.writeStream.format("console").start()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)