I am building an AWS Glue job that needs to process about 4 TB of data per day from an S3 path of the form s3://<path>/<year>/<month>/<day>/<hour>/.
So I wrote a loop that reads the data one hourly folder at a time (~155 GB each) into a Spark DataFrame, filters for certain categories, and writes it back to S3 as Parquet files partitioned by the filtered category (s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/).
I am running 60 G2.X workers, each with 8 vCPUs, 32 GB of memory, and 128 GB of disk. The S3 writes are extremely slow and the job takes more than 10 hours to finish. Other than adding more workers, is there a way to speed up / optimize the S3 writes?
import pyspark.sql.functions as F
from awsglue.dynamicframe import DynamicFrame

# glueContext, logger, job and get_df_from_s3 are defined elsewhere in the job setup / user library.

def s3_load_job(input_list):
    hour, year, month, day = input_list
    logger.info(f"hour in s3 func {hour}")

    # Read one hourly folder from S3 into a Spark DataFrame
    s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
    logger.info(f"print s3 path {s3_path}")
    # user-defined library function that returns a Spark DataFrame
    df = get_df_from_s3(glueContext, s3_path)

    # Normalize the category column and keep only the categories of interest
    df = df.withColumn('category', F.lower(F.col('category')))
    df_rep = df.where(F.col('category').isin({"A", "B", "C", "D"}))

    # Write back to S3 as Parquet, partitioned by category and the time columns
    datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")
    glueContext.write_dynamic_frame.from_options(
        frame=datasink4,
        connection_type="s3",
        connection_options={
            "path": "s3://<path>/",
            "partitionKeys": ["category", "year", "month", "day", "hour"],
        },
        format="glueparquet",
    )
def main():
    year = '2020'
    month = '08'
    day = '01'
    hours = ["%.2d" % i for i in range(24)]
    input_list = [[hour, year, month, day] for hour in hours]
    logger.info(f"input_list {input_list}")
    for i in input_list:
        s3_load_job(i)
    job.commit()


if __name__ == "__main__":
    main()
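
For reference, this is the alternative write path I was thinking of trying instead of write_dynamic_frame: Spark's native DataFrame writer with an explicit repartition before the write. This is only a sketch and untested; NUM_OUTPUT_PARTITIONS is a made-up tuning value and I don't know whether this is actually faster.

# Sketch only (assumption, not the current job): use Spark's native Parquet writer
# instead of the DynamicFrame writer, and repartition first so the write is spread
# over a controlled number of tasks / output files.
NUM_OUTPUT_PARTITIONS = 200  # hypothetical value, would need tuning per hourly volume

(df_rep
    .repartition(NUM_OUTPUT_PARTITIONS)  # balance task sizes before the write
    .write
    .mode("append")
    .partitionBy("category", "year", "month", "day", "hour")
    .parquet("s3://<path>/"))

Would a change like this (or some other setting) make a meaningful difference to the S3 write time?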