I am using Spark 1.3.1 with Python 2.7.
This is my first experience with Spark Streaming.
I am trying a code example that uses Spark Streaming to read data from a file.
Here is the link to the example: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
My code is as follows:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# NOTE: this points at a single file; see the discussion below.
lines = ssc.textFileStream('../inputs/2.txt')
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
I ran into this very problem, and here is what I realized: once the streaming job is running, the StreamingContext picks up data from new files only. It ingests only files placed in the monitored source directory after the stream has started; files that already exist there are ignored.
Actually, the PySpark documentation states this quite clearly:
textFileStream(directory)
Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be written to the monitored directory by “moving” them from another location within the same file system. File names starting with . are ignored.
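Based on that, here is a minimal sketch of the fix (the directory name ../inputs/ is a hypothetical path for illustration): point textFileStream at a directory rather than a file, start the stream, and then move new files into that directory from elsewhere on the same filesystem.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

# Monitor a directory, not a single file.
lines = ssc.textFileStream('../inputs/')  # hypothetical watched directory
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
# While this runs, move (do not copy) files into the watched directory,
# e.g. from a shell:  mv /tmp/2.txt ../inputs/
ssc.awaitTermination()

Moving rather than copying matters because a rename within one filesystem is atomic, so the stream never observes a half-written file; each batch then processes whatever complete files appeared in the directory during the last interval.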