我有一个关于访问 Spark RDD 时闭包中局部变量的使用的问题。我想解决的问题如下:
我有一个应该读入 RDD 的文本文件列表。
但是,首先我需要向从单个文本文件创建的 RDD 添加附加信息。此附加信息是从文件名中提取的。然后,使用 union() 将 RDD 放入一个大 RDD 中。
from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)
list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
tmp_rdd = spark_context.textFile(filename)
# extract_file_info('file_from_Owner.txt') == 'Owner'
file_owner = extract_file_info(filename)
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work:
# The result is that always Bert will be the owner, i.e., never Ernie.
问题是循环中的map()函数没有引用“正确的”file_owner。相反,它将引用 file_owner 的最新值。在我的本地计算机上,我通过为每个 RDD 调用 cache() 函数来解决这个问题:
# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
tmp_rdd.cache()
# ..
我的问题:使用 cache() 是解决我的问题的正确解决方案吗?还有其他选择吗?
非常感谢!
这不是 Spark 现象,而是一种普通的 Python 现象。
>>> fns = []
>>> for i in range(3):
... fns.append(lambda: i)
...
>>> for fn in fns:
... print fn()
...
2
2
2
避免这种情况的一种方法是声明具有默认参数的函数。默认值在声明时计算。
>>> fns = []
>>> for i in range(3):
... def f(i=i):
... return i
... fns.append(f)
...
>>> for fn in fns:
... print fn()
...
0
1
2
这个问题经常出现,请参阅以下其他问题:
- Python 中的词法闭包 https://stackoverflow.com/questions/233673/lexical-closures-in-python
- (lambda) 函数闭包捕获什么? https://stackoverflow.com/questions/2295290/what-do-lambda-function-closures-capture-in-python
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)