如何将 DataFrame 作为输入传递给 Spark UDF?

2024-06-19

我有一个数据框,我想对每一行应用一个函数。该函数依赖于其他数据帧。

简化的例子。我有如下三个数据框:

df = sc.parallelize([
    ['a', 'b', 1],
    ['c', 'd', 3]
    ]).toDF(('feat1', 'feat2', 'value'))

df_other_1 = sc.parallelize([
        ['a', 0, 1, 0.0],
        ['a', 1, 3, 0.1],
        ['a', 3, 10, 1.0],
        ['c', 0, 10, 0.2],
        ['c', 10, 25, 0.5]
        ]).toDF(('feat1', 'lower', 'upper', 'score'))

df_other_2 = sc.parallelize([
        ['b', 0, 4, 0.1],
        ['b', 4, 20, 0.5],
        ['b', 20, 30, 1.0],
        ['d', 0, 5, 0.05],
        ['d', 5, 22, 0.9]
        ]).toDF(('feat1', 'lower', 'upper', 'score'))

对于每一行df,我想收集唯一的上限值feat1 and feat2 from df_other_1 and df_other_2,即第一行的唯一值为 (1, 3, 10, 4, 20, 30)。然后,我将它们排序为 (30, 20, 10, 4, 3, 1) 并添加到前面,比第一个数字高一个数字。这df会变成这样:

df = sc.parallelize([
        ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]],
        ['c', 'd', 3, [26, 25, 22, 10, 5]]
        ]).toDF(('feat1', 'feat2', 'value', 'lst'))

然后,对于每一行df并且对于每个各自的值lst,我想计算总和score来自两者df_other_1 and df_other_2其中每个值lst落在upper and lower。我的目标是找到每个中的最低值lst总分高于某个阈值(例如 1.4)。

下面介绍如何计算总分。所以,对于第一行df,第一个值lst是 31。df_other_1 for feat1,它高于最高的存储桶,因此得分为 1。同样df_other_2。所以,总分就是1+1=2。对于值为 10(同样是第一行),总分将为 1 + 0.5 = 1.5。

这就是如何df最后看起来像:

df = sc.parallelize([
            ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4],
            ['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25]
            ]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value'))

我实际上正在寻找这些目标值4 and 25。中间步骤并不重要。

=================================================== =======================

到目前为止,这是我尝试过的:

def get_threshold_for_row(feat1, feat2, threshold):

    this_df_other_1 = df_other_1.filter(col('feat1') == feat1)
    this_df_other_2 = df_other_2.filter(col('feat1') == feat2)

    values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()]
    values_feat_1.append(values_feat_1[-1] + 1)
    values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()]
    values_feat_2.append(values_feat_2[-1] + 1)

    values = values_feat_1 + values_feat_2
    values = list(set(values)) #Keep unique values
    values.sort(reverse=True)  #Sort from largest to smallest

    df_1_score = df_2_score = 0
    prev_value = 10000 #Any large number
    prev_score = 10000

    for value in values:
        df_1_score = get_score_for_key(this_df_other_1, 'feat_1', feat_1, value)
        df_2_score = get_score_for_key(this_df_other_2, 'feat_1', feat_2, value)

        total_score = df_1_score + df_2_score

        if total_score < threshold and prev_score >= threshold:
            return prev_value

        prev_score = total_score
        prev_value = value


def is_dataframe_empty(df):
    return len(df.take(1)) == 0

def get_score_for_key(scores_df, grouping_key, this_id, value):

    if is_dataframe_empty(scores_df):
        return 0.0

    w = Window.partitionBy([grouping_key]).orderBy(col('upper'))

    scores_df_tmp = scores_df.withColumn("prev_value", lead(scores_df.upper).over(w))\
                        .withColumn("is_last", when(col('prev_value').isNull(), 1).otherwise(0))\
                        .drop('prev_value')

    scores_df_tmp = scores_df_tmp.withColumn("next_value", lag(scores_df_tmp.upper).over(w))\
                        .withColumn("is_first", when(col('next_value').isNull(), 1).otherwise(0))\
                        .drop('next_value').cache()

    grouping_key_score = scores_df_tmp.filter((col(grouping_key) == this_id) & 
                              (((value >= col('from_value')) & (value < col('to_value'))) | 
                                ((value >= col('to_value')) & (col('is_last') == 1)) |
                                ((value < col('from_value')) & (col('is_first') == 1)) |
                                (col('from_value').isNull()))) \
                    .withColumn('final_score', when(value <= col('to_value'), col('score')).otherwise(1.0)) \
                    .collect()[0]['final_score']

    return grouping_key_score

df.rdd.map(lambda r: (r['feat_1'], r['feat_2'])) \
    .map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4)))
    .toDF()

但我得到:AttributeError: 'Py4JError' object has no attribute 'message'

抱歉发了这么长的帖子。有任何想法吗?


我有一个数据框,我想对每一行应用一个函数。该函数依赖于其他数据帧。

tl;dr这在 UDF 中是不可能的。

从最广泛的意义上来说,UDF 是一个接受零个或多个列值(作为列引用)的函数(实际上是 Catalyst 表达式)。

如果 UDF 是用户定义的聚合函数 (UDAF),则 UDF 只能处理在最广泛的情况下可能是整个 DataFrame 的记录。

如果您想在 UDF 中处理多个 DataFrame,您必须joinDataFrames 包含要用于 UDF 的列。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 DataFrame 作为输入传递给 Spark UDF? 的相关文章

