gigl.common.services.vertex_ai#
Class for interacting with Vertex AI.
Below are some brief definitions of the terminology used by Vertex AI Pipelines:
- Resource name: A globally unique identifier for the pipeline; follows https://google.aip.dev/122 and is of the form projects/<project-id>/locations/<location>/pipelineJobs/<job-name>.
- Job name: aka job_id, aka PipelineJob.name; the name of a pipeline run. Must be unique for a given project and location.
- Display name: AFAICT a purely cosmetic name for a pipeline; can be filtered on but does not show up in the UI.
- Pipeline name: The name for the pipeline supplied by the pipeline definition (pipeline.yaml).
And a walkthrough to explain how the terminology is used:

```py
import os
import tempfile

import kfp
from google.cloud import aiplatform as aip


@kfp.dsl.component
def source() -> int:
    return 42


@kfp.dsl.component
def doubler(a: int) -> int:
    return a * 2


@kfp.dsl.component
def adder(a: int, b: int) -> int:
    return a + b


@kfp.dsl.pipeline
def get_pipeline() -> int:  # NOTE: get_pipeline here is the Pipeline name
    source_task = source()
    double_task = doubler(a=source_task.output)
    adder_task = adder(a=source_task.output, b=double_task.output)
    return adder_task.output


tempdir = tempfile.TemporaryDirectory()
tf = os.path.join(tempdir.name, "pipeline.yaml")
print(f"Writing pipeline definition to {tf}")
kfp.compiler.Compiler().compile(get_pipeline, tf)
job = aip.PipelineJob(
    display_name="this_is_our_pipeline_display_name",
    template_path=tf,
    pipeline_root="gs://my-bucket/pipeline-root",
)
job.submit(service_account="my-sa@my-project.gserviceaccount.com")
```
Which outputs the following:

```
Creating PipelineJob
PipelineJob created. Resource name: projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/get-pipeline-20250226170755?project=my-project-id
Associating projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755 to Experiment: example-experiment
```
And job has some properties set as well:
```py
print(f"{job.display_name=}")  # job.display_name='this_is_our_pipeline_display_name'
print(f"{job.resource_name=}")  # job.resource_name='projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755'
print(f"{job.name=}")  # job.name='get-pipeline-20250226170755'  # NOTE: by default, the "job name" is the pipeline name + datetime
```
Attributes#
Classes#
| VertexAIService | A class representing a Vertex AI service. |
Module Contents#
- class gigl.common.services.vertex_ai.VertexAIService(project, location, service_account, staging_bucket)[source]#
- A class representing a Vertex AI service. - Parameters:
- project (str) – The project ID. 
- location (str) – The location of the service. 
- service_account (str) – The service account to use for authentication. 
- staging_bucket (str) – The staging bucket for the service. 
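A minimal sketch of constructing the service; the project, region, service account, and bucket values below are placeholders, not defaults:

```py
from gigl.common.services.vertex_ai import VertexAIService

# Placeholder values; substitute your own project, region, service account, and bucket.
service = VertexAIService(
    project="my-project-id",
    location="us-central1",
    service_account="my-sa@my-project.gserviceaccount.com",
    staging_bucket="gs://my-bucket/staging",
)
```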
 
 - get_pipeline_job_from_job_name(job_name)[source]#
- Fetches the pipeline job with the given job name. - Parameters:
- job_name (str) 
- Return type:
- google.cloud.aiplatform.PipelineJob 
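A brief sketch of fetching a run by its job name, reusing the job name from the walkthrough above and the `service` instance constructed earlier:

```py
# `service` is the VertexAIService instance constructed above (assumed).
pipeline_job = service.get_pipeline_job_from_job_name(
    job_name="get-pipeline-20250226170755"
)
print(pipeline_job.resource_name)
# projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755
```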
 
 - static get_pipeline_run_url(project, location, job_name)[source]#
- Returns the URL for the pipeline run. - Parameters:
- project (str) 
- location (str) 
- job_name (str) 
 
- Return type:
- str 
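Since this is a static method it can be called without constructing the service. A sketch using the values from the walkthrough above; the returned URL is expected to match the "View Pipeline Job" link printed on submission:

```py
url = VertexAIService.get_pipeline_run_url(
    project="my-project-id",
    location="us-central1",
    job_name="get-pipeline-20250226170755",
)
print(url)
# e.g. https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/get-pipeline-20250226170755?project=my-project-id
```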
 
 - launch_graph_store_job(compute_pool_job_config, storage_pool_job_config)[source]#
- Launch a Vertex AI Graph Store job. This launches one Vertex AI CustomJob with two worker pools; see https://cloud.google.com/vertex-ai/docs/training/distributed-training for more details. Note: we use the job_name, timeout, and enable_web_access from the compute pool job config. These fields, if set on the storage pool job config, will be ignored. - Parameters:
- compute_pool_job_config (VertexAiJobConfig) – The configuration for the compute pool job. 
- storage_pool_job_config (VertexAiJobConfig) – The configuration for the storage pool job. 
 
- Returns:
- The completed CustomJob. 
- Return type:
- google.cloud.aiplatform.CustomJob 
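A sketch of launching the two-pool job. The VertexAiJobConfig constructor is not documented on this page, so the two configs are assumed to be built elsewhere; per the note above, only the compute pool's job_name, timeout, and enable_web_access take effect:

```py
# `compute_config` and `storage_config` are VertexAiJobConfig instances built
# elsewhere (their fields are not documented on this page).
completed_job = service.launch_graph_store_job(
    compute_pool_job_config=compute_config,
    storage_pool_job_config=storage_config,
)
# Returns the completed google.cloud.aiplatform.CustomJob.
print(completed_job.resource_name)
```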
 
 - launch_job(job_config)[source]#
- Launch a Vertex AI CustomJob. See the docs for more info. https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.CustomJob - Parameters:
- job_config (VertexAiJobConfig) – The configuration for the job. 
- Returns:
- The completed CustomJob. 
- Return type:
- google.cloud.aiplatform.CustomJob 
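A sketch of launching a single CustomJob; as above, the VertexAiJobConfig construction is assumed to happen elsewhere:

```py
# `job_config` is a VertexAiJobConfig built elsewhere (fields not documented here).
completed_job = service.launch_job(job_config=job_config)
print(completed_job.state)  # the returned google.cloud.aiplatform.CustomJob
```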
 
 - run_pipeline(display_name, template_path, run_keyword_args, job_id=None, labels=None, experiment=None)[source]#
- Runs a pipeline using the Vertex AI Pipelines service. For more info, see the Vertex AI docs https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob#google_cloud_aiplatform_PipelineJob_submit - Parameters:
- display_name (str) – The display name of the pipeline. 
- template_path (Uri) – The path to the compiled pipeline YAML. 
- run_keyword_args (dict[str, str]) – Runtime arguments passed to your pipeline. 
- job_id (Optional[str]) – The ID of the job. If not provided, defaults to the pipeline_name + datetime. Note: the pipeline_name and display_name are not the same; the pipeline_name is defined in the template_path and ultimately comes from the Python pipeline definition. If provided, must be unique. 
- labels (Optional[dict[str, str]]) – Labels to associate with the run. 
- experiment (Optional[str]) – The name of the experiment to associate the run with. 
 
- Returns:
- The PipelineJob created. 
- Return type:
- google.cloud.aiplatform.PipelineJob 
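A sketch of submitting the compiled pipeline from the walkthrough above through the service. template_path expects a Uri (shown here as a `template_uri` built elsewhere), and the run_keyword_args, labels, and experiment values are illustrative placeholders:

```py
# `template_uri` is a gigl Uri pointing at the compiled pipeline.yaml
# (construction omitted; the runtime args and labels below are illustrative).
pipeline_job = service.run_pipeline(
    display_name="this_is_our_pipeline_display_name",
    template_path=template_uri,
    run_keyword_args={"some_param": "some_value"},
    labels={"team": "gigl"},          # optional
    experiment="example-experiment",  # optional
)
print(pipeline_job.name)  # job_id defaults to pipeline_name + datetime
```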
 
 - static wait_for_run_completion(resource_name, timeout=DEFAULT_PIPELINE_TIMEOUT_S, polling_period_s=60)[source]#
- Waits for a run to complete. - Parameters:
- resource_name (str) – The resource name of the run. 
- timeout (float) – The maximum time to wait for the run to complete, in seconds. Defaults to 7200. 
- polling_period_s (int) – The time to wait between polling the run status, in seconds. Defaults to 60. 
 
- Returns:
- None 
- Return type:
- None 
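A sketch of blocking until the run from the walkthrough above finishes, using the resource name printed at submission; the timeout and polling period below are illustrative overrides of the defaults:

```py
VertexAIService.wait_for_run_completion(
    resource_name="projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755",
    timeout=3600,         # give up after an hour (illustrative)
    polling_period_s=30,  # poll every 30 seconds (illustrative)
)
```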
 
 
