"""Shared functionality for launching Vertex AI jobs for training and inference."""
from collections.abc import Mapping
from typing import Final, Optional
from google.cloud.aiplatform_v1.types import (
ReservationAffinity,
Scheduling,
accelerator_type,
env_var,
)
from gigl.common import Uri
from gigl.common.constants import (
DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU,
DEFAULT_GIGL_RELEASE_SRC_IMAGE_CUDA,
)
from gigl.common.logger import Logger
from gigl.common.services.vertex_ai import VertexAiJobConfig, VertexAIService
from gigl.env.distributed import COMPUTE_CLUSTER_LOCAL_WORLD_SIZE_ENV_KEY
from gigl.src.common.constants.components import GiGLComponents
from gigl.src.common.types.pb_wrappers.gigl_resource_config import (
GiglResourceConfigWrapper,
)
from gigl.src.common.utils.gigl_env import get_gigl_runtime_env_vars
from snapchat.research.gbml.gigl_resource_config_pb2 import (
VertexAiGraphStoreConfig,
VertexAiReservationAffinity,
VertexAiResourceConfig,
)
# Module-level logger used by the launch functions in this file. `Logger` is
# imported above; without this binding every `logger.info(...)` call in this
# module would raise NameError.
logger = Logger()

# GiGL components that this module is able to launch on Vertex AI.
_LAUNCHABLE_COMPONENTS: frozenset[GiGLComponents] = frozenset(
    [GiGLComponents.Trainer, GiGLComponents.Inferencer]
)

# Key set on the SDK ReservationAffinity message when a specific reservation
# is requested (see `_build_reservation_affinity`).
_RESERVATION_AFFINITY_KEY: Final[str] = "compute.googleapis.com/reservation-name"

# Accepted string values for the proto `reservation_affinity.type` field.
_VALID_RESERVATION_AFFINITY_TYPES: Final[frozenset[str]] = frozenset(
    {"NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"}
)
[docs]
def launch_single_pool_job(
    vertex_ai_resource_config: VertexAiResourceConfig,
    job_name: str,
    task_config_uri: Uri,
    resource_config_uri: Uri,
    process_command: str,
    process_runtime_args: Mapping[str, str],
    resource_config_wrapper: GiglResourceConfigWrapper,
    cpu_docker_uri: Optional[str],
    cuda_docker_uri: Optional[str],
    component: GiGLComponents,
    vertex_ai_region: str,
) -> None:
    """Build and submit a single-worker-pool Vertex AI job for a GiGL component.

    The container image is chosen from the resource config: the CPU image when
    no accelerator is configured, otherwise the CUDA image.

    Args:
        vertex_ai_resource_config: The Vertex AI resource configuration.
        job_name: Raw GiGL applied task identifier; the Vertex AI display name
            is derived from it by adding a component-specific prefix.
        task_config_uri: URI to the task configuration.
        resource_config_uri: URI to the resource configuration.
        process_command: Command to run in the container.
        process_runtime_args: Runtime arguments for the process.
        resource_config_wrapper: Wrapper for the resource configuration; supplies
            the project, service account, staging bucket and resource labels.
        cpu_docker_uri: Docker image URI for CPU execution. Falls back to
            ``DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU`` when not provided.
        cuda_docker_uri: Docker image URI for GPU execution. Falls back to
            ``DEFAULT_GIGL_RELEASE_SRC_IMAGE_CUDA`` when not provided.
        component: The GiGL component (Trainer or Inferencer).
        vertex_ai_region: The Vertex AI region to launch the job in.

    Raises:
        ValueError: If ``component`` is not one of the launchable components.
    """
    if component not in _LAUNCHABLE_COMPONENTS:
        raise ValueError(
            f"Invalid component: {component}. Expected one of: {_LAUNCHABLE_COMPONENTS}"
        )

    display_name = _build_vertex_ai_job_name(job_name=job_name, component=component)
    runs_on_cpu = _determine_if_cpu_execution(
        vertex_ai_resource_config=vertex_ai_resource_config
    )

    # Resolve both images up front: the container needs only one of them, but
    # both are forwarded to the job through the shared GiGL env vars.
    resolved_cpu_image = cpu_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU
    resolved_cuda_image = cuda_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CUDA
    if runs_on_cpu:
        selected_image = resolved_cpu_image
    else:
        selected_image = resolved_cuda_image

    container_env = [env_var.EnvVar(name="TF_CPP_MIN_LOG_LEVEL", value="3")]
    container_env.extend(
        _build_common_gigl_env_vars(
            applied_task_identifier=job_name,
            task_config_uri=task_config_uri,
            resource_config_uri=resource_config_uri,
            cpu_docker_uri=resolved_cpu_image,
            cuda_docker_uri=resolved_cuda_image,
            component=component,
        )
    )
    resource_labels = resource_config_wrapper.get_resource_labels(component=component)

    job_config = _build_job_config(
        vertex_ai_job_name=display_name,
        applied_task_identifier=job_name,
        task_config_uri=task_config_uri,
        resource_config_uri=resource_config_uri,
        command_str=process_command,
        args=process_runtime_args,
        use_cuda=not runs_on_cpu,
        container_uri=selected_image,
        vertex_ai_resource_config=vertex_ai_resource_config,
        env_vars=container_env,
        labels=resource_labels,
    )
    logger.info(f"Launching {component.value} job with config: {job_config}")

    VertexAIService(
        project=resource_config_wrapper.project,
        location=vertex_ai_region,
        service_account=resource_config_wrapper.service_account_email,
        staging_bucket=resource_config_wrapper.temp_assets_regional_bucket_path.uri,
    ).launch_job(job_config=job_config)
