在 pyspark 中实现递归算法以查找数据帧中的配对

2024-04-30

我有一个火花数据框(prof_student_df)列出了时间戳的学生/教授对。每个时间戳有 4 位教授和 4 位学生,每个教授-学生对都有一个“分数”(因此每个时间范围有 16 行)。对于每个时间范围,我需要找到教授/学生之间的一对一配对,以最大化总体分数。每个教授在一个时间范围内只能与一名学生匹配。

例如,以下是一个时间范围内的配对/分数。

+------------+--------------+------------+-------+----------+
|    time    | professor_id | student_id | score | is_match |
+------------+--------------+------------+-------+----------+
| 1596048041 | p1           | s1         |   0.7 | FALSE    |
| 1596048041 | p1           | s2         |   0.5 | TRUE     |
| 1596048041 | p1           | s3         |   0.3 | FALSE    |
| 1596048041 | p1           | s4         |   0.2 | FALSE    |
| 1596048041 | p2           | s1         |   0.9 | TRUE     |
| 1596048041 | p2           | s2         |   0.1 | FALSE    |
| 1596048041 | p2           | s3         |  0.15 | FALSE    |
| 1596048041 | p2           | s4         |   0.2 | FALSE    |
| 1596048041 | p3           | s1         |   0.2 | FALSE    |
| 1596048041 | p3           | s2         |   0.3 | FALSE    |
| 1596048041 | p3           | s3         |   0.4 | FALSE    |
| 1596048041 | p3           | s4         |   0.8 | TRUE     |
| 1596048041 | p4           | s1         |   0.2 | FALSE    |
| 1596048041 | p4           | s2         |   0.3 | FALSE    |
| 1596048041 | p4           | s3         |  0.35 | TRUE     |
| 1596048041 | p4           | s4         |   0.4 | FALSE    |
+------------+--------------+------------+-------+----------+

目标是获取 is_match 列。它可以是布尔值或 0/1 位或任何有效的值。

在上面的示例中,p1 与 s2 匹配,p2 与 s1 匹配,p3 与 s4 匹配,p4 与 s3 匹配,因为这是使总分最大化的组合(得分为 2.55)。 有一种奇怪的边缘情况——在给定的时间范围内,教授或学生的人数可能少于 4 人。如果有 4 名教授和 3 名学生,那么 1 名教授将没有配对,并且他的所有 is_match 都将是错误的。同样,如果有 3 名教授和 4 名学生,则 1 名学生将没有配对,并且他的所有 is_match 都将为 false。

有谁知道我如何实现这个目标?我想我会按时间分区或分组,然后将数据输入到一些 UDF 中,该 UDF 会吐出配对,然后也许我必须将其连接回原始行(尽管我不确定)。我正在尝试在 pyspark 中实现此逻辑,并且可以使用 Spark sql/sql 或 pyspark。

理想情况下,我希望它尽可能高效,因为会有数百万行。在问题中,我提到了递归算法,因为这是一个传统的递归类型问题,但如果有一个不使用递归的更快的解决方案,我对此持开放态度。

非常感谢,我是 Spark 新手,对如何做到这一点有点困惑。

EDIT:澄清问题,因为我在示例中意识到我没有指定这一点 一天最多有 14 名教授和 14 名学生可供选择。我只是一次查看一天,这就是为什么我在数据框中没有日期。在任一时间范围内,最多有 4 名教授和 4 名学生。该数据框仅显示一个时间范围。但在下一个时间范围内,这四位教授可能会p5, p1, p7, p9或类似的东西。学生们可能仍然在s1, s2, s3, s4.


Edit:正如评论中所讨论的,为了解决更新中提到的问题,我们可以使用dense_rank将student_id每次转换为广义序列id,执行步骤1到3(使用student列),然后使用join进行转换student在每一个time回到他们原来的样子学生卡。见下文Step-0 and Step-4。如果一个 timeUnit 中的教授少于 4 名,则 Numpy 端的维度将调整为 4(使用 np_vstack() 和 np_zeros()),请参阅更新后的函数find_assigned.

你可以试试pandas_udf http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply and scipy.optimize.线性总和分配 https://docs.scipy.org/doc/scipy-0.18.1/reference/generated/scipy.optimize.linear_sum_assignment.html(注意:后端方法是匈牙利算法,如@cronoik在主要评论中),见下文:

from pyspark.sql.functions import pandas_udf, PandasUDFType, first, expr, dense_rank
from pyspark.sql.types import StructType
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
import numpy as np

