I'm trying to subclass the DataFrame class and add extra custom methods, as shown below, so that I can chain calls fluently and be sure all methods reference the same DataFrame. I get an exception: "Column is not iterable".
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import lit

class MyClass(DataFrame):
    def __init__(self, df):
        super().__init__(df._jdf, df.sql_ctx)

    def add_column3(self):
        # Add col3 to the dataframe received
        self._jdf.withColumn("col3", lit(3))
        return self

    def add_column4(self):
        # Add col4 to the dataframe received
        self._jdf.withColumn("col4", lit(4))
        return self
if __name__ == "__main__":
    '''
    Spark Context initialization code
    col1 col2
    a    1
    b    2
    '''
    df = spark.createDataFrame([("a", 1), ("b", 2)], ["col1", "col2"])
    myobj = MyClass(df)
    ## Trying to accomplish the below, where I can chain MyClass methods & DataFrame methods
    myobj.add_column3().add_column4().drop("col1")
    '''
    Expected output
    col2, col3, col4
    1, 3, 4
    2, 3, 4
    '''
In fact, you don't need to subclass the DataFrame class to add custom methods to DataFrame objects.
In Python, you can attach a custom property that wraps your methods, like this:
from functools import wraps

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import lit

# decorator to attach a function as an attribute of `cls`
def add_attr(cls):
    def decorator(func):
        @wraps(func)
        def _wrapper(*args, **kwargs):
            f = func(*args, **kwargs)
            return f
        setattr(cls, func.__name__, _wrapper)
        return func
    return decorator

# custom functions
def custom(self):
    @add_attr(custom)
    def add_column3():
        return self.withColumn("col3", lit(3))

    @add_attr(custom)
    def add_column4():
        return self.withColumn("col4", lit(4))

    return custom

# add a new property to the class pyspark.sql.DataFrame
DataFrame.custom = property(custom)
# use it
df.custom.add_column3().show()