访问 Spark RDD 时闭包中局部变量的使用

2024-01-30

我有一个关于访问 Spark RDD 时闭包中局部变量的使用的问题。我想解决的问题如下:

我有一个应该读入 RDD 的文本文件列表。 但是,首先我需要向从单个文本文件创建的 RDD 添加附加信息。此附加信息是从文件名中提取的。然后,使用 union() 将 RDD 放入一个大 RDD 中。

from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)

list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
    tmp_rdd = spark_context.textFile(filename)
    # extract_file_info('file_from_Owner.txt') == 'Owner'
    file_owner = extract_file_info(filename)   
    tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
    rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work: 
# The result is that always Bert will be the owner, i.e., never Ernie.

问题是循环中的map()函数没有引用“正确的”file_owner。相反,它将引用 file_owner 的最新值。在我的本地计算机上,我通过为每个 RDD 调用 cache() 函数来解决这个问题:

# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
tmp_rdd.cache()
# ..

我的问题:使用 cache() 是解决我的问题的正确解决方案吗?还有其他选择吗?

非常感谢!


这不是 Spark 现象,而是一种普通的 Python 现象。

>>> fns = []
>>> for i in range(3):
...   fns.append(lambda: i)
... 
>>> for fn in fns:
...   print fn()
... 
2
2
2

避免这种情况的一种方法是声明具有默认参数的函数。默认值在声明时计算。

>>> fns = []
>>> for i in range(3):
...   def f(i=i):
...     return i
...   fns.append(f)
... 
>>> for fn in fns:
...   print fn()
... 
0
1
2

这个问题经常出现,请参阅以下其他问题:

  • Python 中的词法闭包 https://stackoverflow.com/questions/233673/lexical-closures-in-python
  • (lambda) 函数闭包捕获什么? https://stackoverflow.com/questions/2295290/what-do-lambda-function-closures-capture-in-python
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

