读取 Vertex AI Pipelines 中的数据

2024-01-18

这是我第一次使用 Google 的 Vertex AI Pipelines。我检查了这个代码实验室 https://codelabs.developers.google.com/vertex-pipelines-intro?hl=en#0这个帖子 https://towardsdatascience.com/how-to-set-up-custom-vertex-ai-pipelines-step-by-step-467487f81cad and 这个帖子 https://medium.com/google-cloud/google-vertex-ai-the-easiest-way-to-run-ml-pipelines-3a41c5ed153,在一些源自的链接之上官方文档 https://cloud.google.com/vertex-ai/docs/pipelines/introduction?hl=es-419。我决定将所有这些知识运用到工作中,在一些玩具示例中:我计划构建一个由 2 个组件组成的管道:“get-data”(读取存储在 Cloud Storage 中的一些 .csv 文件)和“report-data” (它基本上返回前一个组件中读取的 .csv 数据的形状)。此外,我谨慎地包括一些建议 https://stackoverflow.com/questions/71351821/reading-file-from-vertex-ai-and-google-cloud-storage本论坛提供。我目前拥有的代码如下:


from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform

# Components section   

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import pandas as pd
    from google.cloud import storage
    
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # path = "gs://my-bucket/program_grouping_data.zip"
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
):
    import pandas as pd
    df = pd.read_csv(inputd.path)
    return df.shape


# Pipeline section

@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )

# Compilation section

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)

# Running and submitting job

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
    enable_caching=True,
)

run1.submit()

我很高兴看到管道编译没有错误,并成功提交了作业。然而“我的幸福持续得很短”,当我去 Vertex AI Pipelines 时,我偶然发现了一些“错误”,如下所示:

DAG 失败,因为某些任务失败。失败的任务是:[获取数据]。;由于上述错误,作业(project_id = my-project,job_id = 4290278978419163136)失败。处理作业失败:{project_number = xxxxxxxx, job_id = 4290278978419163136}

我在网络上没有找到任何相关信息,也找不到任何日志或类似的内容,而且我感到有点不知所措,因为这个(看似)简单示例的解决方案仍然困扰着我。

很明显,我不知道我错了什么或哪里错了。有什么建议吗?


根据评论中提供的一些建议,我认为我成功地使我的演示管道正常工作。我将首先包含更新的代码:

from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from datetime import datetime
from google.cloud import aiplatform
from typing import NamedTuple


# Importing 'COMPONENTS' of the 'PIPELINE'

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    """Reads a csv file, from some location in Cloud Storage"""
    import ast
    import pandas as pd
    from google.cloud import storage
    
    # 'Pulling' demo .csv data from a know location in GCS
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # Reading the pulled demo .csv data
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
) -> NamedTuple("output", [("rows", int), ("columns", int)]):
    """From a passed csv file existing in Cloud Storage, returns its dimensions"""
    import pandas as pd
    
    df = pd.read_csv(inputd.path+".csv")
    
    return df.shape


# Building the 'PIPELINE'

@pipeline(
    # i.e. in my case: PIPELINE_ROOT = 'gs://my-bucket/test_vertex/pipeline_root/'
    # Can be overriden when submitting the pipeline
    pipeline_root=PIPELINE_ROOT,
    name="readcsv-pipeline",  # Your own naming for the pipeline.
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )
    

# Compiling the 'PIPELINE'    

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)


# Running the 'PIPELINE'

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={
        "url": "test_vertex/pipeline_root/program_grouping_data.zip",
        "bucket": "my-bucket"
    },
    enable_caching=True,
)

# Submitting the 'PIPELINE'

run1.submit()

现在,我将添加一些补充评论,总之,这些评论设法解决了我的问题:

  • 首先,为您的用户启用“日志查看器”(roles/logging.viewer),将极大地帮助解决管道中的任何现有错误(注意:该角色对我有用,但是您可能想要寻找更好的匹配)为了你自己的目的而扮演的角色)。这些错误将显示为“日志”,可以通过单击相应的按钮来访问:
  • 注意:在上图中,当显示“日志”时,仔细检查每个日志(接近创建管道的时间)可能会有所帮助,因为通常每个日志都对应一个警告或错误行:
  • 其次,我的管道的输出是一个元组。在我原来的方法中,我只是返回普通元组,但建议返回命名元组 https://docs.python.org/3/library/typing.html#typing.NamedTuple反而。一般来说,如果需要输入/输出一个或多个“小值“(int 或 str,出于任何原因),选择一个 NamedTuple 来执行此操作。
  • 第三,当管道之间的连接是Input[Dataset] or Ouput[Dataset],需要添加文件扩展名(并且很容易忘记)。以输出为例get_data组件,并注意如何通过专门添加文件扩展名来记录数据,即dataset.path + ".csv".

当然,这是一个非常小的示例,项目可以轻松扩展到大型项目,但是作为某种“Hello Vertex AI Pipelines”,它会很好地工作。

谢谢。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

读取 Vertex AI Pipelines 中的数据 的相关文章