[docs]
def launch_graph_store_enabled_job(
    vertex_ai_graph_store_config: VertexAiGraphStoreConfig,
    job_name: str,
    task_config_uri: Uri,
    resource_config_uri: Uri,
    compute_commmand: str,
    compute_runtime_args: Mapping[str, str],
    storage_command: str,
    storage_args: Mapping[str, str],
    resource_config_wrapper: GiglResourceConfigWrapper,
    cpu_docker_uri: Optional[str],
    cuda_docker_uri: Optional[str],
    component: GiGLComponents,
) -> None:
    """Build and submit a Vertex AI job with separate storage and compute pools.

    Both pools share the same Vertex AI job name, environment variables and
    labels. The compute pool uses the CPU or CUDA image depending on its
    resource config; the storage pool always uses the CPU image.

    Args:
        vertex_ai_graph_store_config: The Vertex AI graph store configuration.
        job_name: Raw GiGL applied task identifier.
        task_config_uri: URI to the task configuration.
        resource_config_uri: URI to the resource configuration.
        compute_commmand: Command to run in the compute container.
        compute_runtime_args: Runtime arguments for the compute process.
        storage_command: Command to run in the storage container.
        storage_args: Arguments to pass to the storage command.
        resource_config_wrapper: Wrapper for the resource configuration; supplies
            the project, default region, service account, staging bucket and labels.
        cpu_docker_uri: Docker image URI for CPU execution. Falls back to
            ``DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU`` when not provided.
        cuda_docker_uri: Docker image URI for GPU execution. Falls back to
            ``DEFAULT_GIGL_RELEASE_SRC_IMAGE_CUDA`` when not provided.
        component: The GiGL component (Trainer or Inferencer).

    Raises:
        ValueError: If ``component`` is not one of the launchable components.
    """
    if component not in _LAUNCHABLE_COMPONENTS:
        raise ValueError(
            f"Invalid component: {component}. Expected one of: {_LAUNCHABLE_COMPONENTS}"
        )

    display_name = _build_vertex_ai_job_name(job_name=job_name, component=component)

    storage_pool_config = vertex_ai_graph_store_config.graph_store_pool
    compute_pool_config = vertex_ai_graph_store_config.compute_pool

    # Only the compute pool may need GPUs; storage workers always run the CPU
    # graph-store entrypoint.
    compute_on_cpu = _determine_if_cpu_execution(
        vertex_ai_resource_config=compute_pool_config
    )
    resolved_cpu_image = cpu_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU
    resolved_cuda_image = cuda_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CUDA
    if compute_on_cpu:
        compute_image = resolved_cpu_image
    else:
        compute_image = resolved_cuda_image

    logger.info(f"Running {component.value} with command: {compute_commmand}")

    # An explicitly configured local world size wins. Otherwise default to one
    # process on CPU, or one process per configured GPU.
    local_world_size = vertex_ai_graph_store_config.compute_cluster_local_world_size
    if not local_world_size:
        local_world_size = 1 if compute_on_cpu else compute_pool_config.gpu_limit

    # Environment shared by both the compute (client) and storage (server) pools.
    shared_env: list[env_var.EnvVar] = [
        env_var.EnvVar(name="TF_CPP_MIN_LOG_LEVEL", value="3"),
        env_var.EnvVar(
            name=COMPUTE_CLUSTER_LOCAL_WORLD_SIZE_ENV_KEY,
            value=str(local_world_size),
        ),
    ]
    shared_env.extend(
        _build_common_gigl_env_vars(
            applied_task_identifier=job_name,
            task_config_uri=task_config_uri,
            resource_config_uri=resource_config_uri,
            cpu_docker_uri=resolved_cpu_image,
            cuda_docker_uri=resolved_cuda_image,
            component=component,
        )
    )
    resource_labels = resource_config_wrapper.get_resource_labels(component=component)

    compute_job_config = _build_job_config(
        vertex_ai_job_name=display_name,
        applied_task_identifier=job_name,
        task_config_uri=task_config_uri,
        resource_config_uri=resource_config_uri,
        command_str=compute_commmand,
        args=compute_runtime_args,
        use_cuda=not compute_on_cpu,
        container_uri=compute_image,
        vertex_ai_resource_config=compute_pool_config,
        env_vars=shared_env,
        labels=resource_labels,
    )
    storage_job_config = _build_job_config(
        vertex_ai_job_name=display_name,
        applied_task_identifier=job_name,
        task_config_uri=task_config_uri,
        resource_config_uri=resource_config_uri,
        command_str=storage_command,
        args=storage_args,
        use_cuda=False,
        container_uri=resolved_cpu_image,
        vertex_ai_resource_config=storage_pool_config,
        env_vars=shared_env,
        labels=resource_labels,
    )

    # The compute pool's region override, when set, takes precedence over the
    # default region from the resource config.
    launch_region = (
        compute_pool_config.gcp_region_override or resource_config_wrapper.region
    )
    VertexAIService(
        project=resource_config_wrapper.project,
        location=launch_region,
        service_account=resource_config_wrapper.service_account_email,
        staging_bucket=resource_config_wrapper.temp_assets_regional_bucket_path.uri,
    ).launch_graph_store_job(
        compute_pool_job_config=compute_job_config,
        storage_pool_job_config=storage_job_config,
    )
