【pyspark】DataFrame基础操作(二)

2023-11-16

介绍一下 pyspark 的 DataFrame 基础操作。

一、选择和访问数据

PySpark DataFrame 是惰性计算的,简单地选择一列不会触发计算,但它会返回一个 Column 实例。并且,大多数按列操作都返回 Column 实例

df.a
# 输出
Column<b'a'>


from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())
# 输出
True
  • DataFrame.select() : 获取指定的 Column。
df.select(df.c).show()
# 输出
+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+
  • DataFrame.withColumn() : 更新或添加 Column。
df.withColumn('upper_c', upper(df.c)).show()
# 输出
+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+
  • DataFrame.filter() : 筛选数据。
df.filter(df.a == 1).show()
# 输出
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+

二、应用函数

PySpark 支持各种 UDF 和 API,以允许用户执行 Python 原生函数。例如,下面的示例允许用户在 Python 原生函数中直接使用pandas Series中的 API。

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

# 输出
+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+

另一个例子是DataFrame.mapInPandas允许用户直接使用pandas DataFrame中的 API,而不受结果长度等任何限制。

def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

# 输出
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+

三、数据分组操作

PySpark DataFrame 还提供了一种使用常用方法拆分-应用-组合策略来处理分组数据的方法。它按特定条件对数据进行分组,对每个组应用一个函数,然后将它们组合回 DataFrame。

df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

# 输出
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

分组,然后用avg()函数应用于结果组,输出平均值。同样的也可以来计算max()、min()等等。

df.groupby('color').avg().show()

# 输出
+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+

还可以使用 pandas API 对每个分组应用 Python 自定义函数。

def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

# 输出
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
+-----+------+---+---+

cogroup 和 applyInPandas 操作。

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()

# 输出
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【pyspark】DataFrame基础操作(二) 的相关文章

  • 《看完就懂系列》谈谈数据埋点的原理与实现

    这是大冰块2021年第7篇原创文章 和大冰块一起在前端领域努力吧 写在前面 之前公司接了个做广告的业务 甲方财大气粗 沟通也特别顺利 刚开始的时候 大家都摩拳擦掌兴致满满 觉得这个项目奖金一定会翻一番 于是第一版赶得很急 上线之后 点击率与

