我想在 Palantir Foundry 中合并多个数据集,数据集的名称是动态的,因此我无法在transform_df()
静态地。有没有一种方法可以动态地将多个输入放入transform_df
并合并所有这些数据框?
我尝试循环数据集,例如:
li = ['dataset1_path', 'dataset2_path']
union_df = None
for p in li:
@transforms_df(
my_input = Input(p),
Output(p+"_output")
)
def my_compute_function(my_input):
return my_input
if union_df is None:
union_df = my_compute_function
else:
union_df = union_df.union(my_compute_function)
但是,这不会生成联合输出。
经过一些更改,这应该能够为您工作,这是带有 json 文件的动态数据集的示例,您的情况可能只会略有不同。这是一种通用的方法,您可以执行动态 json 输入数据集,该数据集应该适用于任何类型的动态输入文件类型或您可以指定的 Foundry 数据集的内部。此通用示例正在处理上传到平台中数据集节点的一组 json 文件。这应该是完全动态的。此后建立工会应该是一件简单的事情。
这里也有一些额外的记录。
希望这可以帮助
from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging
def transform_generator():
transforms = []
transf_dict = {## enter your dynamic mappings here ##}
for value in transf_dict:
@transform(
out=Output(' path to your output here '.format(val=value)),
inpt=Input(" path to input here ".format(val=value)),
)
def update_set(ctx, inpt, out):
spark = ctx.spark_session
sc = spark.sparkContext
filesystem = list(inpt.filesystem().ls())
file_dates = []
for files in filesystem:
with inpt.filesystem().open(files.path) as fi:
data = json.load(fi)
file_dates.append(data)
logging.info('info logs:')
logging.info(file_dates)
json_object = json.dumps(file_dates)
df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
df_2 = df_2.withColumn('upload_date', F.current_date())
df_2.drop_duplicates()
out.write_dataframe(df_2)
transforms.append(update_logs)
return transforms
TRANSFORMS = transform_generator()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)