def _build_job_config(
    vertex_ai_job_name: str,
    applied_task_identifier: str,
    task_config_uri: Uri,
    resource_config_uri: Uri,
    command_str: str,
    args: Mapping[str, str],
    use_cuda: bool,
    container_uri: str,
    vertex_ai_resource_config: VertexAiResourceConfig,
    env_vars: list[env_var.EnvVar],
    labels: Optional[dict[str, str]] = None,
) -> VertexAiJobConfig:
    """Build a VertexAiJobConfig for training or inference jobs.

    This function constructs a configuration object for running GiGL training or inference
    jobs on Vertex AI. It assembles job arguments and configures resource specifications
    based on the provided parameters.

    Args:
        vertex_ai_job_name (str): The Vertex AI CustomJob display name.
        applied_task_identifier (str): Raw GiGL applied task identifier passed to the process
            as ``--job_name``.
        task_config_uri (Uri): URI to the task configuration file.
        resource_config_uri (Uri): URI to the resource configuration file.
        command_str (str): The command to run in the container. It is split on runs of
            whitespace, so repeated spaces, tabs or newlines never yield empty tokens.
        args (Mapping[str, str]): Additional command-line arguments to pass to the job,
            each rendered as ``--key=value``.
        use_cuda (bool): Whether to use CUDA. If True, adds --use_cuda flag.
        container_uri (str): The URI of the container image to use.
        vertex_ai_resource_config (VertexAiResourceConfig): Resource configuration including
            machine type, GPU type, replica count, timeout, scheduling strategy and
            reservation affinity.
        env_vars (list[env_var.EnvVar]): Environment variables to set in the container.
        labels (Optional[dict[str, str]]): Labels to associate with the job. Defaults to None.

    Returns:
        VertexAiJobConfig: A configuration object ready to be used with VertexAIService.launch_job().

    Raises:
        ValueError: If the reservation affinity on ``vertex_ai_resource_config`` is invalid
            (raised by ``_build_reservation_affinity``).
    """
    job_args = [
        f"--job_name={applied_task_identifier}",
        f"--task_config_uri={task_config_uri}",
        f"--resource_config_uri={resource_config_uri}",
    ]
    if use_cuda:
        job_args.append("--use_cuda")
    job_args.extend(f"--{k}={v}" for k, v in args.items())

    # str.split() with no separator splits on any run of whitespace and drops
    # leading/trailing whitespace. Splitting on a single " " would emit empty
    # tokens for commands like "python  -m foo", corrupting the container argv.
    command = command_str.split()

    job_config = VertexAiJobConfig(
        job_name=vertex_ai_job_name,
        container_uri=container_uri,
        command=command,
        args=job_args,
        environment_variables=env_vars,
        machine_type=vertex_ai_resource_config.machine_type,
        # Accept lower-case / hyphenated spellings (e.g. "nvidia-tesla-t4").
        accelerator_type=vertex_ai_resource_config.gpu_type.upper().replace("-", "_"),
        accelerator_count=vertex_ai_resource_config.gpu_limit,
        replica_count=vertex_ai_resource_config.num_replicas,
        # A falsy (unset) boot disk size falls back to 100 GB.
        boot_disk_size_gb=vertex_ai_resource_config.boot_disk_size_gb or 100,
        labels=labels,
        # A falsy (unset) timeout is passed as None.
        timeout_s=vertex_ai_resource_config.timeout
        if vertex_ai_resource_config.timeout
        else None,
        # This should be `aiplatform.gapic.Scheduling.Strategy[vertex_ai_resource_config.scheduling_strategy]`
        # But the type checker complains otherwise...
        # gigl/src/inference/v2/glt_inferencer.py:124: error: The type "type[Strategy]" is not generic and not indexable [misc]
        # TODO(kmonte): Fix this
        scheduling_strategy=getattr(
            Scheduling.Strategy,
            vertex_ai_resource_config.scheduling_strategy,
        )
        if vertex_ai_resource_config.scheduling_strategy
        else None,
        reservation_affinity=_build_reservation_affinity(
            vertex_ai_resource_config.reservation_affinity
        ),
    )
    return job_config
