使用带有正则表达式的字典(Scala?)的 PySpark UDF 优化挑战

2024-01-04

我正在尝试优化下面的代码(PySpark UDF)。

它为我提供了所需的结果(基于我的数据集),但在非常大的数据集(大约 180M)上速度太慢。

结果(准确度)优于可用的 Python 模块(例如 geotext、hdx-python-country)。所以我不是在寻找另一个模块。

数据框:

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],   
  ["Kalverstraat Amsterdam","Mary"],   
  ["Kalverstraat Amsterdam, Netherlands","Lex"] 
]).toDF("address","name")

正则表达式.csv:

iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\Netherlands\b
NL;\bNL$

......<many, many more>

从以下位置创建 Pandas DataFrameregex.csv, 通过...分组iso2并加入keywords (\bArizona\b|\bTexas\b\bFlorida\b|\bUS$).

df = pd.read_csv(regex.csv, sep=';')
df_regex = df.groupby('iso2').agg({'keywords': '|'.join }).reset_index()

功能:

def get_iso2(x): 
 
    iso2={}
    
    for j, row in df_regex.iterrows():
 
        regex = re.compile(row['keywords'],re.I|re.M)         
        matches = re.finditer(regex, x)
        
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
            
    return [key for key, value in iso2.items() for _ in range(value)]

PySpark UDF:

get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))

创建新列:

df_new = df.withColumn('iso2',get_iso2_udf('address')

预期样本输出:

[US,US,NL]
[CA]
[BE,BE,AU]

有些地方出现在多个国家(输入是包含城市、省、州、国家...的地址栏)

Sample:

3030 Whispering Pines Circle, 普洛斯珀德克萨斯州, 美国 ->[US,US,US]
阿姆斯特丹 Kalverstraat ->[US,NL]
Kalverstraat 阿姆斯特丹, 荷兰 ->[US, NL, NL]

也许在 PySpark 中使用 Scala UDF 是一种选择,但我不知道如何做。

非常感谢您的优化建议!


IIUC,您可以在不使用UDF的情况下尝试以下步骤:

from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],
  ["Kalverstraat Amsterdam","Mary"],
  ["Kalverstraat Amsterdam, Netherlands","Lex"],
  ["xvcv", "ddd"]
]).toDF("address","name")

Step-1:将 df_regex 转换为 Spark 数据帧df1并添加一个 unique_id 到df.

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

# adjust keywords to uppercase except chars preceded with backslash:
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()

df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords                                                                         |
+----+---------------------------------------------------------------------------------+
|CA  |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$                             |
|NL  |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$                                          |
|US  |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+

df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address                                        |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0  |
|Kalverstraat Amsterdam                         |Mary|1  |
|Kalverstraat Amsterdam, Netherlands            |Lex |2  |
|xvcv                                           |ddd |3  |
+-----------------------------------------------+----+---+

Step-2:使用 rlike 将 df_regex 左连接到 df

df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
|             address|name| id|iso2|            keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|
|                xvcv| ddd|  3|null|                null|
+--------------------+----+---+----+--------------------+

Step-3:统计匹配的数量d2.keywords in d1.address通过分裂d1.address by d2.keywords,然后将结果数组的大小减少 1:

df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|         -2|
+--------------------+----+---+----+--------------------+-----------+

Step-4: use 数组重复 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.array_repeat重复的值iso2 num_matches次(需要火花2.4+):

df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
|             address|name| id|        iso2|            keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John|  0|[US, US, US]|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|        [NL]|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|        [US]|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|    [NL, NL]|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|        [US]|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|          []|                null|         -2|
+--------------------+----+---+------------+--------------------+-----------+

Step-5:groupby 并进行聚合:

df_new = df4 \
    .groupby('id') \
    .agg(
      first('address').alias('address'),
      first('name').alias('name'),
      flatten(collect_list('iso2')).alias('countries')
)
+---+--------------------+----+------------+
| id|             address|name|   countries|
+---+--------------------+----+------------+
|  0|3030 Whispering P...|John|[US, US, US]|
|  1|Kalverstraat Amst...|Mary|    [NL, US]|
|  3|                xvcv| ddd|          []|
|  2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+

选择:步骤 3 也可以由 Pandas UDF 处理:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re

@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])

df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|          0|
+--------------------+----+---+----+--------------------+-----------+

Notes:

  1. 由于不区分大小写的模式匹配成本很高,因此我们转换了关键字的所有字符(除了锚点或转义字符,例如\b, \B, \A, \z) 改为大写。
  2. 只是提醒一下,使用的模式rlike and regexp_replace是基于 Java 的,而 pandas_udf 是基于 Python 的,在 regex.csv 中设置模式时可能会略有不同。

方法2:使用pandas_udf

由于使用 join 和 groupby 会触发数据混洗,因此上述方法可能会很慢。您的测试只剩下一种选择:

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

df_ptn = spark.sparkContext.broadcast(
    df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}

# REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten

def __get_iso2(addr, ptn):   
   return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()]) for s in addr ])

get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+

或者在 pandas_udf 中返回一个数组的数组(w/oreduce and iconcat)并做flatten与火花:

def __get_iso2_2(addr, ptn):
    return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])

get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)

df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()

Update:要查找独特的国家/地区,请执行以下操作:

def __get_iso2_3(addr, ptn):
  return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])

get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
|             address|name|    iso2|
+--------------------+----+--------+
|3030 Whispering P...|John|    [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
|                xvcv| ddd|      []|
+--------------------+----+--------+

方法 3:使用列表理解:

如同@CronosNull 的方法,如果 regex.csv 列表是可管理的,您可以使用列表理解来处理此问题:

from pyspark.sql.functions import size, split, upper, col, array, expr, flatten

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()

df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])

