PySpark中的RDD基本操作

2023-05-16

PySpark中的RDD基本操作

【课程性质:PySpark数据处理】

文章目录

    • 1. 实验目标
    • 2. 本次实验主要使用的 P y t h o n Python Python
    • 3. 适用的对象
    • 4. 实验步骤
    • PySpark中的RDD基本操作
      • 步骤1 安装并引入必要的库
      • 步骤2获取数据并创建RDD
      • 步骤3 `filter` 转换
      • 步骤4 `map` 转换
      • 步骤5 `collect` 动作 `action`
    • PySpark中的RDDs Aggregations操作
      • 步骤6 根据标签检查交互持续时间
      • 步骤7 更好的方法,使用 `aggregate`
    • PySpark中的RDDs key value操作
      • 步骤8 为交互类型创建键值对RDD
      • 步骤9 具有键/值对RDDs的数据聚合

1. 实验目标

  • 使用 PySpark创建RDD,并学习RDD的基本操作

2. 本次实验主要使用的 P y t h o n Python Python

名称版本简介
r e q u e s t s requests requests 2.20.0 2.20.0 2.20.0线性代数
P a n d a s Pandas Pandas 0.25.0 0.25.0 0.25.0数据分析
P y S p a r k PySpark PySpark 2.4.3 2.4.3 2.4.3大数据处理
M a t p l o t l i b Matplotlib Matplotlib 3.0.1 3.0.1 3.0.1数据可视化

3. 适用的对象

  • 本课程假设您已经学习了 P y t h o n Python Python 基础,具备数据可视化基础
  • 学习对象:本科学生、研究生、人工智能、算法相关研究者、开发者
  • 大数据分析与人工智能

4. 实验步骤

PySpark中的RDD基本操作

步骤1 安装并引入必要的库

# 安装第三方库
!pip install pyspark==2.4.5
# 获取数据集
import zipfile
with zipfile.ZipFile('/resources/jupyter/pyspark/pyspark_dataset_kdd.zip') as z:
    z.extractall()

本实验将介绍三个基本但必不可少的 Spark操作。其中一个是***transformations***中的map and filter。另一个是***action***的collect。同时,我们将介绍 Spark 中的***persistence***概念。

步骤2获取数据并创建RDD

正如我们在第一个笔记本中所做的那样,我们将使用199年KDD杯提供的缩减数据集(1θ%),其中包含近50万个网络交互。该文件作为Gzip文件提供,我们将在下载到本地。

现在我们可以使用这个文件来创建RDD

data_file ="./kddcup.data_10_percent.gz"
raw_data=sc.textFile(data_file)

#查看前五行
raw_data.take(5)

现在我们将数据文件加载到 raw_data RDD中。

image-20210610170055935

步骤3 filter 转换

这个转换可以应用于RDDs,以便只保留满足特定条件的元素。
更具体地说,函数是在原始RDD中的每个元素上求值的。新生成的RDD将只包含那些使函数返回True的元素
例如,假设我们想要计算有多少normal.。我们数据集中进行操作。我们可以按照以下方式过滤我们的 raw_data RDD。

normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

image-20210610170409593

现在我们可以计算在新的RDD中有多少个元素。

from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print("There are {} 'normal.' interactions".format(normal_count))
print("Count completed in {} seconds".format(round(tt,3)))

image-20210610171228409

步骤4 map 转换

通过使用Spark中的map转换,我们可以对RDD中的每个元素应用一个函数。Python的lambdas对这一点特别有表现力。
在本例中,我们希望以csv格式读取数据文件。我们可以这样做,对RDD中的每个元素应用一个lambda函数,如下所示。

from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print("Parse completed in {} seconds".format(round(tt,3)))
pprint(head_rows[0])

image-20210610171623188

如果我们取很多元素而不是前几个呢?

t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0
print("Parse completed in {} seconds".format(round(tt,3)))

image-20210610171758638

使用 map 和预定义函数
当然,我们可以使用带有map的预定义函数。
假设我们希望RDD中的每个元素都是键值对,其中键是标记(例如***normal***),值是表示CSV格式文件中的行的整个元素列表。我们可以这样进行。

def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)
key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])

image-20210610172058421

步骤5 collect 动作 action

