如何在 Spark 中跳过 RDD 中的多行标头

2023-12-11

我的第一个 RDD 中的数据就像

1253
545553
12344896
1 2 1
1 43 2
1 46 1
1 53 2

现在前 3 个整数是我需要广播的一些计数器。 之后所有行都具有相同的格式,例如

1 2 1
1 43 2

在函数中对它们进行一些计算后,我会将 3 个计数器之后的所有这些值映射到一个新的 RDD。 但我无法理解如何分离前 3 个值并正常映射其余值。

我的Python代码是这样的

documents = sc.textFile("file.txt").map(lambda line: line.split(" "))

final_doc = documents.map(lambda x: (int(x[0]), function1(int(x[1]), int(x[2])))).reduceByKey(lambda x, y: x + " " + y)

仅当前 3 个值不在文本文件中时才有效,但如果包含它们,则会出现错误。

我不想跳过前 3 个值,而是将它们存储在 3 个广播变量中,然后在映射函数中传递剩余的数据集。

是的,文本文件必须采用该格式。我无法删除这 3 个值/计数器

Function1 只是进行一些计算并返回值。


  1. Python 2 的导入

    from __future__ import print_function
    
  2. 准备虚拟数据:

    s = "1253\n545553\n12344896\n1 2 1\n1 43 2\n1 46 1\n1 53 2"
    with open("file.txt", "w") as fw: fw.write(s)
    
  3. 读取原始输入:

    raw = sc.textFile("file.txt")
    
  4. 提取标题:

    header = raw.take(3)
    print(header)
    ### [u'1253', u'545553', u'12344896']
    
  5. 过滤线:

    • using zipWithIndex

      content = raw.zipWithIndex().filter(lambda kv: kv[1] > 2).keys()
      print(content.first())
      ## 1 2 1
      
    • using mapPartitionsWithIndex

      from itertools import islice
      
      content = raw.mapPartitionsWithIndex(
          lambda i, iter: islice(iter, 3, None) if i == 0 else iter)
      
      print(content.first())
      ## 1 2 1
      

