如何将数组(即列表)列转换为向量

2024-04-04

问题的简短版本!

考虑以下代码片段(假设spark已经设置为一些SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

请注意,温度字段是浮点数列表。我想将这些浮点列表转换为 MLlib 类型Vector,我希望使用基本的方式来表达这种转换DataFrameAPI 而不是通过 RDD(这是低效的,因为它将所有数据从 JVM 发送到 Python,处理是在 Python 中完成的,我们没有得到 Spark 的 Catalyst 优化器的好处,yada yada)。我该怎么做呢?具体来说:

  1. 有没有办法让直接演员工作?请参阅下文了解详细信息(以及解决方法的失败尝试)?或者,还有其他操作可以达到我想要的效果吗?
  2. 我在下面建议的两种替代解决方案(UDF 与分解/重新组装列表中的项目)中哪一个更有效?或者还有其他几乎但不完全正确的替代方案比它们中的任何一个都更好吗?

直接演员阵容不起作用

这就是我期望的“正确”解决方案。我想将列的类型从一种类型转换为另一种类型,因此我应该使用强制转换。作为一些上下文,让我提醒您将其转换为另一种类型的正常方法:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

现在例如df_with_strings.collect()[0]["temperatures"][1] is '-7.0'。但如果我转换为 ml Vector 那么事情就不会那么顺利:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

这给出了一个错误:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

哎呀!任何想法如何解决这一问题?

可能的替代方案

替代方案 1:使用VectorAssembler

有一个Transformer这似乎对这项工作来说几乎是理想的:VectorAssembler http://spark.apache.org/docs/latest/ml-features.html#vectorassembler。它需要一列或多列并将它们连接成一个向量。不幸的是只需要Vector and Float列,不Array列,因此以下内容不起作用:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

它给出了这个错误:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

我能想到的最好的解决方法是将列表分解为多列,然后使用VectorAssembler再次将它们全部收集起来:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

这似乎是理想的,除了TEMPERATURE_COUNT超过100,有时超过1000。(另一个问题是,如果你事先不知道数组的大小,代码会更复杂,尽管我的数据不是这样的。)Spark实际上是这样吗?生成具有那么多列的中间数据集,或者它只是认为这是单个项目短暂通过的中间步骤(或者当它看到这些列的唯一用途是组装成时,它是否完全优化了这个离开步骤向量)?

替代方案 2:使用 UDF

一个更简单的替代方法是使用 UDF 进行转换。这让我可以非常直接地用一行代码表达我想要做的事情,并且不需要创建一个包含大量列的数据集。但所有这些数据都必须在 Python 和 JVM 之间交换,并且每个单独的数字都必须由 Python 处理(Python 迭代单个数据项的速度非常慢)。看起来是这样的:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

可忽略的言论

这个漫无目的的问题的其余部分是我在试图找到答案时想到的一些额外的东西。大多数阅读本文的人可能会跳过它们。

不是解决方案:使用Vector首先

在这个简单的示例中,可以首先使用向量类型创建数据,但当然我的数据并不是真正要并行化的 Python 列表,而是从数据源读取的。但郑重声明,情况如下:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

低效解决方案:使用map()

一种可能性是使用 RDDmap()方法将列表转换为Vector。这与 UDF 的想法类似,但更糟糕的是,因为序列化等成本是针对每一行中的所有字段而产生的,而不仅仅是正在操作的字段。作为记录,该解决方案如下所示:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

尝试解决演员阵容失败

在绝望中我注意到Vector在内部由具有四个字段的结构表示,但使用该类型结构的传统转换也不起作用。这是一个插图(我使用 udf 构建了结构,但 udf 不是重要的部分):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

这给出了错误:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"

就我个人而言,我会选择 Python UDF,不会为其他任何事情烦恼:

  • Vectors不是本机 SQL 类型,因此会以某种方式产生性能开销。特别是这个过程需要两个步骤,其中数据首先从外部类型转换为行 https://stackoverflow.com/a/32454596/6910411, 进而使用泛型从行到内部表示RowEncoder http://apache-spark-developers-list.1001551.n3.nabble.com/What-is-mainly-different-from-a-UDT-and-a-spark-internal-type-that-ExpressionEncoder-recognized-td20370.html.
  • 任何下游机器学习Pipeline比简单的转换要贵得多。此外,它需要一个与上述相反的过程

但如果你真的想要其他选择,你可以:

  • 带有 Python 包装器的 Scala UDF:

    Install sbt http://www.scala-sbt.org/download.html按照项目网站上的说明进行操作。

    创建具有以下结构的 Scala 包:

    .
    ├── build.sbt
    └── udfs.scala
    

    Edit build.sbt(调整以反映 Scala 和 Spark 版本):

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.4",
      "org.apache.spark" %% "spark-mllib" % "2.4.4"
    )
    

    Edit udfs.scala:

    package com.example.spark.udfs
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
    

    Package:

    sbt package
    

    并包含(或等效项,具体取决于 Scala 版本):

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
    

    作为论据--driver-class-path启动 shell/提交应用程序时。

    在 PySpark 中定义一个包装器:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext
    
    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
    

    Test:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
    
  • 将数据转储为反映的 JSON 格式DenseVector架构并读回:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
    
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将数组(即列表)列转换为向量 的相关文章