访问 Spark RDD 时闭包中局部变量的使用 的相关文章

  • 如何正确地将 MIDI 刻度转换为毫秒?

    我正在尝试将 MIDI 刻度 增量时间转换为毫秒 并且已经找到了一些有用的资源 MIDI Delta 时间刻度到秒 http www lastrayofhope co uk 2009 12 23 midi delta time ticks
  • Python逻辑运算符优先级[重复]

    这个问题在这里已经有答案了 哪个运算符优先4 gt 5 or 3 lt 4 and 9 gt 8 这会被评估为真还是假 我知道该声明3 gt 4 or 2 lt 3 and 9 gt 10 显然应该评估为 false 但我不太确定 pyth
  • 在 Python distutils 中从 setup.py 查找脚本目录的正确方法?

    我正在分发一个具有以下结构的包 mymodule mymodule init py mymodule code py scripts script1 py scripts script2 py The mymodule的子目录mymodul
  • 通过列表理解压平列表列表

    我正在尝试使用 python 中的列表理解来展平列表 我的清单有点像 1 2 3 4 5 6 7 8 只是为了打印这个列表列表中的单个项目 我编写了这个函数 def flat listoflist for item in listoflis
  • Django 模型在模板中不可迭代

    我试图迭代模型以获取列表中的第一个图像 但它给了我错误 即模型不可迭代 以下是我的模型和模板的代码 我只需要获取与单个产品相关的列表中的第一个图像 模型 py class Product models Model title models
  • if 语句未命中中的 continue 断点

    在下面的代码中 两者a and b是生成器函数的输出 并且可以评估为None或者有一个值 def testBehaviour self a None b 5 while True if not a or not b continue pri
  • Pandas 数据帧到 numpy 数组 [重复]

    这个问题在这里已经有答案了 我对 Python 很陌生 经验也很少 我已经设法通过复制 粘贴和替换我拥有的数据来使一些代码正常工作 但是我一直在寻找如何从数据框中选择数据 但无法理解这些示例并替换我自己的数据 总体目标 如果有人真的可以帮助
  • 在Python中调整图像大小

    我有一张尺寸为 288 352 的图像 我想将其大小调整为 160 240 我尝试了以下代码 im imread abc png img im resize 160 240 Image ANTIALIAS 但它给出了一个错误TypeErro
  • 如何通过在 Python 3.x 上按键来启动和中断循环

    我有这段代码 当按下 P 键时会中断循环 但除非我按下非 P 键 否则循环不会工作 def main openGame while True purchase imageGrab if a sum gt 1200 fleaButton ti
  • 对图像块进行多重处理

    我有一个函数必须循环遍历图像的各个像素并计算一些几何形状 此函数需要很长时间才能运行 在 24 兆像素图像上大约需要 5 小时 但似乎应该很容易在多个内核上并行运行 然而 我一生都找不到一个有据可查 解释充分的例子来使用 Multiproc
  • Spark-1.6.1 上的 DMLC 的 XGBoost-4j

    我正在尝试在 Spark 1 6 1 上使用 DMLC 的 XGBoost 实现 我能够使用 XGBoost 训练我的数据 但在预测方面面临困难 我实际上想以在 Apache Spark mllib 库中完成的方式进行预测 这有助于计算训练
  • 使用鼻子获取设置中当前测试的名称

    我目前正在使用鼻子编写一些功能测试 我正在测试的库操作目录结构 为了获得可重现的结果 我存储了一个测试目录结构的模板 并在执行测试之前创建该模板的副本 我在测试中执行此操作 setup功能 这确保了我在测试开始时始终具有明确定义的状态 现在
  • Pandas 根据 diff 列形成簇

    我正在尝试使用 Pandas 根据表示时间 以秒为单位 的列中的差异来消除数据框中的一些接近重复项 例如 import pandas as pd numpy as np df pd DataFrame 1200 1201 1233 1555
  • 如何使用列表作为pandas数据框中的值?

    我有一个数据框 需要列的子集包含具有多个值的条目 下面是一个带有 运行时 列的数据框 其中包含程序在各种条件下的运行时 df condition a runtimes 1 1 5 2 condition b runtimes 0 5 0 7
  • 将 2D NumPy 数组按元素相乘并求和

    我想知道是否有一种更快的方法 专用 NumPy 函数来执行 2D NumPy 数组的元素乘法 然后对所有元素求和 我目前使用np sum np multiply A B 其中 A B 是相同维度的 NumPy 数组m x n 您可以使用np
  • 在 Pandas 中使用正则表达式的多种模式

    我是Python编程的初学者 我正在探索正则表达式 我正在尝试从 描述 列中提取一个单词 数据库名称 我无法给出多个正则表达式模式 请参阅下面的描述和代码 描述 Summary AD1 Low free DATA space in data
  • 您可以将操作直接应用于map/reduce/filter 中的参数吗?

    map and filter通常可以与列表理解互换 但是reduce并不那么容易被交换map and filter 此外 在某些情况下我仍然更喜欢函数语法 但是 当您需要对参数本身进行操作时 我发现自己正在经历语法体操 最终必须编写整个函数
  • 限制 django 应用程序模型中的单个记录?

    我想使用模型来保存 django 应用程序的系统设置 因此 我想限制该模型 使其只能有一条记录 极限怎么办 尝试这个 class MyModel models Model onefield models CharField The fiel
  • 如何读取Python字节码?

    我很难理解 Python 的字节码及其dis module import dis def func x 1 dis dis func 上述代码在解释器中输入时会产生以下输出 0 LOAD CONST 1 1 3 STORE FAST 0 x
  • Scrapy Spider不存储状态(持久状态)

    您好 有一个基本的蜘蛛 可以运行以获取给定域上的所有链接 我想确保它保持其状态 以便它可以从离开的位置恢复 我已按照给定的网址进行操作http doc scrapy org en latest topics jobs html http d