NOTE:所有功劳都归于pzecevic and 肖恩·欧文(参见链接来源)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Spark 中跳过 RDD 中的多行标头 的相关文章

  • Python Popen 与 psexec 挂起 - 不良结果

    我对 subprocess Popen 和我认为是管道的问题有疑问 我有以下代码块 从 cli 运行时 100 都不会出现问题 p subprocess Popen psexec serverName get cmd c ver echo
  • django_openid_auth TypeError openid.yadis.manager.YadisServiceManager 对象不是 JSON 可序列化

    I used django openid auth在我的项目上 一段时间以来它运行得很好 但今天 我测试了该应用程序并遇到了这个异常 Environment Request Method GET Request URL http local
  • python future 和元组解包

    实现像使用 future 进行元组解包这样的事情的优雅 惯用的方法是什么 我有这样的代码 a b c f x y g a b z h y c 我想将其转换为使用期货 理想情况下我想写一些类似的东西 a b c ex submit f x y
  • python 中的代表

    我实现了这个简短的示例来尝试演示一个简单的委托模式 我的问题是 这看起来我已经理解了委托吗 class Handler def init self parent None self parent parent def Handle self
  • 如何迭代按值排序的 Python 字典?

    我有一本字典 比如 a 6 b 1 c 2 我想迭代一下by value 不是通过键 换句话说 b 1 c 2 a 6 最直接的方法是什么 sorted dictionary items key lambda x x 1 对于那些讨厌 la
  • 在 Tensorflow tf.nn.nce_loss 中出现 TypeError:'Mul' Op 的输入 'y' 的类型为 float32,与参数 'x' 的 int32 类型不匹配

    我正在研究 Tensor Flow 中的 Bag of Words 实现 并得到了 类型错误 Mul Op 的输入 y 的类型为 float32 与参数 x 的 int32 类型不匹配 在 tf nn nce loss 中 我尝试查看 tf
  • 如何使用 Plotly 中的直方图将所有离群值分入一个分箱?

    所以问题是 我可以在 Plotly 中绘制直方图 其中所有大于某个阈值的值都将被分组到一个箱中吗 所需的输出 但使用标准情节Histogram类我只能得到这个输出 import pandas as pd from plotly import
  • 将数据帧行转换为字典

    我有像下面的示例数据这样的数据帧 我正在尝试将数据帧中的一行转换为类似于下面所需输出的字典 但是当我使用 to dict 时 我得到了索引和列值 有谁知道如何将行转换为像所需输出那样的字典 任何提示都非常感激 Sample data pri
  • 从零开始的 numpy 形状意味着什么

    好的 我发现数组的形状中可以包含 0 对于将 0 作为唯一维度的情况 这对我来说是有意义的 它是一个空数组 np zeros 0 但如果你有这样的情况 np zeros 0 100 让我很困惑 为什么这么定义呢 据我所知 这只是表达空数组的
  • 如何从Python中的字符串中提取变量名称和值

    我有一根绳子 data var1 id 12345 name John White python中有没有办法将var1提取为python变量 更具体地说 我对字典变量感兴趣 这样我就可以获得变量的值 id和name python 这是由提供
  • 在 Pandas 中使用正则表达式的多种模式

    我是Python编程的初学者 我正在探索正则表达式 我正在尝试从 描述 列中提取一个单词 数据库名称 我无法给出多个正则表达式模式 请参阅下面的描述和代码 描述 Summary AD1 Low free DATA space in data
  • 使用 Firefox 绕过弹出窗口下载文件:Selenium Python

    我正在使用 selenium 和 python 来从中下载某些文件web page http www oceanenergyireland com testfacility corkharbour observations 我之前一直使用设
  • mac osx 10.8 上的初学者 python

    我正在学习编程 并且一直在使用 Ruby 和 ROR 但我觉得我更喜欢 Python 语言来学习编程 虽然我看到了 Ruby 和 Rails 的优点 但我觉得我需要一种更容易学习编程概念的语言 因此是 Python 但是 我似乎找不到适用于
  • 使用yield 进行字典理解

    作为一个人为的例子 myset set a b c d mydict item yield join item s for item in myset and list mydict gives as cs bs ds a None b N
  • 如何为每个屏幕添加自己的 .py 和 .kv 文件?

    我想为每个屏幕都有一个单独的 py 和 kv 文件 应通过 main py main kv 中的 ScreenManager 选择屏幕 设计应从文件 screen X kv 加载 类等应从文件 screen X py 加载 Screens
  • 当鼠标悬停在上面时,intellisense vscode 不显示参数或文档

    我正在尝试将整个工作流程从 Eclipse 和 Jupyter Notebook 迁移到 VS Code 我安装了 python 扩展 它应该带有 Intellisense 但它只是部分更糟糕 我在输入句点后收到建议 但当将鼠标悬停在其上方
  • 如何读取Python字节码?

    我很难理解 Python 的字节码及其dis module import dis def func x 1 dis dis func 上述代码在解释器中输入时会产生以下输出 0 LOAD CONST 1 1 3 STORE FAST 0 x
  • Elastic Beanstalk 中的 enum34 问题

    我正在尝试在 Elastic Beanstalk 中设置 django 环境 当我尝试通过requirements txt 文件安装时 我遇到了python3 6 问题 File opt python run venv bin pip li
  • 从 Twitter API 2.0 获取 user.fields 时出现问题

    我想从 Twitter API 2 0 端点加载推文 并尝试获取标准字段 作者 文本 和一些扩展字段 尤其是 用户 字段 端点和参数的定义工作没有错误 在生成的 json 中 我只找到标准字段 但没有找到所需的 user fields 用户
  • Scrapy Spider不存储状态(持久状态)

    您好 有一个基本的蜘蛛 可以运行以获取给定域上的所有链接 我想确保它保持其状态 以便它可以从离开的位置恢复 我已按照给定的网址进行操作http doc scrapy org en latest topics jobs html http d