随机推荐

  • 高并发平台技术栈大起底

    技术栈 technology stack 就是一个公司的透视镜 从某些程度上可以展示出公司的技术实力 从技术桟也可以看出整个平台的技术要素 平台大小规模等 今天来给大家分享我司的技术全家桶 总览 闲来无事就想着还可以总结总结什么 平台架构
  • 时序分析基本概念介绍——时钟sdc

    上次介绍了sdc的基本概念 那接下来几期 我们来讲解一些比较常用的sdc命令 虽然sdc大大小小有上百条命令 但实际常用的其实就那么10几条 今天我们来介绍下与时钟相关的命令 主要有以下命令 create clock create gene
  • Zlib的安装与测试

    官方网址 http www zlib net 进入官网看到 如图所示 最新版本为zlib 1 2 11 然后你用wget http www zlib net zlib 1 2 11或者wget http www zlib net zlib
  • NGINX监听不同端口的配置

    server listen 81 监听的端口 server name localhost 监听的域名 location message 当当问这个路径时实际上会去访问下面这个地址 proxy pass http localhost xxx
  • 【计算机视觉40例】案例14:指纹识别

    导读 本文是专栏 计算机视觉40例简介 的第14个案例 指纹识别 该专栏简要介绍李立宗主编 计算机视觉40例 从入门到深度学习 OpenCV Python 一书的40个案例 目前 该书已经在电子工业出版社出版 大家可以在京东 淘宝 当当等平
  • 谈谈互联网免费思维

    大家好 我是测试小牛 一位平凡又普通的95后互联网博主 今天跟大家聊下互联网的免费思维 在移动互联网时代的今天 可能任何一家互联网创业公司的崛起都具备这种思维 比如当年快滴跟滴滴之争 美团跟饿了么之争 非但不收钱 还花钱大量补贴用户 去使用
  • [思考进阶]05 人与人之间的差距,在于“自律”

    除了要提升自己的技术能力 思维的学习和成长也非常非常重要 特推出此 思考进阶 系列 进行刻意练习 从而提升自己的认知 我认识两个成功的长者 其中有一个人 每天记录当天的重要事项 比如今天和谁签了什么合同 房子装修还差什么材料 女儿20岁生日
  • 【教程+实例】Python爬虫实例——用Python爬虫爬取bangumi上的galgame资讯

    文章目录 0 前言 1 什么是爬虫 2 什么东西可以爬 3 简易爬虫示例 4 如何筛选数据 5 正则表达式 6 最终代码 如何修改代码 以适应自己的需求 效果截图 7 彩蛋 本人与知乎用户间宫羽咲sama是同一人 内容同步更新在CSDN和知
  • 1.通用文件模型及VFS结构-基础

    一 reiserFS 新型文件系统 特点 1 采用完全平衡树来容纳数据 ReiserFS是基于平衡树 STree 的文件系统结构 尤其对于大量文件的巨型文件系统 如服务器上的文件系统 搜索速度要比ext2快 ext2使用局部的二分查找法 综
  • 客户服务器被 ddos 攻击,应该怎么办?

    转自 点击打开链接https www v2ex com t 145842 首页 注册 登录 V2EX way to explore V2EX 是一个关于分享和探索的地方 现在注册 已注册用户请 登录 V2EX 提问指南 广告
  • plSQL中修改代码字体的大小

    在第一次打开PLSQL时 大部分人看代码字体的大小肯定不习惯 这时候只需要修改一下字体的大小即可 首先找到左上角的 工具 然后点击第一个首选项 然后再用户界面找到 字体 最后点击编译器中的 选择 即可 然后就可以进行字体大小的调节了
  • Android推送总结

    http blog csdn net baidu 26352053 article details 54135107 最近Android开发当中推送技术是热点 互联网上不同的博客关于推送的介绍也非常的多 大致上关于推送技术 我们可以有使用第
  • JAVA_HOME is not set

    关于JAVA HOME没有设置 本人是在配置spark集群的时候显示 JAVA HOME is not set 具体如下 但是我们在 cd JAVA HOME 还是能够进去 查阅网上资料有各种各样的解决办法 但是看着没啥关系 于是我猜测是没
  • 解决百度网盘(百度云)分享链接不存在失效、分享的文件已经被取消的问题

    解决百度网盘 百度云 分享链接不存在失效 分享的文件已经被取消的问题 参考文章 1 解决百度网盘 百度云 分享链接不存在失效 分享的文件已经被取消的问题 2 https www cnblogs com hafiz p 5496391 htm
  • spring--容器创建过程(IOC和AOP的过程)

    在前面两篇中我们介绍了IOC和AOP 知道了IOC容器就是来管理每个Bean的 而AOP就是对这些Bean进行功能的拓展 那么这个过程是怎么样实现的呢 IOC和AOP又是怎么样的一个过程呢 一 Spring容器创建过程 我们通过一个简单的创
  • 配置网卡信息/etc/sysconfig/network-scripts/ifcfg-eth0,修改ip

    1 ifconfig查看ip信息 root用户下 root edgzrip2 ifconfig a eth0 Link encap Ethernet HWaddr 00 50 56 2B 27 67 inet addr 192 168 23
  • 一键列出所有容器IP地址脚本

    文章目录 使用场景 效果 列出所有容器 自动跳过无终端的容器 脚本 案例 查看网络带宽占用 过滤IP对应的容器 解决带宽占用问题 使用场景 在服务器带宽被占用 但不好定位是哪个容器时 通过用此脚本来快速过滤容器名 以便解决问题 效果 列出所
  • [C++]使用关键字new创建对象

    1 首先解释new关键字的作用 在堆中开辟指定数据类型的空间 调用指定数据类型的构造函数 创建对象 返回创建的对象 int pn new int new与delete搭配使用 这种写法 pn为栈上的一个指针 指向堆上所对应的内存块 int
  • 逻辑地址、物理地址和线性地址

    逻辑地址 logical address 包含在机器语言指令中用来指定一个操作数或一条指令的地址 这种寻址方式在80X86著名的分段结构中表现的尤为具体 它促使MS DOS或Windows程序员把程序分成若干段 每一个逻辑地址都由一个段 s
  • 【pyspark】DataFrame基础操作(二)

    介绍一下 pyspark 的 DataFrame 基础操作 一 选择和访问数据 PySpark DataFrame 是惰性计算的 简单地选择一列不会触发计算 但它会返回一个 Column 实例 并且 大多数按列操作都返回 Column 实例