df = spark.createDataFrame([
    ('1596048041', 'p1', 's1', 0.7), ('1596048041', 'p1', 's2', 0.5), ('1596048041', 'p1', 's3', 0.3),
    ('1596048041', 'p1', 's4', 0.2), ('1596048041', 'p2', 's1', 0.9), ('1596048041', 'p2', 's2', 0.1),
    ('1596048041', 'p2', 's3', 0.15), ('1596048041', 'p2', 's4', 0.2), ('1596048041', 'p3', 's1', 0.2),
    ('1596048041', 'p3', 's2', 0.3), ('1596048041', 'p3', 's3', 0.4), ('1596048041', 'p3', 's4', 0.8),
    ('1596048041', 'p4', 's1', 0.2), ('1596048041', 'p4', 's2', 0.3), ('1596048041', 'p4', 's3', 0.35),
    ('1596048041', 'p4', 's4', 0.4)
] , ['time', 'professor_id', 'student_id', 'score'])

N = 4
cols_student = [*range(1,N+1)]

Step-0:添加额外的列student,并使用所有独特的组合创建一个新的数据框 df3time + student_id + student.

w1 = Window.partitionBy('time').orderBy('student_id')

df = df.withColumn('student', dense_rank().over(w1))
+----------+------------+----------+-----+-------+                              
|      time|professor_id|student_id|score|student|
+----------+------------+----------+-----+-------+
|1596048041|          p1|        s1|  0.7|      1|
|1596048041|          p2|        s1|  0.9|      1|
|1596048041|          p3|        s1|  0.2|      1|
|1596048041|          p4|        s1|  0.2|      1|
|1596048041|          p1|        s2|  0.5|      2|
|1596048041|          p2|        s2|  0.1|      2|
|1596048041|          p3|        s2|  0.3|      2|
|1596048041|          p4|        s2|  0.3|      2|
|1596048041|          p1|        s3|  0.3|      3|
|1596048041|          p2|        s3| 0.15|      3|
|1596048041|          p3|        s3|  0.4|      3|
|1596048041|          p4|        s3| 0.35|      3|
|1596048041|          p1|        s4|  0.2|      4|
|1596048041|          p2|        s4|  0.2|      4|
|1596048041|          p3|        s4|  0.8|      4|
|1596048041|          p4|        s4|  0.4|      4|
+----------+------------+----------+-----+-------+

df3 = df.select('time','student_id','student').dropDuplicates()
+----------+----------+-------+                                                 
|      time|student_id|student|
+----------+----------+-------+
|1596048041|        s1|      1|
|1596048041|        s2|      2|
|1596048041|        s3|      3|
|1596048041|        s4|      4|
+----------+----------+-------+

Step-1:使用pivot来找到教授与学生的矩阵,注意我们将分数的负数设置为pivot的值,以便我们可以使用scipy.optimize.linear_sum_assignment来找到作业问题的最小成本:

df1 = df.groupby('time','professor_id').pivot('student', cols_student).agg(-first('score'))
+----------+------------+----+----+-----+----+
|      time|professor_id|   1|   2|    3|   4|
+----------+------------+----+----+-----+----+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|
+----------+------------+----+----+-----+----+

Step-2:使用 pandas_udf 和 scipy.optimize.linear_sum_assignment 获取列索引,然后将相应的列名称分配给新列assigned:

# returnSchema contains one more StringType column `assigned` than schema from the input pdf:
schema = StructType.fromJson(df1.schema.jsonValue()).add('assigned', 'string')

# since the # of students are always N, we can use np.vstack to set the N*N matrix
# below `n` is the number of professors/rows in pdf
# sz is the size of input Matrix, sz=4 in this example
def __find_assigned(pdf, sz):
  cols = pdf.columns[2:]
  n = pdf.shape[0]
  n1 = pdf.iloc[:,2:].fillna(0).values
  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((sz-n,sz)))))
  return pdf.assign(assigned=[cols[i] for i in idx][:n])

find_assigned = pandas_udf(lambda x: __find_assigned(x,N), schema, PandasUDFType.GROUPED_MAP)

df2 = df1.groupby('time').apply(find_assigned)
+----------+------------+----+----+-----+----+--------+
|      time|professor_id|   1|   2|    3|   4|assigned|
+----------+------------+----+----+-----+----+--------+
|1596048041|          p4|-0.2|-0.3|-0.35|-0.4|       3|
|1596048041|          p2|-0.9|-0.1|-0.15|-0.2|       1|
|1596048041|          p1|-0.7|-0.5| -0.3|-0.2|       2|
|1596048041|          p3|-0.2|-0.3| -0.4|-0.8|       4|
+----------+------------+----+----+-----+----+--------+

Note:根据建议@OluwafemiSule,我们可以使用参数maximize而不是否定分数值。该参数可用SciPy 1.4.0+ https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linear_sum_assignment.html:

  _, idx = linear_sum_assignment(np.vstack((n1,np.zeros((N-n,N)))), maximize=True)