随机推荐

  • Angular 中的子父母沟通最佳实践

    我正在努力提高 Angular 的水平 并且想了解儿童与家长之间沟通的最佳实践 我当前想要使用的应用程序是 Angular 6 我知道我可以使用 ViewChild Output 或创建服务在子父组件之间进行通信 还有其他方式进行沟通吗 如
  • 无法让OkHttp的response.body.toString()返回字符串

    我正在尝试使用 OkHttp 获取一些 json 数据 但当我尝试记录时无法弄清楚为什么response body toString 我得到的是Results com squareup okhttp Call RealResponseBod
  • 在最新的 MapBox SDK 6.7 中旋转和更改标记的位置

    Mapbox Android SDK 6 7 0 我们正在开发的应用程序的要求是 我们必须在不同的 LatLng 位置添加多个标记 并使用一些方位旋转它们 在旧的mapbox版本 4 2 1 中 我们可以毫无问题地做到这一点 Working
  • 如何在可编码类型中使用 Any

    我目前正在与Codable输入我的项目并面临问题 struct Person Codable var id Any id上面的代码可以是String or an Int 这就是原因id属于类型Any 我知道Any is not Codabl
  • 在 ActionScript 3 中加密/解密图像

    我正在开发一个移动应用程序 其中包含 本质上 图片消息功能 我需要使用简单的纯文本密码对图像进行加密 解密 由于所捕获图像的性质 加密和解密过程在设备上进行是 至关重要的 在听完有关 Flex 移动开发的演示后 我决定使用 Flex SDK
  • 为什么查询不会因子查询中不存在的列而失败?

    我在查询中拼写错误 并遇到了 MySQL 的奇怪行为 create table aaa id bigint auto increment primary key amount int not null other column varcha
  • sklearn 绘制带有标签的混淆矩阵

    我想绘制一个混淆矩阵来可视化分类器的性能 但它仅显示标签的数量 而不显示标签本身 from sklearn metrics import confusion matrix import pylab as pl y test business
  • 是否有可能并且以良好的形式在 Angular 中重用相同的数据工厂?

    我正在查看 Angular 的 CRUD 通用工厂 我目前更喜欢使用它而不是使用服务 app factory dataFactory http function http var urlBase odata ContentTypes The
  • Vue 3 外部组件/插件在运行时加载

    我正在设计一个架构Vue 3具有基于分布式模块所有权的应用程序 模块系统将用插件来表示 似乎是最合适的解决方案 允许vuex模块和vue router动态注入 每个此类模块 插件都将由在独立存储库中工作的专门团队开发 我们不能使用npm每个
  • 重定向 stdin 和 stdout,其中 stdin 首先关闭

    这实际上与我已经回答的另一个问题有关 这个问题在这里 将一个进程对象的标准输出重定向到另一个进程对象的标准输入 我的问题是 我认为 获取输入的程序应该在程序输出之前退出 这是与我正在做的事情等效的 bash tccat i dev sr0
  • 如何让 jqGrid 重新加载到服务器?

    我们使用网格上的 jqGrid 导航器重新加载按钮loadonce设置为 true 重新加载按钮目前不会返回服务器获取数据 我们如何才能让重新加载去服务器获取最新数据 我相信我们可以使用beforeRefresh设置网格的回调data to
  • 任务“:app:validateSigningDebug”执行失败。 java.util.concurrent.ExecutionException:

    我刚刚创建了一个新的 flutter 应用程序 当我尝试在模拟器上运行它时 会出现以下错误消息 在调试模式下在为 x86 构建的 Android SDK 上启动 lib main dart 运行 Gradle 任务 assembleDebu
  • Angular:如何监视 $element.on

    我有一个指令 它在其中做了类似的事情link功能 angular module myApp directive barFoo function return restrict E link function scope element el
  • 这个异步任务方法有什么问题?

    这只是一个简单的异步任务 但我总是遇到奇怪的编译器错误 此代码来自 ASP NET 4 项目中的 Web API 服务 使用 VS2010 创建 即使ContinueWith 非泛型 隐式返回Task 但此错误仍然存 在 Code publ
  • Console.WriteLine 与 Print

    我知道这听起来像是一个非常愚蠢的问题 但我很困惑 只能找到 WriteLine vs debug log 问题的答案 而不是这个问题 我最初开始使用 Visual Studio 在那里我学会了使用 Console WriteLine exa
  • Spring、Hibernate 事务。加入 A 中创建的线程 B 中的事务。可能吗?

    是否可以在另一个线程中使用事务 就像传递交易一样创建于thread A然后执行一些逻辑Thread B在同一笔交易中 我有两个队列和单独的执行器 用于处理某些实体类型的数量 然而 批处理作业是管理两个人口并等待每个完成 这将是不必要创建两个
  • 未提及的结构体字段是否*总是*初始化为零(即当结构体位于堆栈上时)?

    根据实验 在 Clang 和 GCC 中 使用 O2 和 O0 是seems在下面的代码中 typedef struct foo s int i int j foo t int main void foo t foo i 42 foo j
  • 更新到 OSX 10.9,现在使用自制程序出现 ruby​​ 错误

    更新到 Mavericks 并更新了 Xcode 现在我在尝试使用brew时遇到此错误 brew cleanup System Library Frameworks Ruby framework Versions 2 0 usr lib r
  • 登录后的 C# .NET Cookie 处理

    我花了一些时间研究这个问题 但仍然无法弄清楚 它看起来很简单 所以我觉得自己像个白痴问这个问题 但经过一段时间的研究后 我似乎无法掌握它的窍门 我需要以编程方式登录该网站 https wholesale frontiercoop com 存
  • 如何在 Spark 中跳过 RDD 中的多行标头

    我的第一个 RDD 中的数据就像 1253 545553 12344896 1 2 1 1 43 2 1 46 1 1 53 2 现在前 3 个整数是我需要广播的一些计数器 之后所有行都具有相同的格式 例如 1 2 1 1 43 2 在函数