随机推荐

  • 如何从 JVM 分析和监控 gc.log 垃圾收集器日志文件

    我想知道直观分析和监视 java gc log 文件的最佳方法是什么 GCViewer https github com chewiebug GCViewer是迄今为止我发现的最有趣的工具 但我想知道是否有更好的或好的解决方案来监视多个远程
  • Rails 4:为什么字体在生产环境中无法加载?

    我无法在生产中的 Rails 4 应用程序中加载字体 但它在开发中正常工作 资产在部署时在服务器上预编译 我的字体在 app assets fonts 我的应用程序 css font face font family WalkwayBold
  • 从 pydev 中的另一个项目导入

    我已经四处寻找很长一段时间了 但我就是找不到答案 类似的问题涉及第三方库等的一些棘手案例 但我的情况似乎很简单 尽管如此 我还是不明白这是如何工作的 我正在使用 Eclipse 3 5 2 Pydev 2 2 0 在 Ubuntu 11 0
  • 当记录包含 json 或字符串的混合时,如何防止 Postgres 中的“json 类型的无效输入语法”

    我有一个文本列 其中包含 JSON 和计划文本 我想将其转换为 JSON 然后选择一个特定的属性 例如 user data user name jim user name sally some random data string 我试过了
  • Mockito isA() 和任何...()

    有什么区别 verify mock times 1 myMethod Matchers isA String class verify mock times 1 myMethod Matchers anyString 来自 Mockito
  • 在 std::string 中使用自定义分配器来重用已分配的字符缓冲区

    我需要在 std string 对象中使用已分配的 char 缓冲区 带有字符串内容 经过一些研究 我发现这几乎是不可能的 并且 std string 总是有自己的私有数据副本 我能想到的唯一剩下的方法是使用自定义分配器 该分配器将返回已分
  • 提交如何从一个文件的日志中消失?

    因此 我对文件进行了更改 将其推送到我们的主存储库 并在那里看到了它 大卫从那个存储库中取出并做了一些事情 但看不到我的改变 由于 David 是典型的 Microsoft 受害者 因此我要求他将其拥有的内容推回存储库 然后我会在那里查看
  • iOS 应用程序无法在 Testflight Ad Hoc Distribution 上启动

    我正在开发一个应用程序 当我通过 Xcode 运行它时 它可以在我的手机上完美运行 但是当我通过 TestFlight 分发测试版时 没有用户可以运行它 NOTE 他们可以毫无问题地在手机上安装该应用程序 该应用程序在启动前关闭 并且 Te
  • android.permission.BATTERY_STATS 使用情况

    我正在探索有关电池的 Android API 选项 什么可能性授予许可android permission BATTERY STATS 如果我可以在不声明此类权限的情况下读取电池电量的 android intent action BATTE
  • Flash 和 Google Drive SDK 无法相互通信

    我需要创建一个连接到 Google Drive SDK 的 Flash Web 应用程序来检索公共二进制文件 但我面临跨域安全问题 我可以使用 javascript 来做到这一点跨站点 xmlhttprequest与 CORS 在 AS3
  • 序列化向量

    我正在尝试为我正在开发的游戏实现加载和保存 我要保存的是 A char 二维数组 矩阵 An ArrayList
  • Flex网格:左右交替

    使用弹性盒 我想将一系列 div 垂直放置在包含 div 的下方 有些左 有些右 其中每个 div L 和 R 是容器 div 宽度的 70 L div 必须固定到容器的左侧 R div 必须固定到容器的右侧 L R L L R R R L
  • Geopandas PostGIS 连接

    我最近开始在 python 中使用 Geopandas 进行一些空间工作 并且对此非常满意 我目前正在尝试阅读 PostGIS 功能 但不太了解如何参数化数据库连接 而且似乎不清楚在文档中 GeoDataFrame from postgis
  • 为什么将 Visual Studio 解决方案添加到 TFS 时空文件夹会消失?

    我有这个 Visual Studio 解决方案 其中包含一个项目 该项目具有由多个空文件夹组成的模板文件夹树 当我使用 源代码管理 gt 将解决方案添加到源代码管理 菜单项将此解决方案添加到 Team Foundation Server T
  • 隐藏 QLPreviewController 的右键?

    我在应用程序中对 QLPreviewController 进行子类化并使用以下代码 QLPreviewControllerSubClass preview QLPreviewControllerSubClass alloc init sel
  • 您可以在创建现有的 mysql 触发器后对其进行修改吗?

    In mysql我可以创建一个触发器 然后显示有关它的信息 如下所示 mysql gt show triggers like fooTrigger 该命令提供的输出看起来非常像 select 语句 其中一行显示匹配的触发器 是否可以更新它向
  • 通过 Raven Studio 中的 RQL 将额外的列/字段添加到 RavenDB 集合

    我试图通过向每个实体添加额外的字段来更新整个集合 这doesn t做这件事 from things as t update put id t NewField 有人可以帮忙解决一下语法吗 Thanks Just do from things
  • 如何在VIM中映射CAPS LOCK键?

    我在Windows下使用VIM 并希望将 CAPSLOCK 映射到 Ctrl 有办法做到这一点吗 顺便说一句 我在网上看到了大量如何使用注册表 hack 交换 CAPS 和 Esc 的示例 但它们都没有使用 VIM 映射命令 而是使用外部工
  • Slim 框架 - 无法用点解释路由

    问题陈述 我目前正在开发内部 RESTful API 并使用我们的主域名作为环境标识符 然而 我注意到 Slim 根本不喜欢其中有点的路线 示例案例 我有一个使用 PHP 内置 Web 服务器运行的本地 Web 服务器 并且我调用了php
  • 读取 Vertex AI Pipelines 中的数据

    这是我第一次使用 Google 的 Vertex AI Pipelines 我检查了这个代码实验室 https codelabs developers google com vertex pipelines intro hl en 0也这个