到目前为止,我们已经使用了actions中的counttake
我们需要学习的另一个基本动作是collect。基本上,它将把RDD中的所有元素都放到内存中,以便我们使用它们。因此,必须小心使用它,特别是在处理大型RDDs时。

一个使用原始数据的例子。

t0 = time()
all_raw = raw_data.collect()
tt = time() - t0
print("Data collected in {} seconds".format(round(tt,3)))

image-20210610172751115

作为组合前面所有内容的最后一个示例,我们希望收集所有normal.交互作为键值对。

# get data from file
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() -t0
normal_count = len(all_normal)
print("Data collected in {} seconds".format(round(tt,3)))
print("There are {} 'normal' interactions". format(normal_count))

image-20210610173058843

PySpark中的RDDs Aggregations操作

步骤6 根据标签检查交互持续时间

foldreduce都将函数作为参数应用于 RDD 的两个元素。fold操作与reduce操作的不同之处在于,它获取用于初始调用的额外初始***zero value***。这个值应该是所提供函数的恒等元素。

例如,假设我们想知道 normal interactions 和 attack interactions 的总持续时间。我们可以使用reduce,如下所示。

# 解析数据
csv_data = raw_data.map(lambda x: x.split(","))

# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

我们传递给 reduce 的函数获取和返回RDD类型相同的元素。

如果我们想要计算持续时间,我们需要将该元素提取到一个新的RDD中。

normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

现在我们可以reduce这些新的RDDs。

total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print("Total duration for 'normal' interactions is %s"\
    %(total_normal_duration))
print("Total duration for 'attack' interactions is %s"\
    %(total_attack_duration))

image-20210610174843888

image-20210610174914098

我们可以更进一步,使用计数来计算持续时间的平均值。

normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

print("Mean duration for 'normal' interactions is %s"\
    %(round(total_normal_duration/float(normal_count),3)))
print("Mean duration for 'attack' interactions is %s"\
    %(round(total_attack_duration/float(attack_count),3)))

image-20210610175034171

我们有一个第一个(而且过于简单)的方法来识别attack interactions。

步骤7 更好的方法,使用 aggregate

aggregate 操作将我们从返回与我们正在处理的 RDD 类型相同的约束中解放出来。与fold类似,我们提供了想要返回的类型的初始零值。

然后我们提供两个函数。第一个用于将 RDD 中的元素与累加器组合起来。第二个函数用于合并两个累加器。我们来实际计算一下之前的均值。

normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print("Mean duration for 'normal' interactions is %s"\
    %(round(normal_sum_count[0]/float(normal_sum_count[1]),3)))

image-20210610184255373

在上一步骤的聚合中,累加器的第一个元素保存总和,而第二个元素保存计数。

将累加器与RDD元素组合起来就是对值求和并增加计数。组合两个累加器只需要一个成对的和。

对于attack type interactions,我们也可以这样做。

attack_sum_count = attack_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print("Mean duration for 'attack' interactions is %s"\
    %(round(attack_sum_count[0]/float(attack_sum_count[1]),3)))

image-20210610184332227

image-20210610185730787

PySpark中的RDDs key value操作

步骤8 为交互类型创建键值对RDD

在这个实验中,我们想对我们的网络交互数据集做一些探索性的数据分析。更具体地说,我们希望根据每个网络交互类型的某些变量(如持续时间)来分析它们。为此,我们首先需要创建适合于此的RDD,其中每个交互都被解析为表示值的CSV行,并与对应的标记一起作为键放在一起。

通常,我们创建键/值对 RDDs 是通过使用map的函数应用于原始数据。这个函数返回给定 RDD 元素的对应对。我们可以这样进行。

csv_data = raw_data.map(lambda x: x.split(","))
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

现在,我们已经准备好了要使用的键/值对数据。让我们得到第一个元素,看看它是什么样的。

key_value_data.take(1)

image-20210610190144525

步骤9 具有键/值对RDDs的数据聚合

我们可以对具有键/值对的RDDs进行普通RDDs的所有转换和操作。我们只需要让函数与成对元素一起工作。

此外,Spark还提供了特定的功能来处理包含对元素的RDDs。它们与一般的RDDs非常相似。

例如,我们有一个reduceByKey转换,我们可以使用它来计算每种网络交互类型的总持续时间。

key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()