Step-3:使用 SparkSQLstack https://spark.apache.org/docs/latest/api/sql/index.html#stack函数对上述 df2 进行归一化,对分数值取反并过滤分数为 NULL 的行。所需is_match列应该有assigned==student:

df_new = df2.selectExpr(
  'time',
  'professor_id',
  'assigned',
  'stack({},{}) as (student, score)'.format(len(cols_student), ','.join("int('{0}'), -`{0}`".format(c) for c in cols_student))
) \
.filter("score is not NULL") \
.withColumn('is_match', expr("assigned=student"))

df_new.show()
+----------+------------+--------+-------+-----+--------+
|      time|professor_id|assigned|student|score|is_match|
+----------+------------+--------+-------+-----+--------+
|1596048041|          p4|       3|      1|  0.2|   false|
|1596048041|          p4|       3|      2|  0.3|   false|
|1596048041|          p4|       3|      3| 0.35|    true|
|1596048041|          p4|       3|      4|  0.4|   false|
|1596048041|          p2|       1|      1|  0.9|    true|
|1596048041|          p2|       1|      2|  0.1|   false|
|1596048041|          p2|       1|      3| 0.15|   false|
|1596048041|          p2|       1|      4|  0.2|   false|
|1596048041|          p1|       2|      1|  0.7|   false|
|1596048041|          p1|       2|      2|  0.5|    true|
|1596048041|          p1|       2|      3|  0.3|   false|
|1596048041|          p1|       2|      4|  0.2|   false|
|1596048041|          p3|       4|      1|  0.2|   false|
|1596048041|          p3|       4|      2|  0.3|   false|
|1596048041|          p3|       4|      3|  0.4|   false|
|1596048041|          p3|       4|      4|  0.8|    true|
+----------+------------+--------+-------+-----+--------+

Step-4:使用 join 将student转换回student_id(如果可能,请使用广播连接):

df_new = df_new.join(df3, on=["time", "student"])
+----------+-------+------------+--------+-----+--------+----------+            
|      time|student|professor_id|assigned|score|is_match|student_id|
+----------+-------+------------+--------+-----+--------+----------+
|1596048041|      1|          p1|       2|  0.7|   false|        s1|
|1596048041|      2|          p1|       2|  0.5|    true|        s2|
|1596048041|      3|          p1|       2|  0.3|   false|        s3|
|1596048041|      4|          p1|       2|  0.2|   false|        s4|
|1596048041|      1|          p2|       1|  0.9|    true|        s1|
|1596048041|      2|          p2|       1|  0.1|   false|        s2|
|1596048041|      3|          p2|       1| 0.15|   false|        s3|
|1596048041|      4|          p2|       1|  0.2|   false|        s4|
|1596048041|      1|          p3|       4|  0.2|   false|        s1|
|1596048041|      2|          p3|       4|  0.3|   false|        s2|
|1596048041|      3|          p3|       4|  0.4|   false|        s3|
|1596048041|      4|          p3|       4|  0.8|    true|        s4|
|1596048041|      1|          p4|       3|  0.2|   false|        s1|
|1596048041|      2|          p4|       3|  0.3|   false|        s2|
|1596048041|      3|          p4|       3| 0.35|    true|        s3|
|1596048041|      4|          p4|       3|  0.4|   false|        s4|
+----------+-------+------------+--------+-----+--------+----------+

df_new = df_new.drop("student", "assigned")
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 pyspark 中实现递归算法以查找数据帧中的配对 的相关文章