def _build_vertex_ai_job_name(job_name: str, component: GiGLComponents) -> str:
    """Derive the Vertex AI CustomJob display name for a GiGL job.

    Args:
        job_name: Raw GiGL applied task identifier.
        component: The GiGL component being launched.

    Returns:
        ``job_name`` prefixed with ``gigl_train_`` for the Trainer or
        ``gigl_infer_`` for the Inferencer.

    Raises:
        ValueError: If ``component`` is neither the Trainer nor the Inferencer.
    """
    if component == GiGLComponents.Trainer:
        prefix = "gigl_train"
    elif component == GiGLComponents.Inferencer:
        prefix = "gigl_infer"
    else:
        raise ValueError(
            f"Invalid component: {component}. Expected one of: {_LAUNCHABLE_COMPONENTS}"
        )
    return f"{prefix}_{job_name}"
def _build_common_gigl_env_vars(
    applied_task_identifier: str,
    task_config_uri: Uri,
    resource_config_uri: Uri,
    cpu_docker_uri: str,
    cuda_docker_uri: str,
    component: GiGLComponents,
) -> list[env_var.EnvVar]:
    """Wrap the shared GiGL runtime context as Vertex AI ``EnvVar`` messages.

    The name/value pairs come from ``get_gigl_runtime_env_vars``; this helper
    only converts each pair into an ``env_var.EnvVar``.

    Args:
        applied_task_identifier: The raw GiGL task identifier.
        task_config_uri: URI to the task configuration.
        resource_config_uri: URI to the resource configuration.
        cpu_docker_uri: Resolved CPU Docker image URI.
        cuda_docker_uri: Resolved CUDA Docker image URI.
        component: The GiGL component being launched.

    Returns:
        One ``EnvVar`` per entry returned by ``get_gigl_runtime_env_vars``, in
        that mapping's iteration order.
    """
    runtime_context = get_gigl_runtime_env_vars(
        applied_task_identifier=applied_task_identifier,
        task_config_uri=task_config_uri,
        resource_config_uri=resource_config_uri,
        component=component,
        cpu_docker_uri=cpu_docker_uri,
        cuda_docker_uri=cuda_docker_uri,
    )
    wrapped: list[env_var.EnvVar] = []
    for key, val in runtime_context.items():
        wrapped.append(env_var.EnvVar(name=key, value=val))
    return wrapped