image-20210610190227067

我们对键/值对有一个特定的计数操作。

counts_by_key = key_value_data.countByKey()
counts_by_key

image-20210610190332863

使用combineByKey

这是最常见的按键聚合函数。大多数其他每个键组合器都是使用它实现的。我们可以把它看作是aggregate 的等效,因为它允许用户返回与输入数据类型不同的值。

例如,我们可以使用它来计算每种类型的平均持续时间,如下所示。

sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()

image-20210610190427588

我们可以看到,参数与前一个实验中传递给aggregate的参数非常相似。与每种类型关联的结果都是成对的。如果我们想要得到平均值,我们需要在收集结果之前做除法。

duration_means_by_type = sum_counts.map(lambda x: (x[0], round(x[1][0]/x[1][1],3))).collectAsMap()

# Print them sorted
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print (tag, duration_means_by_type[tag])

image-20210610190824184

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

PySpark中的RDD基本操作 的相关文章

  • 删除Activiti表单

    DROP TABLE IF EXISTS ACT RE DEPLOYMENT DROP TABLE IF EXISTS ACT GE PROPERTY DROP TABLE IF EXISTS ACT HI ACTINST DROP TAB
  • 表单重置的2种方式

    1 34 resetBtn 34 click function 34 advertForm 34 0 reset 2 34 resetBtn 34 click function document getElementById 34 adve
  • mybatis的一些笔记

    目录 结论 xff1a mybatis是与数据库交互的框架 MyBatis 01 HelloWorld MyBatis 02 config MyBatis 03 mapper sql映射文件的增删改查 1 单个参数 xff1a mybati
  • SpringMVC笔记

    目录 QuickStart 重要注解 64 RequestMapping 1 把数据封装到bean中且bean中有引用类型user name的值与bean 的参数必须完全一致 xff0c 自动封装 xff0c 引用的类型用 级联user u
  • 解决ubuntu下c++标准库缺少conio.h

    该头文件是用于检测键盘输入的 xff0c 在windows下 xff0c c 43 43 标准库是自带的 xff0c 但是在Linux下没有该头文件 xff0c 可以通过手动的方式进行添加 步骤如下 xff1a 1 打开终端 xff0c s
  • 重装系统之gpt分区与mbr分区

    重装ubunutu系统时往往会碰到不能将系统装在所选分区的 情况 xff0c 一般是因为硬盘原本分区不是gpt分区 重装windows系统时往往会碰到不能将系统装在所选分区的 情况 xff0c 一般是因为硬盘原本分区不是mbr分区 gpt
  • Debian的GNOME汉化(美化)简易教程(转)

    Debian的GNOME汉化 美化 简易教程 转 64 more 64 Debian的GNOME汉化 美化 简易教程 fcitx开机启动 tahoma英文显示 simsun中文显示 openoffice字体调整 这是我自己汉化GNOME的心
  • Centos8 系统下安装jdk1.8

    查看是否已经安装java java version 检查 yum 中有没有 java1 8 包 yum list java 1 8 安装jdk yum install java 1 8 0 openjdk y
  • 3.3:如何在Python中创建文件?

    一 在Python中创建文件的主要步骤包括 1 使用open 函数打开一个文件对象 2 使用文件对象的write 方法向文件中写入内容 3 关闭文件对象 以释放文件资源 二 open 函数有三个参数 1 文件0名 需要创建的文件的名称和路径
  • Android设置暗码简要流程

    设置暗码 1 Phone对暗码的简要处理流程 路径 packages services Telephony src com android phone SpecialCharSequenceMgr java phone中对暗码的处理 spa
  • [教程]Ubuntu20.04安装Node.js

    前言 这两天在装一个工具软件的时候 xff0c 要求先安装Node js xff0c 我也不懂这是用来干什么的 xff0c 只知道跟JavaScript有关 不懂没关系 xff0c 装就完事了 xff0c 管它呢 本文将介绍3种在Ubunt
  • Ubuntu使用管理员(root)身份登录系统

    Ubuntu使用管理员 xff08 root xff09 身份登录系统 一 Ubuntu安装好后 xff0c Ubuntu系统默认root用户是不能登录的 xff0c 密码也是空的 如果想要使用root用户登录 xff0c 必须先为root
  • Android使用代码进行界面布局和改变图标、标题、名称、主界面

    一 代码进行界面布局 lt xml version 61 34 1 0 34 encoding 61 34 utf 8 34 gt 二 Android Studio改变图标 标题 名称 主界面 注意 xff1a 64 表示是在哪个目录或者是
  • Linux(Ubuntu)系统如何安装Python

    Linux 系统是为编程而生的 xff0c 因此绝大多数的 Linux 发行版 xff08 Ubuntu CentOS 等 xff09 都默认自带了 Python 有的 Linux 发行版甚至还会自带两个版本的 Python xff0c 例
  • 如何用python实现多线程爬虫

    当单线程python爬虫已经不能满足企业需求时 xff0c 很多程序员会进行改代码或者增加服务器数量 xff0c 这样虽说也能达到效果 xff0c 但是对于人力物力也是一笔不小的消耗 如果是技术牛点的 xff0c 正常都会自己重新改写多线程
  • ## Hive分析疫情数据

    拿到的数据部分如下 xff1a 4月27日 黑龙江 境外输入 不详 0 45 0 黑龙江卫健委 https m thepaper cn newsDetail forward 7160075 4月27日 内蒙古 境外输入 不详 0 8 0 央
  • python --根据windows窗口名称、进程pid打开窗口(pygetwindow详解)

    pygetwindow详解 简介 pygetwindow是一个Python库 xff0c 用于获取 操作和管理当前打开的窗口 它提供了一些常用的窗口操作方法 xff0c 包括获取窗口句柄 xff0c 获取窗口位置和大小 xff0c 移动和调
  • Redhat Linux advance Server V2.1无法进入桌面(转)

    Redhat Linux advance Server V2 xff11 无法进入桌面 转 64 more 64 在本地调试安装了个Redhat Linux advance Server V2 1 xff0c 一共有5张光盘 xff0c 我
  • Mac上类似于xshell的远程工具:finalshell 和 royal tsx

    FinalShell 国产 国产 国产 自己研发的 是一体化的的服务器 网络管理软件 不仅是ssh客户端 还是功能强大的开发 运维工具 充分满足开发 运维需求 特色功能 免费海外服务器远程桌面加速 ssh加速 本地化命令输入框 支持自动补全
  • css查找元素注意事项

    一 CSS ID 选择器查找元素 1 注意 xff1a 如果元素的ID不唯一 xff0c 或者是动态的 或者name以及linktext属性值也不唯一 我们就需要考虑用Xpath来查找元素了 xff0c 然后再对元素执行操作 不管用什么方式