df1.select(*df.columns, flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用带有正则表达式的字典(Scala?)的 PySpark UDF 优化挑战 的相关文章

随机推荐

  • deleteRule CSSKeyframesRule 方法在 IE11 中混淆行为

    我使用 css keyframes 创建了圆形的基本动画 我正在使用 javascript 通过单击圆圈内来触发动画开始 停止 动画本身可以分为 5 个 循环 阶段 暂停 扩展 暂停 收缩 暂停 参见下面的 keyframes css部分
  • Symfony 2 中的多个防火墙未找到 check_path

    我正在尝试在 Symfony 2 中设置后端和前端防火墙系统 我有两个登录表单 一个用于前端 另一个用于管理控制面板 不同的提供商等等 我的配置如下 security firewalls backend pattern admin anon
  • 日期时间解析错误:提供的日期时间表示无效时间

    我有一种情况 日期是 3 13 2016 2 41 00 AM 当我按时区转换日期时 出现错误 DateTime dt DateTime Parse 3 13 2016 2 41 00 AM DateTime Date Time TimeZ
  • Tomcat 8.5 上的 JSSE 客户端轮询器出现高 CPU 负载

    我在 Windows Server 2008R2 和 Java 1 8 0 92 上运行 Tomcat 8 5 3 该进程消耗大量 CPU 约 50 来自 4 个 CPU JTop 显示 到目前为止 消耗最多的两个线程是 https jss
  • 保留 Rxjs 生成的复选框的状态

    基于在下拉列表中选择不同的项目 我生成带有复选框的项目的 html 如何保留复选框的状态 即每当下拉列表中的值发生变化时选中 取消选中 请参阅此处的 plunkrhttps plnkr co edit PUG3g7dfTbQjPyIgGLz
  • 活动之间图像转换时屏幕闪烁

    I implemented an image transition between two activities using the new shared elements from lollipop It s working but I
  • 单击时提交表单

    我的登录系统有问题 每当我单击登录按钮或注册按钮时 它会将我重定向到一个白色页面 上面写着 也就是说 它干扰了我的登录操作 这是我认为导致问题的代码
  • 带有方形按钮的 Android 布局

    我想做一个与此类似的布局 www ImageBanana net 布局 png http www imagebanana com img 9kmlhy66 thumb layout png http www imagebanana com
  • 部署多个具有共享缓存和会话的 Grails 实例?

    我正在寻找一种解决方案 允许我部署多个具有共享缓存 EhCache Server 和会话的负载平衡 Grails 实例 这可能吗 我找不到任何关于此的文档 连接到公共 EhCache 服务器或使用分布式 EhCache 以及共享会话 也使用
  • 跳转注意:未指定主类型

    我正在 Android GoogleApp 中执行第一步 我正在尝试探索 Jumpnote 示例 http code google com p jumpnote http code google com p jumpnote 我能够将 An
  • 内的

    我制作了一个简单的 html 片段 其中包含以下内容 a href div Something here div a 它显然提醒我 div 不能位于 a 标签 我使用了 div 因为我希望整个框 在本例中为 div 成为一个按钮 所以子类
  • 具有相交轮廓线的 Matplotlib 轮廓图

    我正在尝试使用 python 中的 matplotlib 绘制以下数据的等高线图 数据的形式是这样的 x y height 77 23 22 34 56 77 53 22 87 63 77 37 22 54 72 77 29 22 44 8
  • 使用子查询与派生表进行内连接

    环境 SQL 2008 R2 我使用子查询创建了一个派生表并与主表连接 我只是想知道子查询是否只执行一次 还是针对结果集中的每一行执行 考虑以下示例 虚构的表名称仅供参考 SELECT E EID DT Salary FROM Employ
  • 在 C# / .NET 中执行批量更新的最快方法

    我试图找出通过 SQL Server 中的迷你控制台应用程序执行批量更新的最佳方法是什么 我已经编写了自己的批量更新方式 如下所示 SqlCommand command new SqlCommand command Connection n
  • 在移动 Web 应用程序中使用 JavaScript

    我需要在移动 Web 应用程序的表单中使用 JavaScript 我需要一些有关它的信息 涉及浏览器兼容性以及在移动 Web 应用程序中使用 JavaScript 的方式 语法 这是一场噩梦 这就像 20 世纪 90 年代使用网络浏览器工作
  • 使用 Json.NET 将 JSON 反序列化为对象

    我正在玩一点新的堆栈溢出 API https blog stackoverflow com 2010 03 stack overflow api private beta starts 不幸的是 我的 JSON 有点弱 所以我需要一些帮助
  • C#:如何在特定时间启动线程[重复]

    这个问题在这里已经有答案了 如何在一天中的特定时间 例如 16 00 启动后台线程 因此 当应用程序启动时 线程将等待那个时间 但如果应用程序在那之后启动 那么线程将立即运行 ThreadPool QueueUserWorkItem Met
  • 适用于 SDK 环境的 Android 名片扫描仪库 [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 有没有名片扫描仪的库 我是 Android 新手 帮帮我 提前致谢 工具信息 ABBYY 移动 OCR
  • 让整个CSS表变得很重要!

    有没有办法让整个 CSS 样式表优先于另一个样式表 我知道你可以做到这一点 重要的是 但我可以用一行来做到这一点 而不是修改工作表上的所有数千个属性吗 Thanks 确保您想要的样式表最后调用 或者您想要的特定样式最后调用 例如 使用这个
  • 使用带有正则表达式的字典(Scala?)的 PySpark UDF 优化挑战

    我正在尝试优化下面的代码 PySpark UDF 它为我提供了所需的结果 基于我的数据集 但在非常大的数据集 大约 180M 上速度太慢 结果 准确度 优于可用的 Python 模块 例如 geotext hdx python countr