How do I add my own function as a custom stage in an ML pyspark Pipeline? [duplicate]

2024-05-03

Example code from Florian:

+-----------+-----------+-----------+
|ball_column|   keep_the|hall_column|
+-----------+-----------+-----------+
|          0|          7|         14|
|          1|          8|         15|
|          2|          9|         16|
|          3|         10|         17|
|          4|         11|         18|
|          5|         12|         19|
|          6|         13|         20|
+-----------+-----------+-----------+

The first part of the code drops any column whose name contains a word from the banned list:

# first part of the code

banned_list = ["ball","fall","hall"]
condition = lambda col: any(word in col for word in banned_list)
new_df = df.drop(*filter(condition, df.columns))

So the code above should drop ball_column and hall_column.
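For the sample DataFrame shown above, new_df.show() should then print only the surviving column:

+--------+
|keep_the|
+--------+
|       7|
|       8|
|       9|
|      10|
|      11|
|      12|
|      13|
+--------+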

The second part of the code buckets specific columns from a list. For this example, we will bucket the only remaining column, keep_the.

bagging = Bucketizer(
    splits=[-float("inf"), 10, 100, float("inf")],
    inputCol='keep_the',
    outputCol='keep_the')

Now the column is bucketed using a pipeline as follows:

model = Pipeline(stages=[bagging]).fit(df)

bucketedData = model.transform(df)

How can I add the first block of the code (banned_list, condition, new_df) to the ML pipeline as a stage?


I believe this will do what you want. You can create a custom Transformer and add it to the Pipeline. Note that I changed your function slightly, because we do not have access to all the variables you mentioned, but the concept remains the same.

Hope this helps!

import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import DataFrame, SparkSession
from typing import Iterable
import pandas as pd

# CUSTOM TRANSFORMER ----------------------------------------------------------------
class ColumnDropper(Transformer):
    """
    A custom Transformer which drops all columns that have at least one of the
    words from the banned_list in the name.
    """

    def __init__(self, banned_list: Iterable[str]):
        super(ColumnDropper, self).__init__()
        self.banned_list = banned_list

    def _transform(self, df: DataFrame) -> DataFrame:
        df = df.drop(*[x for x in df.columns if any(y in x for y in self.banned_list)])
        return df


# SAMPLE DATA -----------------------------------------------------------------------
spark = SparkSession.builder.getOrCreate()  # get or create the session (assumed available in the original)
df = pd.DataFrame({'ball_column': [0,1,2,3,4,5,6],
                   'keep_the': [6,5,4,3,2,1,0],
                   'hall_column': [2,2,2,2,2,2,2] })
df = spark.createDataFrame(df)


# EXAMPLE 1: USE THE TRANSFORMER WITHOUT PIPELINE -----------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
df_example = column_dropper.transform(df)


# EXAMPLE 2: USE THE TRANSFORMER WITH PIPELINE --------------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
bagging = Bucketizer(
        splits=[-float("inf"), 3, float("inf")],
        inputCol= 'keep_the',
        outputCol="keep_the_bucket")
model = Pipeline(stages=[column_dropper,bagging]).fit(df)
bucketedData = model.transform(df)
bucketedData.show()

Output:

+--------+---------------+
|keep_the|keep_the_bucket|
+--------+---------------+
|       6|            1.0|
|       5|            1.0|
|       4|            1.0|
|       3|            1.0|
|       2|            0.0|
|       1|            0.0|
|       0|            0.0|
+--------+---------------+
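Here the splits [-float("inf"), 3, float("inf")] send values below 3 to bucket 0.0 and values of 3 or more to bucket 1.0, which is exactly what the keep_the_bucket column shows.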

Also note that if your custom method requires fitting (e.g., a custom StringIndexer), you should also create a custom Estimator:

from pyspark.ml import Estimator, Transformer


class CustomTransformer(Transformer):

    def _transform(self, df) -> DataFrame:
        ...


class CustomEstimator(Estimator):

    def _fit(self, df) -> CustomTransformer:
        ...
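For illustration, here is a minimal sketch of that Estimator/Transformer pair. The class names (MeanFillEstimator, MeanFillModel) and the mean-imputation task are invented for this example and are not part of the original answer:

import pyspark.sql.functions as F
from pyspark.ml import Estimator, Transformer
from pyspark.sql import DataFrame


class MeanFillModel(Transformer):
    """Transformer returned by MeanFillEstimator.fit(); holds the fitted mean."""

    def __init__(self, col: str, mean_value: float):
        super().__init__()
        self.col = col
        self.mean_value = mean_value

    def _transform(self, df: DataFrame) -> DataFrame:
        # Replace nulls in the target column with the mean learned at fit time.
        return df.withColumn(
            self.col, F.coalesce(F.col(self.col), F.lit(self.mean_value)))


class MeanFillEstimator(Estimator):
    """Estimator that learns the mean of one column during fit()."""

    def __init__(self, col: str):
        super().__init__()
        self.col = col

    def _fit(self, df: DataFrame) -> MeanFillModel:
        mean_value = df.agg(F.mean(self.col)).first()[0]
        return MeanFillModel(self.col, mean_value)

Because it subclasses Estimator, it can be passed to Pipeline(stages=[...]) like any built-in stage; Pipeline.fit() will call its _fit() and use the returned Transformer for the rest of the pipeline.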