这是我第一次使用 Google 的 Vertex AI Pipelines。我检查了这个代码实验室 https://codelabs.developers.google.com/vertex-pipelines-intro?hl=en#0也这个帖子 https://towardsdatascience.com/how-to-set-up-custom-vertex-ai-pipelines-step-by-step-467487f81cad and 这个帖子 https://medium.com/google-cloud/google-vertex-ai-the-easiest-way-to-run-ml-pipelines-3a41c5ed153,在一些源自的链接之上官方文档 https://cloud.google.com/vertex-ai/docs/pipelines/introduction?hl=es-419。我决定将所有这些知识运用到工作中,在一些玩具示例中:我计划构建一个由 2 个组件组成的管道:“get-data”(读取存储在 Cloud Storage 中的一些 .csv 文件)和“report-data” (它基本上返回前一个组件中读取的 .csv 数据的形状)。此外,我谨慎地包括一些建议 https://stackoverflow.com/questions/71351821/reading-file-from-vertex-ai-and-google-cloud-storage本论坛提供。我目前拥有的代码如下:
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform
# Components section
@component(
packages_to_install=[
"google-cloud-storage",
"pandas",
],
base_image="python:3.9",
output_component_file="get_data.yaml"
)
def get_data(
bucket: str,
url: str,
dataset: Output[Dataset],
):
import pandas as pd
from google.cloud import storage
storage_client = storage.Client("my-project")
bucket = storage_client.get_bucket(bucket)
blob = bucket.blob(url)
blob.download_to_filename('localdf.csv')
# path = "gs://my-bucket/program_grouping_data.zip"
df = pd.read_csv('localdf.csv', compression='zip')
df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')
@component(
packages_to_install=["pandas"],
base_image="python:3.9",
output_component_file="report_data.yaml"
)
def report_data(
inputd: Input[Dataset],
):
import pandas as pd
df = pd.read_csv(inputd.path)
return df.shape
# Pipeline section
@pipeline(
# Default pipeline root. You can override it when submitting the pipeline.
pipeline_root=PIPELINE_ROOT,
# A name for the pipeline.
name="my-pipeline",
)
def my_pipeline(
url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
bucket: str = "my-bucket"
):
dataset_task = get_data(bucket, url)
dimensions = report_data(
dataset_task.output
)
# Compilation section
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path="pipeline_job.json"
)
# Running and submitting job
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
run1 = aiplatform.PipelineJob(
display_name="my-pipeline",
template_path="pipeline_job.json",
job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
enable_caching=True,
)
run1.submit()
我很高兴看到管道编译没有错误,并成功提交了作业。然而“我的幸福持续得很短”,当我去 Vertex AI Pipelines 时,我偶然发现了一些“错误”,如下所示:
DAG 失败,因为某些任务失败。失败的任务是:[获取数据]。;由于上述错误,作业(project_id = my-project,job_id = 4290278978419163136)失败。处理作业失败:{project_number = xxxxxxxx, job_id = 4290278978419163136}
我在网络上没有找到任何相关信息,也找不到任何日志或类似的内容,而且我感到有点不知所措,因为这个(看似)简单示例的解决方案仍然困扰着我。
很明显,我不知道我错了什么或哪里错了。有什么建议吗?