Depending on your requirement, there are multiple possible ways to schedule this:
1. Cloud Composer
Cloud Composer (https://cloud.google.com/composer/docs/concepts/overview) is a managed Apache Airflow service that helps you create, schedule, monitor, and manage workflows.
You can schedule the job every two weeks with Composer by following these steps:
- Create a Composer environment.
- Write a DAG (https://cloud.google.com/composer/docs/how-to/using/writing-dags#structure) file and add your custom training Python code to it.
- Since the custom training job is Python code, the PythonOperator (https://airflow.apache.org/docs/apache-airflow/1.10.4/_api/airflow/operators/python_operator/index.html) can be used to schedule the task.
- In the DAG file, you need to provide a start time, i.e. the time the schedule starts from, and you need to define the schedule interval as two weeks, as shown below:
with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:
Alternatively, you can also schedule using the Unix cron (https://man7.org/linux/man-pages/man5/crontab.5.html) string format (* * * * *).
I.e., in your case of scheduling every two weeks, the cron format would look like: 0 0 1,15 * * (midnight on the 1st and the 15th of every month).
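For example, a minimal sketch of the same DAG declaration using the cron string instead of a timedelta (the DAG name and default_dag_args are the ones from the sample above):
with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval='0 0 1,15 * *',  # midnight on the 1st and 15th
        default_args=default_dag_args) as dag:
    ...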
You can pass the arguments required by the custom job using the PythonOperator's op_args and op_kwargs parameters (https://airflow.apache.org/docs/apache-airflow/1.10.4/_api/airflow/operators/python_operator/index.html).
Once the DAG file is written, you need to push it to the dags/ folder inside the Composer environment's bucket.
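As an illustration, a minimal sketch of the upload using the google-cloud-storage client (the bucket name below is a placeholder; your environment's bucket is shown on the Composer environment details page, and gsutil or the gcloud CLI work equally well):
from google.cloud import storage

client = storage.Client()
# Placeholder bucket name -- replace with your Composer environment's bucket.
bucket = client.bucket('us-central1-your-env-bucket')
bucket.blob('dags/sample_dag.py').upload_from_filename('sample_dag.py')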
You can check the status of the scheduled DAG in the Airflow UI.
The scheduled DAG file would look like the following:
sample_dag.py:
from __future__ import print_function

import datetime

from google.cloud import aiplatform
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

    def create_custom_job_sample(
        project: str,
        display_name: str,
        container_image_uri: str,
        location: str,
        api_endpoint: str,
    ):
        # The AI Platform services require regional API endpoints.
        client_options = {"api_endpoint": api_endpoint}
        # Initialize client that will be used to create and send requests.
        # This client only needs to be created once, and can be reused for
        # multiple requests.
        client = aiplatform.gapic.JobServiceClient(client_options=client_options)
        custom_job = {
            "display_name": display_name,
            "job_spec": {
                "worker_pool_specs": [
                    {
                        "machine_spec": {
                            "machine_type": "n1-standard-4",
                            "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
                            "accelerator_count": 1,
                        },
                        "replica_count": 1,
                        "container_spec": {
                            "image_uri": container_image_uri,
                            "command": [],
                            "args": [],
                        },
                    }
                ]
            },
        }
        parent = f"projects/{project}/locations/{location}"
        response = client.create_custom_job(parent=parent, custom_job=custom_job)
        print("response:", response)

    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=create_custom_job_sample,
        op_kwargs={
            "project": "your_project",
            "display_name": "name",
            "container_image_uri": "uri path",
            "location": "us-central1",
            "api_endpoint": "us-central1-aiplatform.googleapis.com",
        })

    # Likewise, the goodbye_bash task runs a Bash command.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo job scheduled')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
2. Cloud Scheduler: To schedule the job using Cloud Scheduler (https://cloud.google.com/scheduler/docs/http-target-auth#creating_a_scheduler_job_with_authentication), you will need the following configuration:
- Target: HTTP
- URL: the endpoint URL of the job (example: "us-central1-aiplatform.googleapis.com")
- Auth header: an OAuth token, used for Google APIs hosted on *.googleapis.com
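As an illustration, a minimal sketch of such a scheduler job using the google-cloud-scheduler Python client (PROJECT_ID and REGION are assumed variables, and the job name, service account, and request body below are placeholders):
import json
from google.cloud import scheduler_v1

client = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{PROJECT_ID}/locations/{REGION}"

job = {
    # Placeholder job name.
    "name": f"{parent}/jobs/biweekly-custom-training",
    "schedule": "0 0 1,15 * *",  # midnight on the 1st and 15th
    "time_zone": "Etc/UTC",
    "http_target": {
        # Placeholder URL: the Vertex AI customJobs endpoint for your project.
        "uri": f"https://{REGION}-aiplatform.googleapis.com/v1/{parent}/customJobs",
        "http_method": scheduler_v1.HttpMethod.POST,
        "oauth_token": {
            # Placeholder service account; it needs permission to call Vertex AI.
            "service_account_email": "scheduler-sa@your_project.iam.gserviceaccount.com"
        },
        # Placeholder body: the CustomJob JSON payload, trimmed here.
        "body": json.dumps({"displayName": "name"}).encode(),
    },
}
response = client.create_job(parent=parent, job=job)
print("created:", response.name)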
3. Schedule a recurring pipeline run (https://cloud.google.com/vertex-ai/docs/pipelines/run-pipeline) with the Kubeflow Pipelines SDK:
You can schedule recurring pipeline runs using Python and the Kubeflow Pipelines SDK.
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID,
                              region=REGION)

api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    schedule="0 0 1,15 * *",
    time_zone=TIME_ZONE,
    parameter_values=PIPELINE_PARAMETERS
)
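COMPILED_PIPELINE_PATH above is the compiled pipeline job spec. As a sketch, it can be produced with the KFP v2 compiler (my_pipeline here is a hypothetical @kfp.dsl.pipeline-decorated function):
from kfp.v2 import compiler

compiler.Compiler().compile(
    pipeline_func=my_pipeline,            # hypothetical pipeline function
    package_path=COMPILED_PIPELINE_PATH,  # e.g. a local .json file path
)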