def _build_reservation_affinity(
    affinity: VertexAiReservationAffinity,
) -> Optional[ReservationAffinity]:
    """Convert a proto VertexAiReservationAffinity into the SDK ReservationAffinity.

    A fully unset proto (empty ``type`` and no ``reservation_resource_names``)
    maps to ``None`` so the resulting ``MachineSpec`` is left at the Vertex AI
    default.

    Args:
        affinity: The proto reservation affinity (always non-None because
            proto3 singular message fields default to an empty instance).

    Returns:
        A ``ReservationAffinity`` message, or ``None`` when ``affinity`` is
        unset.

    Raises:
        ValueError: If ``affinity.type`` is empty but
            ``reservation_resource_names`` is set; if ``affinity.type`` is
            not one of the accepted values; if ``SPECIFIC_RESERVATION`` is
            requested without names; or if names are provided for
            ``NO_RESERVATION`` / ``ANY_RESERVATION``.
    """
    resource_names = list(affinity.reservation_resource_names)
    requested_type = affinity.type

    # Unset type: acceptable only when no reservation names were supplied.
    if not requested_type:
        if not resource_names:
            return None
        raise ValueError(
            "reservation_resource_names is set but reservation_affinity.type "
            "is empty. Set type to 'SPECIFIC_RESERVATION' or clear "
            "reservation_resource_names."
        )

    if requested_type not in _VALID_RESERVATION_AFFINITY_TYPES:
        raise ValueError(
            f"Invalid reservation_affinity.type {affinity.type!r}. "
            f"Expected one of: {sorted(_VALID_RESERVATION_AFFINITY_TYPES)}."
        )

    # Same approach as the scheduling_strategy lookup in `_build_job_config`:
    # mypy does not treat ReservationAffinity.Type as subscriptable, so the
    # member is resolved with getattr rather than Type[...].
    sdk_type = getattr(ReservationAffinity.Type, requested_type)

    # NO_RESERVATION / ANY_RESERVATION must not carry reservation names.
    if sdk_type != ReservationAffinity.Type.SPECIFIC_RESERVATION:
        if resource_names:
            raise ValueError(
                f"reservation_resource_names must be empty for type {affinity.type}."
            )
        return ReservationAffinity(reservation_affinity_type=sdk_type)

    # SPECIFIC_RESERVATION needs at least one reservation to target.
    if not resource_names:
        raise ValueError(
            "SPECIFIC_RESERVATION requires at least one entry in "
            "reservation_resource_names "
            "(format: projects/{p}/zones/{z}/reservations/{n})."
        )
    return ReservationAffinity(
        reservation_affinity_type=sdk_type,
        key=_RESERVATION_AFFINITY_KEY,
        values=resource_names,
    )
# TODO(svij): This function may need some work cc @zfan3, @xgao4
# i.e. data loading may happen on GPU instead of inference. Currently, there is no
# great support for GPU data loading, thus we assume inference is done on GPU and
# data loading is done on CPU. This will need to be revisited.
def _determine_if_cpu_execution(
    vertex_ai_resource_config: VertexAiResourceConfig,
) -> bool:
    """Determine whether CPU execution is required based on the resource configuration.

    CPU execution is selected when ``gpu_type`` is empty or names the
    ``ACCELERATOR_TYPE_UNSPECIFIED`` accelerator. The comparison is made on the
    upper-cased, underscore-separated form of ``gpu_type``, which is the same
    normalization ``_build_job_config`` applies before handing the accelerator
    type to Vertex AI, so both functions agree on spellings such as
    ``"accelerator-type-unspecified"``.

    Args:
        vertex_ai_resource_config: The Vertex AI resource configuration to check

    Returns:
        True if CPU execution is required, False if GPU execution is required
    """
    gpu_type = vertex_ai_resource_config.gpu_type
    if not gpu_type:
        return True
    normalized_gpu_type = gpu_type.upper().replace("-", "_")
    return (
        normalized_gpu_type
        == accelerator_type.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED.name  # type: ignore[attr-defined] # `name` is defined
    )