如何在 Vertex AI 中安排重复运行自定义训练作业

2024-03-16

我已将训练代码打包为 python 包,然后能够将其作为 Vertex AI 上的自定义训练作业运行。现在,我希望能够安排这项工作运行一次,比如每两周运行一次,并重新训练模型。 CustomJobSpec 中的调度设置仅允许 2 个字段:“timeout”和“restartJobOnWorkerRestart”,因此无法使用 CustomJobSpec 中的调度设置。我能想到的实现此目标的一种方法是使用“CustomPythonPackageTrainingJobRunOp”Google Cloud Pipeline 组件一步创建 Vertex AI 管道,然后安排管道按照我认为合适的方式运行。有更好的选择来实现这一目标吗?

Edit:

我能够使用 Cloud Scheduler 安排自定义训练作业,但我发现在 AIPlatformClient 中使用 create_schedule_from_job_spec 方法在 Vertex AI 管道中非常容易使用。我在 gcp 中使用 Cloud Scheduler 安排自定义作业所采取的步骤如下,link https://cloud.google.com/scheduler/docs/http-target-auth#setting_up_the_service_account到谷歌文档:

  1. 将目标类型设置为 HTTP
  2. 对于指定自定义作业的 url,我遵循this https://cloud.google.com/vertex-ai/docs/training/create-custom-job#curl链接获取url
  3. 对于身份验证,在 Auth 标题下,我选择了“添加 OAauth 令牌”

您还需要在项目中拥有一个“Cloud Scheduler 服务帐户”,并具有“授予它的 Cloud Scheduler 服务代理角色”。尽管文档表示,如果您在 2019 年 3 月 19 日之后启用了 Cloud Scheduler API,则应该自动设置此设置,但对我来说情况并非如此,必须手动添加具有该角色的服务帐户。


Answer recommended by Google Cloud /collectives/google-cloud Collective

根据您的要求,有多种可能的调度方式:

1.云作曲家

云作曲家 https://cloud.google.com/composer/docs/concepts/overview是一个托管的 Apache Airflow,可帮助您创建、安排、监控和管理工作流程。

您可以按照以下步骤使用 Composer 每两周安排一次工作:

  • 创建作曲家环境。
  • Write a DAG https://cloud.google.com/composer/docs/how-to/using/writing-dags#structure文件并将自定义训练 python 代码添加到 DAG 文件中。
  • 由于自定义训练作业是Python代码,因此Python运算符 https://airflow.apache.org/docs/apache-airflow/1.10.4/_api/airflow/operators/python_operator/index.html可以用来安排任务。
  • 在 DAG 文件中,您需要提供开始时间,即计划从哪个时间开始,并且您需要将计划间隔定义为两周,如下所示:
with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

或者,您也可以使用Unix cron https://man7.org/linux/man-pages/man5/crontab.5.html字符串格式(* * * * *)进行调度。

IE。在您每两周安排一次的情况下,cron 格式将类似于:* * 1,15 * *

您可以使用 PythonOperator 传递自定义作业所需的参数op_args 和 op_kwargs 参数 https://airflow.apache.org/docs/apache-airflow/1.10.4/_api/airflow/operators/python_operator/index.html.

DAG文件写入后,需要将其push到dags/Composer 环境桶内的文件夹。

您可以在 Airflow UI 中检查计划 DAG 的状态。

预定的 DAG 文件如下所示:

样本_dag.py:

from __future__ import print_function

import datetime

from google.cloud import aiplatform

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:
    
    def create_custom_job_sample(
    project: str,
    display_name: str,
    container_image_uri: str,
    location: str,
    api_endpoint: str,
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.JobServiceClient(client_options=client_options)
    custom_job = {
        "display_name": display_name,
        "job_spec": {
            "worker_pool_specs": [
                {
                    "machine_spec": {
                        "machine_type": "n1-standard-4",
                        "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
                        "accelerator_count": 1,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": container_image_uri,
                        "command": [],
                        "args": [],
                    },
                }
            ]
        },
    }
    parent = f"projects/{project}/locations/{location}"
    response = client.create_custom_job(parent=parent, custom_job=custom_job)
    print("response:", response)
    
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=create_custom_job_sample,
        op_kwargs={"project" : "your_project","display_name" : "name","container_image_uri":"uri path","location": "us-central1","api_endpoint":"us-central1-aiplatform.googleapis.com"}
        )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='job scheduled')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

2. 云调度器:要使用以下方式安排作业云调度器 https://cloud.google.com/scheduler/docs/http-target-auth#creating_a_scheduler_job_with_authentication您将需要进行以下配置:

  • Target : HTTP
  • URL: 作业的端点 URL (示例:“us-central1-aiplatform.googleapis.com”)
  • 验证标头:用于 *.googleapis.com 上托管的 Google API 的 OAuth 令牌

3. 安排循环管道 https://cloud.google.com/vertex-ai/docs/pipelines/run-pipeline使用 Kubeflow Pipelines SDK 运行:

您可以使用 Python 和 Kubeflow Pipelines SDK 安排定期管道运行。

from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID,
                           region=REGION)

api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    schedule=* * 1,15 * *,
    time_zone=TIME_ZONE,
    parameter_values=PIPELINE_PARAMETERS
)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Vertex AI 中安排重复运行自定义训练作业 的相关文章

随机推荐