Pyspark:如何处理 python 用户定义函数中的空值

2024-02-16

我想使用一些非 pyspark 原生的字符串相似性函数,例如数据帧上的 jaro 和 jaro-winkler 度量。这些在 python 模块中很容易获得,例如jellyfish。对于没有的情况,我可以写 pyspark udf 很好null存在价值观,即将猫与狗进行比较。当我将这些 udf 应用于数据时null价值观存在,但不起作用。在像我正在解决的问题中,其中一个字符串是很常见的null

我需要帮助让我的字符串相似性 udf 一般工作,更具体地说,在其中一个值是的情况下工作null

我编写了一个 udf,它在输入数据中没有空值时起作用:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish.cjellyfish

def jaro_winkler_func(df, column_left, column_right):

    jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())

    df = (df
          .withColumn('test',
                      jaro_winkler_udf(df[column_left], df[column_right])))

    return df

输入和输出示例:

+-----------+------------+
|string_left|string_right|
+-----------+------------+
|       dude|         dud|
|       spud|         dud|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right|              test|
+-----------+------------+------------------+
|       dude|         dud|0.9166666666666666|
|       spud|         dud|0.7222222222222222|
+-----------+------------+------------------+

当我对具有空值的数据运行此操作时,我会得到通常的大量火花错误,最适用的似乎是TypeError: str argument expected。我认为这是由于null数据中的值,因为它在没有数据时起作用。

我修改了上面的函数来检查两个值是否不为空,并且只有在这种情况下才运行该函数,否则返回 0。

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
import jellyfish.cjellyfish

def jaro_winkler_func(df, column_left, column_right):

    jaro_winkler_udf = udf(f=lambda s1, s2: jellyfish.jaro_winkler(s1, s2), returnType=DoubleType())

    df = (df
       .withColumn('test',
                   F.when(df[column_left].isNotNull() & df[column_right].isNotNull(),
                          jaro_winkler_udf(df[column_left], df[column_right]))
                   .otherwise(0.0)))

    return df

但是,我仍然遇到与以前相同的错误。

示例输入以及我希望的输出:

+-----------+------------+
|string_left|string_right|
+-----------+------------+
|       dude|         dud|
|       spud|         dud|
|       spud|        null|
|       null|        null|
+-----------+------------+
+-----------+------------+------------------+
|string_left|string_right|              test|
+-----------+------------+------------------+
|       dude|         dud|0.9166666666666666|
|       spud|         dud|0.7222222222222222|
|       spud|        null|0.0               |
|       null|        null|0.0               |
+-----------+------------+------------------+

我们将稍微修改一下您的代码,它应该可以正常工作:

@udf(DoubleType())
def jaro_winkler(s1, s2):
    if not all((s1, s2)):  # or, if None in (s1, s2):
        out = 0
    else:
        out = jellyfish.jaro_winkler(s1, s2)
    return out


def jaro_winkler_func(df, column_left, column_right):
    df = df.withColumn("test", jaro_winkler(df[column_left], df[column_right]))
    return df