随机推荐

  • 服务层相互依赖

    我正在设计一个使用服务层的 asp net mvc 应用程序 如果我们有一个服务依赖于另一个服务怎么办 例如 假设我们有以下模型 class UserService IUserService implementation requires
  • Angular ui-grid 表、客户端分页和滚动

    我正在尝试将一个小项目从 jquery 移植到 angularjs 我使用 DataTables 绘制从虚拟机接收到的诊断数据 这是一个示例 DataTables 可以轻松地对数据进行分页 这样做的好处是在导航时不捕获鼠标滚动 这是当页面包
  • 获取/读取 laravel 5.8 存储非公共文件夹文件以查看?

    尝试从我的视图访问 storage app folder1 a png public function viewStorageFiles fileFullPath Storage disk local gt path folder1 a p
  • Laravel 5.4 迁移错误

    我面临着一个无法解决的问题 我的应用程序使用 Ubuntu 16 04 PHP 7 1 MySQL 和 NGINX 托管在我的服务器上 我的应用程序运行完美 当我 ssh 到我的服务器和应用程序根目录时 我运行以下命令 php artisa
  • 为什么Python的shelf要求所有键都是字符串?

    有据可查 Python 的 shelve 模块要求所有键都是字符串 并且有各种解决方法 请参阅线程here https stackoverflow com questions 4013452 how do i take integer ke
  • Domino R9.0.1 FP4:禁用与 ImportConvertHeaders 相关的日志记录?

    最近 我们安装了FP4 现在控制台上有很多消息 当使用 XPages 从浏览器打开或保存邮寄文档时 就会发生这种情况 邮件中的富文本字段采用 MIME 格式 许多行出现引用 ImportConvertHeaders 例如 25 06 201
  • 在 Firefox 中以编程方式单击 标签不起作用

    我有一个问题click jquery 的函数 我创建一个 a 元素与document createElement a 并想致电click 关于该元素的功能 关于这个元素 我想创建一个 Excel 文件并将其保存在桌面上 My code bo
  • iOS 7:如何为 UIControlStateHighlighted 设置 UIBarButtonItem backButtonBackgroundImage?

    我正在尝试在正常和突出显示状态下设置后退按钮的背景图像 void configureBackButtonInNavigationItem UINavigationItem item UIBarButtonItem backBarButton
  • 使用 Vim 命令打开标记中的当前文件

    我一直在尝试为 Brett Terpstra 的 Marked 应用程序创建一个命令 不幸的是我无法让它工作 上标记奖励包 http support markedapp com kb how to tips and tricks marke
  • 使用 Python 将 JSON 插入 MySQL

    我有一个 Python 中的 JSON 对象 我正在使用 Python DB API 和 SimpleJson 我正在尝试将 json 插入 MySQL 表中 目前出现错误 我相信这是由于 JSON 对象中的单引号 造成的 如何使用 Pyt
  • 如何识别该图像中的矩形?

    我有一张带有水平线和垂直线的图像 事实上 这张图片是BBC网站转换成水平线和垂直线的 我的问题是我希望能够找到图像中的所有矩形 我想编写一个计算机程序来查找所有矩形 有谁知道如何做到这一点或提出有关如何开始的想法 作为一个人 这个任务对于我
  • 为什么 Bootstrap 的滚动间谍不工作?

    我有一个简单的 Bootstrap 模式设置 在模式中我有内容nav并设置了滚动间谍 但是 它不起作用 我看到它被激活 但导航从未更新 完整的源代码在这里太长了 你将无法看到我得到的效果 所以我设置了一个jsfiddle http jsfi
  • AAudio 或 OpenSL

    我开始用 C 实现我的游戏音频部分 我看到有 2 个可用的音频框架 AAudio https developer android com ndk guides audio aaudio aaudio html https developer
  • 如何在Unity3D中创建脚本图标?

    I created a scipt and editor for it Now I want to assosiate an icon with it like this 这样的事该怎么办呢 找不到任何文档 http unity3d com
  • 如何从 .NET Core 3.0 WPF 应用程序使用 SOAP Web 服务

    I have a SOAP web service I want to consume it in my WPF app I am using NET Core 3 0 Visual Studio 2019 So I used the Mi
  • 如何计算两个时间字符串之间的时间间隔

    我有两个时间 一个开始时间和一个停止时间 格式为 10 33 26 HH MM SS 我需要两个时间之间的差异 我一直在浏览 Python 文档并在线搜索 我想它可能与日期时间和 或时间模块有关 我无法让它正常工作 并且只在涉及约会时寻找如
  • 在 do...while 中将相关表达式匹配设置为 false

    我正在尝试编写一些非常基本的代码 但我也在正则表达式上挑战自己 我已经能够将代码搞乱到一定程度 但我真正遇到的问题是我试图在表达式为 false 时运行 do while 循环 此时我完全没有收到任何错误 但 do while 循环继续运行
  • 线程冻结主 UI

    Hello我目前正在编写一个服务器监控应用程序 Classes public class Server public string SERVERNAME public string ENVIRONMENT public string VER
  • java.lang.ClassNotFoundException: org.springframework.orm.hibernate4.LocalSessionFactoryBean

    我是 Spring 新手 这是我的第一个示例 JSF 2 PrimeFaces 3 Spring 和 Hibernate 集成 这是pom xml
  • 如何将数组(即列表)列转换为向量

    问题的简短版本 考虑以下代码片段 假设spark已经设置为一些SparkSession from pyspark sql import Row source data Row city Chicago temperatures 1 0 2