随机推荐

  • java有跳过列表实现吗

    I find ConcurrentSkipListSet http download oracle com javase 6 docs api java util concurrent ConcurrentSkipListSet html在
  • SELECT DISTINCT 和 SELECT UNIQUE 之间的区别[重复]

    这个问题在这里已经有答案了 可能的重复 Oracle 9i 中的唯一与不同多列 https stackoverflow com questions 174912 unique vs distinct multi column in orac
  • 确定 IE 中的 HTML 按钮是否被禁用

    我最近正在做一些 VBA 工作 我需要签入网页以单击按钮 如果启用 如果禁用则不要单击 但 我不知道如何让 VBA 检查禁用的按钮 这是按钮代码
  • 订购“混合”向量(带有字母的数字)

    我怎样才能订购像这样的矢量 c 7 10a 10b 10c 8 9 11c 11b 11a 12 gt alph in alph 1 7 8 9 10a 10b 10c 11a 11b 11c 12 并用它对 data frame 进行排序
  • Material UI:“提供给类属性的键未实现”

    我正在使用 withStyles HOC 来覆盖一些 Material UI 组件样式 主题和断点 显然有一些我不明白的地方 因为我不断收到这样的错误 警告 Material UI 关键tab提供给类属性 未在 Items 中实现 您只能覆
  • sizeWithFont: constrainedToSize: with UITextView (有时) 没有创建正确的高度 - iPhone

    我使用以下代码来确定所需的高度UITextView正在添加到UITableViewCell 并确定每个的高度UITableViewCell 这在 90 的情况下有效 但某些传入数据 始终是相同的传入数据 会出现不足 如果我允许在UIText
  • 作曲家转储自动加载和 git

    为了部署实用性 我们决定将供应商目录提交到 git 存储库中 一切正常 但是每次我们运行 Composer dump autoload 时 它都会修改以下文件 这非常烦人 因为它似乎并不代表文件的特定状态 每次重新运行它时它看起来都像一个随
  • 使用 RSpec 测试模块内的类

    所以 我的 ruby 代码中有一个模块 如下所示 module MathStuff class Integer def least factor implementation code end end end 我有一些 RSpec 测试 我
  • 如何在 iOS 中打开 URL 而不指定 HTTP 或 HTTPS?

    在我的 iOS 应用程序中 我使用以下代码打开链接 UIApplication sharedApplication openURL NSURL URLWithString NSString stringWithFormat myurl 链接
  • 如何使用我的服务器作为代理通过 PHP 下载文件?

    我需要我的服务器充当第三方服务器 文件最初所在的位置 和最终用户之间的代理 也就是说 我的服务器从第3方服务器下载文件 然后用户从我的服务器下载它 这将导致产生文件大小两倍的带宽 这个过程如何使用PHP来实现呢 fp fopen url r
  • 如何测试具有多个输入调用的循环?

    我正在尝试测试一个依赖多个用户输入来返回某个值的函数 我已经在这里寻找了多个答案 但没有一个能够解决我的问题 我看到了参数化 模拟和猴子补丁的东西 但没有任何帮助 我认为很大程度上是因为我没有清楚地理解正在做的事情背后的概念 并且我无法适应
  • 如何在python中修改html树?

    假设有一些可变片段html代码 p span class code string 1 span class code string 2 span class code string 3 span span span p p span cla
  • ftplib: 在 LIST 期间/之后出现 socket.error // ssl._sslobj.shutdown() / 连接超时

    我尝试使用客户端证书连接到 FTPS 服务器 我尝试了两台不同的服务器 我无法控制它们 但应该非常相似 连接建立 PWD 命令成功 在一台服务器上 LIST 命令成功 但在第二台服务器上 它产生正确的结果 文件列表 但之后 显然在 SSL
  • 在 Android 中移动目录的最快方法?

    在 Android 中移动目录最快的方法是什么 在大多数情况下 但并非所有情况 源和目标位于同一 SD 卡文件系统上 目前 我的代码遍历整个目录结构 并将每个文件的内容复制到新位置的同名新文件中 然后它会验证文件大小是否匹配 然后删除源文件
  • 如何自定义 magento onepage 结账表单

    我正在使用 Magento 1 5 1 0 我想在单页结账表单中自定义地址块 我想删除 传真 输入字段并将 区域 下拉列表放在国家 地区 下拉列表 下方 这个形式是在哪里定义的 亲切的问候 伯蒂 导航到您的主题文件夹 默认文件位于以下位置
  • 大师系统要求

    我们将使用 Virtuoso 来存储 RDF 三重计数一开始将为 1 亿 我需要知道典型的 RAM CPU 磁盘等应该是什么 查询将使用 SPARQL 并且查询会有点复杂 请提供您的意见 Virtuoso 版本 6 x 三元组 四元组 的平
  • JAVA - 路径问题(在 Eclipse 中有效,在 cmd 中无效)

    为什么下面的启动在 Eclipse 中有效 private static MaxentTagger maxentTagger new MaxentTagger c DP lemma models english left3words dis
  • phonegap、iphone 和最大的坏处idleTimerDisabled

    阅读了很多关于如何防止 iPhone 在运行我的应用程序时进入睡眠状态的内容 我现在非常不高兴 因为没有任何效果 here https stackoverflow com questions 1058717 idletimerdisable
  • Internet Explorer 无法打开 Internet 站点操作中止,如何修复此错误?

    此代码在 IE 中给出错误 Internet Explorer 无法打开 Internet 站点操作中止 如何修复此错误 var tip p Most computers will open PDF documents tip automa
  • 在 pyspark 中实现递归算法以查找数据帧中的配对

    我有一个火花数据框 prof student df 列出了时间戳的学生 教授对 每个时间戳有 4 位教授和 4 位学生 每个教授 学生对都有一个 分数 因此每个时间范围有 16 行 对于每个时间范围 我需要找到教授 学生之间的一对一配对 以