如何在 Pyspark 中使用 dataframe 中的函数 withColumn 函数?

2023-12-07

我定义了一些字典和一个函数:

dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low',(74, 76): 'Normal', (76.01, 80): 'Normal-High', (80.01, 300): 'High'}
...
hierarchy_dict = {'TEMP': dict_TEMPERATURE, 'PRESS': dict_PRESSURE, 'SH_SP': dict_SHAFT_SPEED, 'POI': dict_POI, 'TRIG': dict_TRIGGER}



def function_definition(valor, atributo):

    dict_atributo = hierarchy_dict[atributo]
    valor_generalizado = None

    if isinstance(valor, (int, long, float, complex)):

        for key, value in dict_atributo.items():

            if(isinstance(key, tuple)):
                lista = list(key)

                if (valor > key[0] and valor < key[1]):
                    valor_generalizado = value

    else: # if it is not numeric
        valor_generalizado = dict_atributo.get(valor)


    return valor_generalizado

该函数的基本功能是:检查作为参数传递给“function_definition”函数的值,并根据其字典的引用替换其值。

因此,如果我调用“function_definition(60, 'TEMP')”,它将返回“LOW”。

另一方面,我有一个具有以下结构的数据框(这是一个示例):

+----+-----+-----+---+----+
|TEMP|SH_SP|PRESS|POI|TRIG|
+----+-----+-----+---+----+
|   0|    1|    2|  0|   0|
|   0|    2|    3|  1|   1|
|   0|    3|    4|  2|   1|
|   0|    4|    5|  3|   1|
|   0|    5|    6|  4|   1|
|   0|    1|    2|  5|   1|
+----+-----+-----+---+----+

我想要做的是根据上面定义的函数替换数据帧的一列的值,所以我有下一个代码行:

dataframe_new = dataframe.withColumn(atribute_name, function_definition(dataframe[atribute_name], atribute_name))

但执行时我收到下一条错误消息:

AssertionError: col should be Column

我的代码有什么问题?怎么能这么做呢?


Your function_definition(价值,属性)返回单个字符串 (勇猛概括)对于单个valor.

断言错误:col 应该是 Column意味着您正在将参数传递给WithColumn(列名,列)那不是一个专栏。 所以你必须转换你的数据,以便Column,例如,如下所示。

例如数据框(与您的结构相同):

a = [(10.0,1.2),(73.0,4.0)] # like your dataframe, this is only an example

dataframe = spark.createDataFrame(a,["tp", "S"]) # tp and S are random names for these columns

dataframe.show()
+----+---+
|  tp|  S|
+----+---+
|10.0|1.2|
|73.0|4.0|
+----+---+

如你看到的here

udf创建表示用户定义函数 (UDF) 的列表达式。

解决方案:

from pyspark.sql.functions import udf

attr = 'TEMP'
udf_func = udf(lambda x: function_definition(x,attr),returnType=StringType())

dataframe_new = dataframe.withColumn("newCol",udf_func(dataframe.tp))
dataframe_new.show()

+----+---+----------+
|  tp|  S|    newCol|
+----+---+----------+
|10.0|1.2|       Low|
|73.0|4.0|Normal-Low|
+----+---+----------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Pyspark 中使用 dataframe 中的函数 withColumn 函数? 的相关文章

随机推荐