随机推荐

  • 微服务与 SOA 的不同之处 [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我一直在寻找 SOA 和微服务架构风格之间的差异 并找到了一个很好的链接https www infoq com articles boot mi
  • 防止按下回车键时提交表单[重复]

    这个问题在这里已经有答案了 我们如何防止按下回车键时提交表单 实际上我有一个文本框 在该文本框中输入一个值并单击输入时 textbox2 将获得焦点 默认情况下 单击输入按钮后将提交表单 所以我无法得到输出 我在提交按钮的 onclick
  • 如何使用 WKWebView 正确实施身份验证质询?

    我正在构建一个网络浏览器 但在网络方面我真的是新手 我想测试下面的代码示例 但我没有现实生活中的示例可以使用 void webView WKWebView webView didReceiveAuthenticationChallenge
  • 该变量未声明或从未分配警告

    这是基类 public class BaseClass UserControl protected ListView list protected TreeView tree public BaseClass 儿童班 public part
  • router.navigate 不起作用(Angular6,延迟加载)

    我是 Angular 4 的新手 目前使用 v 6 我一直在尝试使用this router navigate 登陆 从登录组件重定向到登陆组件的功能 它无法正常工作 它将显示登录页面一秒钟 然后再次重定向回登录页面 但是 例如 如果我尝试浏
  • 在 for 循环中修改列表元素

    我有一个清单a我想更改其元素a i j 根据一个函数f 我能比天真的方式做得更好吗 for index in range i j a index f a 我所说的更好是指更接近于map f a 或者更快的东西 您可以分配给切片 a i j
  • 用于验证 ip 列表中的 ip 范围的正则表达式

    我有正则表达式用于验证 50 个 ips 逗号分隔的列表 25 0 5 2 0 4 0 9 01 0 9 0 9 3 25 0 5 2 0 4 0 9 01 0 9 0 9 1 50 列表示例 10 10 10 1 127 0 0 1 现在
  • extern 关键字对 C 函数的影响

    在C中 我没有注意到任何影响extern在函数声明之前使用关键字 起初 我认为在定义时extern int f 在单个文件中forces您可以在文件范围之外实现它 然而我发现两者 extern int f int f return 0 an
  • 使用底格里斯河从纬度/经度获取人口普查区

    我有相对较多的坐标 我想获取其人口普查区 除了 FIPS 代码 我知道我可以使用以下命令查找各个纬度 经度对call geolocator latlon 已完成here https stackoverflow com questions 5
  • 查找 Ivy 中隐藏的依赖项

    我使用 Apache Ivy IvyDE 来获取项目的依赖项 它们是
  • UITableViewCell 的 viewDidAppear

    我通常使用viewDidAppear方法在视图完成出现后在视图上执行一些 UI 操作 我在各种情况下使用了此方法 它非常有用 但是 我需要在视图上进行一些 UI 更改UITableViewCell当它完成出现后 SDK中是否有任何可用的方法
  • CSS3 中均匀间隔的导航链接占据 ul 的整个宽度

    我想创建一个水平导航链接列表 其中导航链接均匀分布并占据封闭容器的整个宽度 ul 导航链接可以有不同的宽度 第一个和最后一个链接应与链接的开头和结尾对齐 ul 分别 意味着链接不居中 如下所示 left side right side li
  • UI图像位置

    我使用以下代码在 UIView 中放置一些图像 UIImage image UIGraphicsBeginImageContext CGSizeMake 480 320 int k 0 int posY 0 for int i 0 i lt
  • 您可以为 None 指定类型参数或告诉编译器它是一个 Option[String] 吗?

    我想知道我是否可以在我的代码中写这样的东西 None String 我很惊讶没有人提到它的存在Option empty scala gt Option empty String res0 Option String None 请注意 在许多
  • 无法找到请求的工厂 com.ctc.wstx.stax.WstxInputFactory

    我正在构建 Oracle Agile PLM CustomAction Px 我在Px内部调用了一个web服务来处理一些数据 我部署后 它给出了 类未找到异常 javax xml ws Service 所以我复制了jaxws api 2 1
  • Python中如何知道文件的编码? [复制]

    这个问题在这里已经有答案了 有谁知道如何在Python中获取文件的编码 我知道您可以使用编解码器模块打开具有特定编码的文件 但您必须提前知道它 import codecs f codecs open file txt r utf 8 有没有
  • 使用 shell 脚本将行附加到 /etc/hosts 文件

    我有一个新的 Ubuntu 12 04 VPS 我正在尝试编写一个安装脚本来完成整个 LAMP 安装 我遇到问题的地方是在 etc hosts文件 我当前的主机文件如下所示 127 0 0 1 localhost Venus The fol
  • python 排列有问题

    我在排列方面遇到一些问题 当谈到Python时 我真的是一个大菜鸟 所以任何帮助将不胜感激 假设我在文本文件中有一个范围为 1 6 的列表 例如 它看起来像 1 2 3 4 5 6 我想打开所述 txt 文件并计算这 6 个数字中 N 的所
  • Angular 中有主控制器好吗?

    我不知道这是否是一个好的做法 我在路由配置中定义了一个控制器 但是因为我的HomeCtrl is in ng if他听不到的声明loginSuccess所以我做了MainCtrl它监听loginSuccess并做出适当的反应 这段代码工作得
  • 如何将 DataFrame 作为输入传递给 Spark UDF?

    我有一个数据框 我想对每一行应用一个函数 该函数依赖于其他数据帧 简化的例子 我有如下三个数据框 df sc parallelize a b 1 c d 3 toDF feat1 feat2 value df other 1 sc para