Spark 中函数返回空列表

2024-01-17

下面是获取压缩文件中文件名列表的代码

def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = {
    val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open))
    val filesInZip =  new ArrayBuffer[String]()
    var ze : Option[ZipEntry] = None
    zipInputStream.foreach(stream =>{
      do{
        ze = Option(stream.getNextEntry);
        ze.foreach{ze =>
          if(ze.getName.endsWith("java") && !ze.isDirectory()){
            var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java"))
            filesInZip += fileName
          }
        }
        stream.closeEntry()
      } while(ze.isDefined)
      println(filesInZip.toList.length) // print 889 (correct)
    })
    println(filesInZip.toList.length) // print 0 (WHY..?)
    (filesInZip.toList)
  }

我按以下方式执行上面的代码:

scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip")
zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at <console>:25

scala> getListOfFilesInRepo(zipRDD)
889
0
res12: List[String] = List()

为什么我没有得到 889 而是得到 0?


发生这种情况是因为filesInZip不在工人之间共享。foreach在本地副本上运行filesInZip当它完成时,这个副本将被简单地丢弃并被垃圾收集。如果你想保留结果,你应该使用转换(很可能是flatMap)并返回收集的聚合值。

def listFiles(stream: PortableDataStream): TraversableOnce[String] = ???

zipInputStream.flatMap(listFiles)