根据预期的行为,您需要更改测试:

  • if not all((s1, s2)):两者都返回 0null和空的 细绳''.
  • if None in (s1, s2):仅返回 0null
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark:如何处理 python 用户定义函数中的空值 的相关文章

  • 火花内存不足

    我有一个文件夹 里面有 150 G 的 txt 文件 大约 700 个文件 平均每个 200 MB 我使用 scala 来处理文件并最终计算一些汇总统计数据 我认为有两种可能的方法可以做到这一点 手动循环所有文件 对每个文件进行计算并最终合
  • Pandas/Google BigQuery:架构不匹配导致上传失败

    我的谷歌表中的架构如下所示 price datetime DATETIME symbol STRING bid open FLOAT bid high FLOAT bid low FLOAT bid close FLOAT ask open
  • 将 python2.7 与 Emacs 24.3 和 python-mode.el 一起使用

    我是 Emacs 新手 我正在尝试设置我的 python 环境 到目前为止 我已经了解到在 python 缓冲区中使用 python mode el C c C c将当前缓冲区的内容加载到交互式 python shell 中 显然使用了什么
  • 使用Python请求登录Google帐户

    在多个登录页面上 需要谷歌登录才能继续 我想用requestspython 中的库以便让我自己登录 通常这很容易使用requests库 但是我无法让它工作 我不确定这是否是由于 Google 做出的一些限制 也许我需要使用他们的 API 或
  • 如何加速spark df.write jdbc到postgres数据库?

    我是 Spark 新手 正在尝试使用 df write 加速将数据帧的内容 可以有 200k 到 2M 行 附加到 postgres 数据库 df write format jdbc options url psql url spark d
  • 使用字典映射数据帧索引

    为什么不df index map dict 工作就像df column name map dict 这是尝试使用index map的一个小例子 import pandas as pd df pd DataFrame one A 10 B 2
  • 您可以格式化 pandas 整数以进行显示,例如浮点数的“pd.options.display.float_format”?

    我见过this https stackoverflow com questions 18404946 py pandas formatdataframe and this https stackoverflow com questions
  • 立体太阳图 matplotlib 极坐标图 python

    我正在尝试创建一个与以下类似的简单的立体太阳路径图 http wiki naturalfrequent com wiki Sun Path Diagram http wiki naturalfrequency com wiki Sun Pa
  • Python 2:SMTPServerDisconnected:连接意外关闭

    我在用 Python 发送电子邮件时遇到一个小问题 me my email address you recipient s email address me email protected cdn cgi l email protectio
  • 从Python中的字典列表中查找特定值

    我的字典列表中有以下数据 data I versicolor 0 Sepal Length 7 9 I setosa 0 I virginica 1 I versicolor 0 I setosa 1 I virginica 0 Sepal
  • Python,将函数的输出重定向到文件中

    我正在尝试将函数的输出存储到Python中的文件中 我想做的是这样的 def test print This is a Test file open Log a file write test file close 但是当我这样做时 我收到
  • javascript 是否有等效的 __repr__ ?

    我最接近Python的东西repr这是 function User name password this name name this password password User prototype toString function r
  • Jupyter Notebook 找不到 Python 模块

    不知道发生了什么 但每当我使用 ipython 氢 原子 或 jupyter 笔记本时都找不到任何已安装的模块 我知道我安装了 pandas 但笔记本说找不到 我应该补充一点 当我正常运行脚本时 python script py 它确实导入
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • 如何使用原始 SQL 查询实现搜索功能

    我正在创建一个由 CS50 的网络系列指导的应用程序 这要求我仅使用原始 SQL 查询而不是 ORM 我正在尝试创建一个搜索功能 用户可以在其中查找存储在数据库中的书籍列表 我希望他们能够查询 书籍 表中的 ISBN 标题 作者列 目前 它
  • 如何断言 Unittest 上的可迭代对象不为空?

    向服务提交查询后 我会收到一本字典或一个列表 我想确保它不为空 我使用Python 2 7 我很惊讶没有任何assertEmpty方法为unittest TestCase类实例 现有的替代方案看起来并不正确 self assertTrue
  • 在本地网络上运行 Bokeh 服务器

    我有一个简单的 Bokeh 应用程序 名为app py如下 contents of app py from bokeh client import push session from bokeh embed import server do
  • Python ImportError:无法导入名称 __init__.py

    我收到此错误 ImportError cannot import name life table from cdc life tables C Users tony OneDrive Documents Retirement retirem
  • 如何计算Python中字典中最常见的前10个值

    我对 python 和一般编程都很陌生 所以请友善 我正在尝试分析包含音乐信息的 csv 文件并返回最常听的前 n 个乐队 从下面的代码中 每听一首歌曲都是一个列表中的字典条目 格式如下 album Exile on Main Street
  • 使用随机放置的 NaN 创建示例 numpy 数组

    出于测试目的 我想创建一个M by Nnumpy 数组与c随机放置的 NaN import numpy as np M 10 N 5 c 15 A np random randn M N A mask np nan 我在创建时遇到问题mas

随机推荐