随机推荐

  • OS2.3.7:多生产者,多消费者问题

    文章目录 0 问题描述1 问题分析2 实现3 总结 0 问题描述 桌子上有一只盘子 xff0c 每次只能向其中放入一个水果 爸爸专向盘子中放苹果 xff0c 妈妈专向盘子中放橘子 xff0c 儿子专等着吃盘子中的橘子 xff0c 女儿专等着
  • java 方法名类名命名规范

    一 命名规范 1 项目名全部小写 2 包名全部小写 3 类名首字母大写 xff0c 如果类名由多个单词组成 xff0c 每个单词的首字母都要大写 大驼峰 xff0c 如 xff1a public class MyFirstClass 4 变
  • Qt arm环境安装

    一 相关工作准备 Qt opensource 和 Qt everywhere 下载 链接 版本为5 9 8 arm linux gcc下载 链接 版本为4 8 3 tslib 下载 链接 版本为1 21 ps 可以不安装Qt opensou
  • STM32驱动ST7789V2 tft屏幕

    一 简介 本次教程使用的是1 54寸240 240像素的tft屏幕 xff0c 其接口协议为SPI协议 在使用的过程中仅需要四根数据即可驱动点亮屏幕 然后硬件使用的是STM32F103C8T6核心板 xff0c 用的是SPI2 一般购买屏幕
  • linux设置复杂度策略、登录超时处理功能

    1 在字符终端下 xff0c 实现某一用户连续错误登陆N次后 xff0c 就锁定该用户X分钟 pam tally2 执行 vi etc pam d login 在 PAM 1 0 下新起一行 xff0c 加入 auth required p
  • 飞控陀螺仪,磁力计,加速计,四元数姿态结算

    MPU6050主要包含陀螺仪和加速度计 陀螺仪主要测量角速度 xff0c 即可以测出某一时间段物体转过的角度 加速度计测量的是物体的加速度 xff0c 重力加速度即物体受重力作用的情况下具有的加速度 xff0c 物体静止时 xff0c 加速
  • 智慧物业管理系统(Springboot)

    开发工具 xff1a IDEA xff0c jdk1 8 数据库 xff1a mysql5 7 前台框架 xff1a layui 后端技术 xff1a springboot 项目描述 xff1a 1 前台住户登录 2 智慧物业管理后台 2
  • 北京大学2020公开课 AVL-Python实现代码

    class TreeNode def init self key val left 61 None right 61 None parent 61 None self key 61 key self payload 61 val self
  • Docker-2020详细教程<配合千锋Java学习营>

    Docker 2020详细教程 lt 配合千锋Java学习营 gt 2020 Docker最新超详细版教程通俗易懂 一 Docker介绍 1 下载Dcoker依的赖环境 想安装Docker xff0c 需要先将依赖的环境全部下载下来 xff
  • 使用阿里云部署Flask网页

    使用阿里云部署Flask网页 前端网页部署 阿里云apache CentOS 配置好Apache后 xff0c 将一整个html css js文件全部copy进 var www html目录下 之后就可以通过访问IP地址访问到你的index
  • MapReduce的个人理解

    MapReduce的个人理解 文章目录 MapReduce模型简介Map和Reduce函数这里给出一个简单实例 MapReduce的工作流程工作流程概述MapReduce的各个执行阶段 Shuffle过程详解Shuffle过程简介Map端的
  • Hadoop配置

    Hadoop配置 文章目录 Linux shell配置环境变量使环境变量生效Hadoop 集群安装配置到两台阿里云linux主机上Hadoop集群模式安装实验环境实验内容1 安装jdk2 下面来修改环境变量3 安装hadoop4 下面来修改
  • HDFS 的使用和管理

    HDFS 的使用和管理 文章目录 HDFS 的使用和管理实验环境实验内容实验步骤1 启动hadoop的hdfs相关进程2 用jps查看HDFS是否启动3 验证HDFS运行状态4 ls 命令5 put 命令6 moveFromLocal 命令
  • HDFS API操作

    HDFS API操作 实验环境 Linux Ubuntu 16 04 前提条件 xff1a 1 xff09 Java 运行环境部署完成 2 xff09 Hadoop 的单点部署完成 上述前提条件 xff0c 我们已经为你准备就绪了 实验内容
  • HBase的安装部署和使用

    HBase的安装部署和使用 文章目录 HBase的安装部署和使用实验环境实验内容实验步骤1 点击 34 命令行终端 34 xff0c 打开新的命令行窗口2 解压安装包3 更改文件夹名和所属用户4 设置HBASE HOME环境变量5 修改hb
  • 熟悉常用的HBase操作

    熟悉常用的HBase操作 文章目录 实验环境实验内容1 编程实现以下指定功能 xff0c 并用Hadoop提供的HBase Shell命令完成相同的任务 xff08 1 xff09 列出HBase所有的表的相关信息 xff0c 如表名 创建
  • Hive的安装部署和管理

    Hive的安装部署和管理 文章目录 实验环境实验内容实验步骤1 点击 34 命令行终端 34 xff0c 打开新窗口2 解压安装包3 更改文件夹名和所属用户4 设置HIVE HOME环境变量5 导入MySql jdbc jar包到hive
  • Hive数仓:使用桶表

    Hive数仓 xff1a 使用桶表 文章目录 Hive数仓 xff1a 使用桶表实验环境实验步骤1 点击 34 命令行终端 34 xff0c 打开新窗口2 启动MySQL3 指定元数据数据库类型并初始化Schema4 启动Hadoop5 启
  • python 获取当前文件路径

    一 Python 获取当前文件路径方法 sys path 0 获取文件当前工作目录路径 绝对路径 sys argv 0 获得模块所在的路径 由系统决定是否是全名 若显示调用python指令 xff0c 如python demo py xff
  • PySpark中的RDD基本操作

    PySpark中的RDD基本操作 课程性质 xff1a PySpark数据处理 文章目录 1 实验目标2 本次实验主要使用的 P y t h