您可以了解更多信息了解闭包 https://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 中函数返回空列表 的相关文章

  • 多个 scala 库导致 intellij 出错?

    我正在使用 intellij 14 和 scala 2 11 6 使用 homebrew 安装并使用符号链接 ln s usr local Cellar scala 2 11 6 libexec src usr local Cellar s
  • 如何从命令行向 REPL 添加导入?

    如何使 REPL 导入命令行中给出的包 Sample scala someMagicHere import sys error scala gt imports 1 import scala Predef 162 terms 78 are
  • 更改 Spark SQL 中的 Null 顺序

    我需要能够按升序和降序对列进行排序 并且还允许空值位于第一个或空值位于最后一个 使用 RDD 我可以将 sortByKey 方法与自定义比较器结合使用 我想知道是否有使用 Dataset API 的相应方法 我了解如何将 desc asc
  • Scala 的代码覆盖率工具 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 更改 build.sbt 自定义任务中的版本

    我在 build sbt 中定义了一个自定义任务 val doSmth taskKey Unit smth doSmth version 1 0 SNAPSHOT 但它不会改变版本 我真正想要的是自定义 sbt 发布任务 它将始终将相同的版
  • 高效序列化案例类

    对于我正在工作的图书馆 我需要提供一个高效 便捷 typesafe序列化 scala 类的方法 理想的情况是用户可以创建一个案例类 并且只要所有成员都是可序列化的 它似乎也应该如此 我准确地知道序列化和反序列化阶段的类型 因此不需要 也不能
  • 可选择将项目添加到 Scala 映射

    我正在寻找这个问题的惯用解决方案 我正在构建一个valScala 不可变 Map 并希望有选择地添加一项或多项 val aMap Map key1 gt value1 key2 gt value2 if condition key3 gt
  • 规范化且不可变的数据模型

    Haskell如何解决 规范化不可变数据结构 问题 例如 让我们考虑一个表示前女友 男友的数据结构 data Man Man name String exes Woman data Woman Woman name String exes
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger
  • 使用 net.liftweb.json 或 scala.util.parsing.json 解析大型 (30MB) JSON 文件会出现 OutOfMemoryException。有什么建议吗?

    我有一个包含大量测试数据的 JSON 文件 我想解析这些数据并推送我正在测试的算法 它的大小约为 30MB 包含大约 60 000 个元素的列表 我最初在 scala util parsing json 中尝试了简单的解析器 如下所示 im
  • Play框架:单属性案例类的JSON读取

    我正在尝试为包含单个属性的案例类创建隐式 JSON Reads 但收到错误 Reads Nothing 不符合预期类型 这是代码 import play api libs functional syntax import play api
  • Scala:什么是 CompactBuffer?

    我试图弄清楚 CompactBuffer 的含义 和迭代器一样吗 请解释其中的差异 根据 Spark 的文档 它是 ArrayBuffer 的替代方案 可以提供更好的性能 因为它分配的内存更少 以下是 CompactBuffer 类文档的摘
  • Scala中有类似Java Stream的“peek”操作吗?

    在Java中你可以调用peek x gt println x 在 Stream 上 它将对每个元素执行操作并返回原始流 这与 foreach 不同 foreach 是 Unit Scala 中是否有类似的东西 最好是适用于所有 Monady
  • Slick和bonecp:org.postgresql.util.PSQLException:FATAL:抱歉,太多客户端已经错误

    当我在本地开发应用程序时 我使用以下命令启动我的 play2 应用程序sbt run 我喜欢如何更改代码 然后重新加载浏览器以查看我的更改 在大约 10 次代码更改之后 我收到 postgresql 太多连接错误 见下文 我的数据库连接使用
  • Spark DataFrame 序列化为无效 json

    TL DR 当我倾倒 Spark 时DataFrame作为 json 我总是得到类似的结果 key1 v11 key2 v21 key1 v12 key2 v22 key1 v13 key2 v23 这是无效的 json 我可以手动编辑转储
  • 单位安全平方根

    我只是想知道如何以与 F 正确交互的方式编写用户定义的平方根函数 sqrt 单位制 http blogs msdn com andrewkennedy archive 2008 09 04 units of measure in f par
  • Haskell 中列表列表的笛卡尔积

    给定一个长度列表的列表x所有子列表的长度都相同y 输出y x长度列表x包含每个子列表中的一项 例子 x 3 y 2 1 2 3 4 5 6 Output 2 3 8不同的输出 1 3 5 1 4 5 1 3 6 1 4 6 2 3 5 2
  • 将 IndexToString 应用于 Spark 中的特征向量

    Context 我有一个数据框 其中所有分类值都已使用 StringIndexer 进行索引 val categoricalColumns df schema collect case StructField name StringType
  • Spark 中的 Distinct() 函数如何工作?

    我是 Apache Spark 的新手 正在学习基本功能 有一个小疑问 假设我有一个元组 键 值 的 RDD 并且想从中获取一些唯一的元组 我使用distinct 函数 我想知道该函数基于什么基础认为元组是不同的 是基于键 值还是两者 di
  • 如何在 scala repl 和 sbt 控制台中关闭/打开 typer 阶段

    是否可以在不退出当前会话的情况下切换阶段 我尝试进入 power 模式 但它仍然不打印类型 在SBT中只需添加以下设置 set scalacOptions in Compile console Xprint typer 在 REPL 中你可

