我对 Spark(老实说也是 Python)有点菜鸟,所以如果我错过了一些明显的东西,请原谅我。
我正在使用 Spark 和 Python 进行文件流处理。在我做的第一个示例中,Spark 正确地侦听给定目录并计算文件中单词的出现次数,因此我知道在侦听目录方面一切正常。
现在我试图获取出于审计目的而处理的文件的名称。我在这里读到http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvf[电子邮件受保护]%3E http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E这不是一件小事。我在这里得到了一个可能的解决方案http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgy[电子邮件受保护]%3E http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E我尝试按如下方式实现它:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def fileName(data):
string = data.toDebugString
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingFileNamePrinter")
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream("file:///test/input/")
files = lines.foreachRDD(fileName)
print(files)
ssc.start()
ssc.awaitTermination()
不幸的是,现在它不再每秒监听该文件夹,而是监听一次,输出“None”,然后什么也不做。这与有效代码之间的唯一区别是
files = lines.foreachRDD(fileName)
在我担心获取文件名(明天的问题)之前,有人能明白为什么这只检查目录一次吗?
提前致谢
中号
所以这是一个菜鸟错误。我将我的解决方案发布出来,供我自己和其他人参考。
正如@user3689574 所指出的,我没有在函数中返回调试字符串。这充分解释了为什么我得到“无”。
接下来,我在函数外部打印调试信息,这意味着它从来不是 foreachRDD 的一部分。将其移动到函数中,如下所示:
def fileName(data):
debug = data.toDebugString()
print(debug)
这会按应有的方式打印调试信息,并按应有的方式继续侦听目录。改变它解决了我最初的问题。就获取文件名而言,这变得非常简单。
目录没有变化时的调试字符串如下:
(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []
这清楚地表明没有文件。当文件复制到目录中时,调试输出如下:
(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at testFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt New HadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []
通过快速的正则表达式,可以轻松地为您提供文件名。希望这对其他人有帮助。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)