随机推荐

  • 哪个系统调用返回连接到 Linux 系统的设备列表?

    我正在尝试编写一个 C Java 程序 它将显示连接到系统的设备列表 非常感谢在这方面的任何帮助 lsusb http www cyberciti biz faq linux how do i list all usb devices 命令
  • iOS EXC_BAD_ACCESS:如何调试?

    我收到 EXC BAD ACCESS 我知道这通常意味着什么 尝试访问 不再 的对象是最可能的原因 那么 我在哪里可以找到它 我在网上看了很多帖子 他们都说 方案中 启用 NSZombie 现在 当我运行调试器时 我应该查看什么 我看不出有
  • Thread.join 和 Synchronized 有什么区别?

    我很困惑何时使用Thread join 以及何时使用synchronization在多线程应用程序中 根据我的说法 它们都阻塞或等待其他线程完成执行 此示例必须按顺序模式依次输出 10 个 A 10 个 B 和 10 个 C 如下所示 1
  • template 与 template 不匹配是一个缺陷吗?

    在探索的同时这个答案 https stackoverflow com a 47730794 1366368我发现采用参数包的模板不会被需要具有特定数量参数的模板的模板所接受 在我看来 这是一个缺陷 因为如果模板可以采用任意数量的参数 它应该
  • 在纹理数组中绘制Texture2D图集

    如何通过 GLSL Sampler 仅绘制存储在纹理数组中的纹理2D 图集的一部分 例如 我有纹理图集 我会将它们放在一起 与相同大小的其他图集 在Texture2D数组里面 glTexSubImage3D 那么 在这种情况下我的采样器应该
  • 为什么我们需要快速正文解析器?

    我遇到了很多博客和文章 他们建议使用 body parser 来解析请求正文数据 有没有办法在不使用任何中间件的情况下解析数据或从正文获取正文数据 默认情况下 express 只为您提供原始 HTTP 请求正文req论证作为Incoming
  • Lambda 没有自动推断返回类型

    当我回答我自己的问题时https stackoverflow com a 32115498 383779 https stackoverflow com a 32115498 383779 我又有一个疑问了 In const CArray
  • 如何逐行运行Linux程序

    我想使用一些调试器逐行运行 GTK C 程序 我从未调试过 Linux 程序 那么在哪里可以找到针对初学者如何调试代码的说明呢 我只是有一个想法 我必须从网上下载源代码 使用调试符号编译项目并通过 DDD 或 GDB 运行源代码 那么任何人
  • 切换当前元素的可见性

    我正在尝试写一个函数toggle active单击即可显示隐藏内容 再单击一次即可再次折叠内容 可悲的是 它不起作用 你能帮我修改一下吗 function toggle active this var x this nextSibling
  • XDebug 无法在 VScode 中进行 php 调试

    即使在所有配置之后 使用 xdebug 和 xampp 在 vscode 中进行 PHP 调试也无法正常工作 这是我的 php ini 文件配置 zend extension D Xampp php ext php xdebug 3 0 0
  • CakePHP 中页面别名的自动路由

    我正在使用 CakePHP 框架创建一个 CMS 通过 CMS 创建的每个页面都将有其唯一的 URL 别名 这还取决于虚拟文件夹结构 例如 www site com level 1 about us www site com level 2
  • 如何让 async/await 等待 Observable 返回

    对 Angular 相当陌生 并且在 Promises Observables 和 async await 方面遇到困难 我有一个功能需要当前用户详细信息来执行某些任务 为此 我编写了一个获取用户详细信息方法 该方法返回一个承诺 并且任务在
  • 来自 Pivot 的 Seaborn 热图中的数据顺序

    所以我有一个使用seaborn创建的热图 revels rd pivot Flavour Packet number Contents ax sns heatmap revels annot True fmt d linewidths 0
  • 为购物车项目和产品设置正确的 jpa 映射

    我正在通过一些例子学习jpa 涉及购物车和购物车物品 我将它们定义如下 但不太确定要使用哪个映射 Entity class Product private Long id private String name Entity class C
  • 如何在 Facebook 应用程序中添加画布和安全画布 URL

    我正在尝试开发 Facebook 教程 Friends Smash 但我遇到了一个大问题 它没有显示任何添加画布和安全画布 URL 的选项 设置 gt gt 添加平台 gt gt 网站 我得到以下选项 如何添加画布和安全画布 URL 画布和
  • 如何使用解析表证明左递归语法不在LL(1)中

    我有一个语法 想证明它不在 LL 1 中 S gt SA A A gt a 由于它是左递归语法 为了找到第一个和后续集合 我消除了左递归并得到 S gt AS S gt AS Empty A gt a first of A a follow
  • 将 DAO 与复合对象一起使用

    我正在尝试重写一堆 DAO 这里的设置是 仅纯 JDBC 无 JPA ORM 任何形式 没有使用任何接口 插入对象之前进行大量检查 业务对象紧密相连 我的主要问题是 如何持久 检索由多个其他对象组成的业务对象 例如我的 CustomerDA
  • NGRX - 如何计算商店中商品的属性

    我们在 Angular 应用程序中使用 NGRX 数据来自 API 某些属性以未格式化的字符串形式来自 API 因此我们需要对其进行格式化 当然 这可以在 HTML 中完成 但问题是在 HTML 和 TypeScript 中的多个位置都需要
  • ZF2 - 使用导航视图助手的多个导航菜单

    我正在尝试将主导航与子菜单结合使用以进行更具体的导航 In my layout我这样调用视图助手 this gt navigation main navigation gt menu 并在我的view我这样称呼它 this gt navig
  • 访问 Spark RDD 时闭包中局部变量的使用

    我有一个关于访问 Spark RDD 时闭包中局部变量的使用的问题 我想解决的问题如下 我有一个应该读入 RDD 的文本文件列表 但是 首先我需要向从单个文本文件创建的 RDD 添加附加信息 此附加信息是从文件名中提取的 然后 使用 uni