随机推荐

  • 无法在 Microsoft Powershell 中使用“mvn -D”参数运行 Maven,但可以在命令提示符下运行

    我正在尝试从命令行构建我们的网络项目 但跳过测试 我正在使用命令mvn clean install Dmaven test skip true 当我从传统的黑白命令提示符 又名 DOS shell 运行命令时 该命令有效 但是当我从 Win
  • 直接映射缓存

    直接映射缓存由 16 个块组成 主存包含 16K 块 每个块 8 字节 主存地址格式是什么 意思是每个字段的大小 我知道这些字段是 Tag Block Offset 我只是不知道如何获得每个的尺寸 这是作业吗 为了解决这个问题 您需要知道相
  • Android listview with Glide - 加载后位图加倍

    我正在开发一个安卓应用程序 我的片段之一包含一个简单的列表视图 显示朋友列表 每个朋友都可以有自己的个人资料图片 它是由 Glide 库设置的 当用户没有设置个人资料图片时 将显示默认图像 我的问题是 每次列表中的第一个元素都会获得与列表的
  • 如何使用 QEMU 的简单跟踪后端?

    这是后续this https stackoverflow com questions 37522552 qemu simple backend tracing dosent print anything comment65639854 37
  • javascript:禁用文本选择

    我正在使用 javascript 禁用网站上的文本选择 代码是 可以找到类似的脚本here http rainbow arch scriptmania
  • 使用声明隐藏名称

    include
  • prefix(_ maxLength:) 与符合 LazySequenceProtocol 的结构一起使用时会被类型擦除

    prefix maxLength returns a type erased Sequence in the following codeEXAMPLE http swift sandbox bluemix net repl 599df10
  • 下载文件时如何针对不同浏览器正确设置中文文件名

    我这样设置 response setHeader content disposition attachment filename URLEncoder encode 你好 txt utf 8 它在 Chrome 中有效 但在 Firefox
  • 如何将一个向量的参数插入另一个向量?

    我有一个大小为 5 的 重心 向量 parameter Length barycenters 5 1 2 3 4 5 我想将此向量的参数添加到另一个大小为 7 的向量 b prime 中 这意味着打算让 b prime 0 1 2 3 4
  • 闪亮动态添加输入字段和数据而无需重新渲染

    我正在尝试动态地将新变量添加到正在运行的闪亮应用程序中 但如果我开始编辑一个变量 则每次我添加其他变量时 值 文本和数字 都会重置 这example https stackoverflow com a 56468913 4083743无需使
  • 什么是好的“模板”Yosys 合成脚本?

    我想编写自己的 Yosys 综合脚本 从什么开始是一个好的模板 手册和网页包含各种示例 但没有 权威 的 hello world 示例 The synth命令运行通用综合任务的推荐脚本 看help synth http www cliffo
  • 函数调用后记住数组值

    如果我这样写 c def cf n c range 5 print c if any i gt 3 for i in c is True print hello cf 1 print c 然后我得到 1 2 3 4 hello 我对编程真的
  • 在 PyCharm IDE 5.0.4 中运行特定的单元测试函数

    我正在尝试使用 PyCharm 进行单元测试 使用unittest 并且能够使其工作 测试运行器很好地显示了测试用例和嵌套测试函数的列表 但是 一旦发现测试 我就找不到任何方法来 重新 运行特定的测试函数 唯一可用的按钮将运行整个测试列表
  • 如何解码 JSFuck 脚本?

    我在 JavaScript 中有这样的代码 在控制台中 它将返回 Array filter 如何解码大量与上面文本相似的文本 例如
  • 获取用户当前位置 iOS 8.0

    我尝试使用 MapKit 和 CoreLocation 获取用户当前位置 我对 Objective C 真的很陌生 就我的研究而言 旧版 iOS 到 iOS 8 0 的实现略有不同 我已经正确地遵循了一切 它仍在获取当前位置 我的实际目标是
  • 清除 Bootstrap 中的表单输入字段?

    Bootstrap 是否提供了通过按钮清除表单输入字段的功能 或者我需要通过 jquery 自己推出 从这篇文章jQuery Validate resetForm 不会重置 onFocus 验证 https stackoverflow co
  • 更改 config.assets.version 号有什么作用?

    更改 config assets version 号有什么作用 我知道资产会过期 正如评论中所写 但它在后台会做什么 它会删除所有已编译的资源吗 或者它是否采用该版本号并在其他地方使用它 它将使用另一个指纹 代码附加到文件名 来预编译资产
  • 量角器关闭当前选项卡

    我有一个非角度页面 我需要点击 2 个链接 单击自动在新选项卡中打开的链接之一时 现在我切换到新选项卡并设置browser ignoreSynchronization false因为新打开的选项卡是一个有角度的窗口 并调用我的测试之一 一经
  • 如何用权重标记图边

    Warning 当 Mathematica v 8 0 是最酷的孩子时 我发布了这个问题 该bug已于9 0 1版本解决 The help for EdgeLabels http reference wolfram com mathemat
  • Spark 中函数返回空列表

    下面是获取压缩文件中文件名列表的代码 def getListOfFilesInRepo zipFileRDD RDD String PortableDataStream List String val zipInputStream zipF