火花 >= 2.4
您可以使用array_remove
:
from pyspark.sql.functions import array_remove
df.withColumn("src_ex", array_remove("sources", 32)).show()
+---------------+---------------+
| sources| src_ex|
+---------------+---------------+
| [62]| [62]|
| [7, 32]| [7]|
| [62]| [62]|
| [18, 36, 62]| [18, 36, 62]|
|[7, 31, 36, 62]|[7, 31, 36, 62]|
| [7, 32, 62]| [7, 62]|
+---------------+---------------+
or filter
:
from pyspark.sql.functions import expr
df.withColumn("src_ex", expr("filter(sources, x -> not(x <=> 32))")).show()
+---------------+---------------+
| sources| src_ex|
+---------------+---------------+
| [62]| [62]|
| [7, 32]| [7]|
| [62]| [62]|
| [18, 36, 62]| [18, 36, 62]|
|[7, 31, 36, 62]|[7, 31, 36, 62]|
| [7, 32, 62]| [7, 62]|
+---------------+---------------+
火花
有很多事情:
-
DataFrame
不是一个列表列表。实际上,它甚至不是一个普通的 Python 对象,它没有len
它不是Iterable
.
- 你的专栏看起来很简单
array
type.
- 你无法参考
DataFrame
(或 UDF 内的任何其他分布式数据结构)。
- 直接传递给 UDF 调用的每个参数都必须是
str
(列名称)或Column
目的。传递字面意义的使用lit
功能。
唯一剩下的只是列表理解:
from pyspark.sql.functions import lit, udf
def drop_from_array_(arr, item):
return [x for x in arr if x != item]
drop_from_array = udf(drop_from_array_, ArrayType(IntegerType()))
用法示例:
df = sc.parallelize([
[62], [7, 32], [62], [18, 36, 62], [7, 31, 36, 62], [7, 32, 62]
]).map(lambda x: (x, )).toDF(["sources"])
df.withColumn("src_ex", drop_from_array("sources", lit(32)))
结果:
+---------------+---------------+
| sources| src_ex|
+---------------+---------------+
| [62]| [62]|
| [7, 32]| [7]|
| [62]| [62]|
| [18, 36, 62]| [18, 36, 62]|
|[7, 31, 36, 62]|[7, 31, 36, 62]|
| [7, 32, 62]| [7, 62]|
+---------------+---------------+