gigl.common.services.vertex_ai#

Class for interacting with Vertex AI.

Below are some brief definitions of the terminology used by Vertex AI Pipelines:

  • Resource name: A globally unique identifier for the pipeline. Follows https://google.aip.dev/122 and is of the form projects/<project-id>/locations/<location>/pipelineJobs/<job-name>.

  • Job name: aka job_id, aka PipelineJob.name. The name of a pipeline run; must be unique for a given project and location.

  • Display name: AFAICT a purely cosmetic name for a pipeline; can be filtered on but does not show up in the UI.

  • Pipeline name: The name for the pipeline supplied by the pipeline definition (pipeline.yaml).

And a walkthrough to show how the terminology is used:

```py
import os
import tempfile

import kfp
from google.cloud import aiplatform as aip


@kfp.dsl.component
def source() -> int:
    return 42


@kfp.dsl.component
def doubler(a: int) -> int:
    return a * 2


@kfp.dsl.component
def adder(a: int, b: int) -> int:
    return a + b


@kfp.dsl.pipeline
def get_pipeline() -> int:  # NOTE: get_pipeline here is the Pipeline name
    source_task = source()
    double_task = doubler(a=source_task.output)
    adder_task = adder(a=source_task.output, b=double_task.output)
    return adder_task.output


tempdir = tempfile.TemporaryDirectory()
tf = os.path.join(tempdir.name, "pipeline.yaml")
print(f"Writing pipeline definition to {tf}")
kfp.compiler.Compiler().compile(get_pipeline, tf)
job = aip.PipelineJob(
    display_name="this_is_our_pipeline_display_name",
    template_path=tf,
    pipeline_root="gs://my-bucket/pipeline-root",
)
job.submit(service_account="my-sa@my-project.gserviceaccount.com")
```

Which outputs the following:

```
Creating PipelineJob
PipelineJob created. Resource name: projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/get-pipeline-20250226170755?project=my-project-id
Associating projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755 to Experiment: example-experiment
```

And job has some properties set as well:

```py
print(f"{job.display_name=}")
# job.display_name='this_is_our_pipeline_display_name'
print(f"{job.resource_name=}")
# job.resource_name='projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755'
print(f"{job.name=}")
# job.name='get-pipeline-20250226170755'
# NOTE: by default, the "job name" is the pipeline name + datetime
```
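
As the output above suggests, the job name is simply the last path segment of the resource name. A minimal sketch of going between the two (the helper names here are illustrative, not part of the library):

```python
def job_name_from_resource_name(resource_name: str) -> str:
    # Resource names follow AIP-122:
    # projects/<project-id>/locations/<location>/pipelineJobs/<job-name>
    return resource_name.rsplit("/", 1)[-1]


def resource_name_from_parts(project: str, location: str, job_name: str) -> str:
    # Inverse: rebuild the globally unique resource name from its parts.
    return f"projects/{project}/locations/{location}/pipelineJobs/{job_name}"


resource = "projects/my-project-id/locations/us-central1/pipelineJobs/get-pipeline-20250226170755"
print(job_name_from_resource_name(resource))  # get-pipeline-20250226170755
```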

Attributes#

Classes#

VertexAIService

A class representing a Vertex AI service.

VertexAiJobConfig

Configuration for a Vertex AI CustomJob worker pool.

Module Contents#

class gigl.common.services.vertex_ai.VertexAIService(project, location, service_account, staging_bucket)[source]#

A class representing a Vertex AI service.

Parameters:
  • project (str) – The project ID.

  • location (str) – The location of the service.

  • service_account (str) – The service account to use for authentication.

  • staging_bucket (str) – The staging bucket for the service.

get_pipeline_job_from_job_name(job_name)[source]#

Fetches the pipeline job with the given job name.

Parameters:

job_name (str)

Return type:

google.cloud.aiplatform.PipelineJob

static get_pipeline_run_url(project, location, job_name)[source]#

Returns the URL for the pipeline run.

Parameters:
  • project (str)

  • location (str)

  • job_name (str)

Return type:

str
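
Based on the "View Pipeline Job" console link in the walkthrough output above, the returned URL likely has the following shape (a sketch, not the library's actual implementation):

```python
def pipeline_run_url(project: str, location: str, job_name: str) -> str:
    # Mirrors the "View Pipeline Job" link printed when a job is submitted.
    return (
        "https://console.cloud.google.com/vertex-ai/locations/"
        f"{location}/pipelines/runs/{job_name}?project={project}"
    )


print(pipeline_run_url("my-project-id", "us-central1", "get-pipeline-20250226170755"))
```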

launch_graph_store_job(compute_pool_job_config, storage_pool_job_config)[source]#

Launch a Vertex AI Graph Store job.

This launches one Vertex AI CustomJob with two worker pools; see https://cloud.google.com/vertex-ai/docs/training/distributed-training for more details.

Note

We use the job_name, timeout, and enable_web_access from the compute pool job config. These fields, if set on the storage pool job config, will be ignored.

Parameters:
  • compute_pool_job_config (VertexAiJobConfig) – The configuration for the compute pool job.

  • storage_pool_job_config (VertexAiJobConfig) – The configuration for the storage pool job.

Returns:

The completed CustomJob.

Return type:

google.cloud.aiplatform.CustomJob
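
Under the hood, a multi-pool CustomJob is described by a worker_pool_specs list, one entry per pool. A rough sketch of the shape Vertex AI expects for the two pools; the machine types, image URIs, and commands here are illustrative assumptions, not values used by the library:

```python
# Sketch of the worker_pool_specs payload for a two-pool CustomJob:
# pool 0 is the compute pool, pool 1 the storage pool.
worker_pool_specs = [
    {
        "machine_spec": {"machine_type": "n1-standard-16"},
        "replica_count": 2,
        "container_spec": {
            "image_uri": "gcr.io/my-project/compute:latest",
            "command": ["python", "-m", "trainer"],
        },
    },
    {
        "machine_spec": {"machine_type": "n1-highmem-32"},
        "replica_count": 4,
        "container_spec": {
            "image_uri": "gcr.io/my-project/graph-store:latest",
            "command": ["python", "-m", "graph_store"],
        },
    },
]
print(len(worker_pool_specs))  # one entry per worker pool
```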

launch_job(job_config)[source]#

Launch a Vertex AI CustomJob. For more info, see https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.CustomJob

Parameters:

job_config (VertexAiJobConfig) – The configuration for the job.

Returns:

The completed CustomJob.

Return type:

google.cloud.aiplatform.CustomJob

run_pipeline(display_name, template_path, run_keyword_args, job_id=None, labels=None, experiment=None)[source]#

Runs a pipeline using the Vertex AI Pipelines service. For more info, see the Vertex AI docs https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob#google_cloud_aiplatform_PipelineJob_submit

Parameters:
  • display_name (str) – The display name of the pipeline.

  • template_path (Uri) – The path to the compiled pipeline YAML.

  • run_keyword_args (dict[str, str]) – Runtime arguments passed to your pipeline.

  • job_id (Optional[str]) – The ID of the job. If not provided, it will be the pipeline_name + datetime. Note: the pipeline_name and display_name are not the same; the pipeline_name is defined in the template_path and ultimately comes from the Python pipeline definition. If provided, must be unique.

  • labels (Optional[dict[str, str]]) – Labels to associate with the run.

  • experiment (Optional[str]) – The name of the experiment to associate the run with.

Returns:

The PipelineJob created.

Return type:

google.cloud.aiplatform.PipelineJob

static wait_for_run_completion(resource_name, timeout=DEFAULT_PIPELINE_TIMEOUT_S, polling_period_s=60)[source]#

Waits for a run to complete.

Parameters:
  • resource_name (str) – The resource name of the run.

  • timeout (float) – The maximum time to wait for the run to complete, in seconds. Defaults to DEFAULT_PIPELINE_TIMEOUT_S (129600 seconds, i.e. 36 hours).

  • polling_period_s (int) – The time to wait between polling the run status, in seconds. Defaults to 60.

Returns:

None

Return type:

None
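
The wait loop amounts to polling the job state until it reaches a terminal state or the timeout expires. A generic sketch of that pattern; the get_state callable is a stand-in for fetching the PipelineJob by resource name and reading its state:

```python
import time


def wait_for_completion(get_state, timeout_s: float, polling_period_s: float = 60) -> str:
    # Poll get_state() until it reports a terminal state or we run out of time.
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still {state} after {timeout_s}s")
        time.sleep(polling_period_s)


# Example with a fake state source that succeeds on the third poll.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_completion(lambda: next(states), timeout_s=10, polling_period_s=0))
```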

property project: str[source]#

The GCP project that is being used for this service.

Return type:

str

class gigl.common.services.vertex_ai.VertexAiJobConfig[source]#

Configuration for a Vertex AI CustomJob worker pool.

Each field maps to a property on the WorkerPoolSpec / MachineSpec / DiskSpec / ContainerSpec protos that Vertex AI uses to describe a CustomJob.

Example

>>> from google.cloud.aiplatform_v1.types import ReservationAffinity
>>> reservation = ReservationAffinity(
...     reservation_affinity_type=ReservationAffinity.Type.SPECIFIC_RESERVATION,
...     key="compute.googleapis.com/reservation-name",
...     values=["projects/p/zones/us-central1-a/reservations/r"],
... )

See https://docs.cloud.google.com/vertex-ai/docs/training/use-reservations for reservation prerequisites.

Parameters:
  • job_name – Display name and base ID used for the Vertex AI CustomJob.

  • container_uri – Docker image URI containing the job binary.

  • command – Entrypoint command executed inside the container.

  • args – Optional command-line arguments appended after command.

  • environment_variables – Optional env vars injected into each worker replica.

  • machine_type – Compute Engine machine type (e.g. n1-standard-4).

  • accelerator_type – Accelerator type string (e.g. NVIDIA_TESLA_A100). ACCELERATOR_TYPE_UNSPECIFIED — the default — means CPU-only.

  • accelerator_count – Number of accelerators per replica. Set to 0 for CPU-only jobs.

  • replica_count – Number of worker replicas in the pool.

  • boot_disk_type – Boot disk type for each replica (e.g. pd-ssd).

  • boot_disk_size_gb – Boot disk size in GB for each replica.

  • labels – Optional key/value labels attached to the job (e.g. for billing / cost attribution).

  • timeout_s – Optional job timeout in seconds. Falls back to DEFAULT_CUSTOM_JOB_TIMEOUT_S when None.

  • enable_web_access – Enables interactive shell access to workers via the Vertex AI web console.

  • scheduling_strategy – Optional aiplatform.gapic.Scheduling.Strategy (e.g. spot, flex-start). None uses the Vertex AI default.

  • reservation_affinity – Optional ReservationAffinity that maps to MachineSpec.reservation_affinity. None uses the Vertex AI default (no reservation).

accelerator_count: int = 0[source]#
accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED'[source]#
args: list[str] | None = None[source]#
boot_disk_size_gb: int = 100[source]#
boot_disk_type: str = 'pd-ssd'[source]#
command: list[str][source]#
container_uri: str[source]#
enable_web_access: bool = True[source]#
environment_variables: list[google.cloud.aiplatform_v1.types.env_var.EnvVar] | None = None[source]#
job_name: str[source]#
labels: dict[str, str] | None = None[source]#
machine_type: str = 'n1-standard-4'[source]#
replica_count: int = 1[source]#
reservation_affinity: google.cloud.aiplatform_v1.types.ReservationAffinity | None = None[source]#
scheduling_strategy: google.cloud.aiplatform.gapic.Scheduling.Strategy | None = None[source]#
timeout_s: int | None = None[source]#
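
Putting the defaults above together, a config for a small CPU-only job only needs the required fields. The stand-in dataclass below mirrors the documented fields and defaults purely for illustration; in practice, construct the real gigl.common.services.vertex_ai.VertexAiJobConfig instead:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobConfigSketch:
    # Required fields.
    job_name: str
    container_uri: str
    command: list[str]
    # Defaults mirroring the documented VertexAiJobConfig values.
    machine_type: str = "n1-standard-4"
    accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"  # CPU-only
    accelerator_count: int = 0
    replica_count: int = 1
    boot_disk_type: str = "pd-ssd"
    boot_disk_size_gb: int = 100
    enable_web_access: bool = True
    timeout_s: Optional[int] = None  # falls back to DEFAULT_CUSTOM_JOB_TIMEOUT_S


config = JobConfigSketch(
    job_name="my-training-job",
    container_uri="gcr.io/my-project/trainer:latest",
    command=["python", "-m", "trainer"],
)
```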
gigl.common.services.vertex_ai.DEFAULT_CUSTOM_JOB_TIMEOUT_S: Final[int] = 86400[source]#
gigl.common.services.vertex_ai.DEFAULT_PIPELINE_TIMEOUT_S: Final[int] = 129600[source]#
gigl.common.services.vertex_ai.LEADER_WORKER_INTERNAL_IP_FILE_PATH_ENV_KEY: Final[str] = 'LEADER_WORKER_INTERNAL_IP_FILE_PATH'[source]#
gigl.common.services.vertex_ai.logger[source]#