问题的简短版本!
考虑以下代码片段(假设spark
已经设置为一些SparkSession
):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
请注意,温度字段是浮点数列表。我想将这些浮点列表转换为 MLlib 类型Vector
,我希望使用基本的方式来表达这种转换DataFrame
API 而不是通过 RDD(这是低效的,因为它将所有数据从 JVM 发送到 Python,处理是在 Python 中完成的,我们没有得到 Spark 的 Catalyst 优化器的好处,yada yada)。我该怎么做呢?具体来说:
- 有没有办法让直接演员工作?请参阅下文了解详细信息(以及解决方法的失败尝试)?或者,还有其他操作可以达到我想要的效果吗?
- 我在下面建议的两种替代解决方案(UDF 与分解/重新组装列表中的项目)中哪一个更有效?或者还有其他几乎但不完全正确的替代方案比它们中的任何一个都更好吗?
直接演员阵容不起作用
这就是我期望的“正确”解决方案。我想将列的类型从一种类型转换为另一种类型,因此我应该使用强制转换。作为一些上下文,让我提醒您将其转换为另一种类型的正常方法:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
现在例如df_with_strings.collect()[0]["temperatures"][1]
is '-7.0'
。但如果我转换为 ml Vector 那么事情就不会那么顺利:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
这给出了一个错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
哎呀!任何想法如何解决这一问题?
可能的替代方案
替代方案 1:使用VectorAssembler
有一个Transformer
这似乎对这项工作来说几乎是理想的:VectorAssembler http://spark.apache.org/docs/latest/ml-features.html#vectorassembler。它需要一列或多列并将它们连接成一个向量。不幸的是只需要Vector
and Float
列,不Array
列,因此以下内容不起作用:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
它给出了这个错误:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
我能想到的最好的解决方法是将列表分解为多列,然后使用VectorAssembler
再次将它们全部收集起来:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
这似乎是理想的,除了TEMPERATURE_COUNT
超过100,有时超过1000。(另一个问题是,如果你事先不知道数组的大小,代码会更复杂,尽管我的数据不是这样的。)Spark实际上是这样吗?生成具有那么多列的中间数据集,或者它只是认为这是单个项目短暂通过的中间步骤(或者当它看到这些列的唯一用途是组装成时,它是否完全优化了这个离开步骤向量)?
替代方案 2:使用 UDF
一个更简单的替代方法是使用 UDF 进行转换。这让我可以非常直接地用一行代码表达我想要做的事情,并且不需要创建一个包含大量列的数据集。但所有这些数据都必须在 Python 和 JVM 之间交换,并且每个单独的数字都必须由 Python 处理(Python 迭代单个数据项的速度非常慢)。看起来是这样的:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
可忽略的言论
这个漫无目的的问题的其余部分是我在试图找到答案时想到的一些额外的东西。大多数阅读本文的人可能会跳过它们。
不是解决方案:使用Vector
首先
在这个简单的示例中,可以首先使用向量类型创建数据,但当然我的数据并不是真正要并行化的 Python 列表,而是从数据源读取的。但郑重声明,情况如下:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
低效解决方案:使用map()
一种可能性是使用 RDDmap()
方法将列表转换为Vector
。这与 UDF 的想法类似,但更糟糕的是,因为序列化等成本是针对每一行中的所有字段而产生的,而不仅仅是正在操作的字段。作为记录,该解决方案如下所示:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
尝试解决演员阵容失败
在绝望中我注意到Vector
在内部由具有四个字段的结构表示,但使用该类型结构的传统转换也不起作用。这是一个插图(我使用 udf 构建了结构,但 udf 不是重要的部分):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
这给出了错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"