from __future__ import annotations
from pathlib import Path
from typing import Optional, Union
from google.cloud import aiplatform
from kfp.compiler import Compiler
import gigl.src.common.constants.local_fs as local_fs_constants
from gigl.common import LocalUri, Uri
from gigl.common.logger import Logger
from gigl.common.services.vertex_ai import VertexAIService
from gigl.common.types.resource_config import CommonPipelineComponentConfigs
from gigl.env.pipelines_config import get_resource_config
from gigl.orchestration.kubeflow.kfp_pipeline import generate_pipeline
from gigl.src.common.constants.components import GiGLComponents
from gigl.src.common.types import AppliedTaskIdentifier
from gigl.src.common.utils.file_loader import FileLoader
from gigl.src.common.utils.time import current_formatted_datetime
from gigl.src.validation_check.libs.name_checks import (
check_if_kfp_pipeline_job_name_valid,
)
logger = Logger()
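
# Note: this default is evaluated once at import time, so every pipeline compiled
# in the same process shares the same timestamped version name.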
DEFAULT_PIPELINE_VERSION_NAME = (
f"gigl-pipeline-version-at-{current_formatted_datetime()}"
)
DEFAULT_KFP_COMPILED_PIPELINE_DEST_PATH = LocalUri.join(
local_fs_constants.get_project_root_directory(),
"build",
    "gigl_pipeline_gnn.yaml",
)
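
# The pipeline starts at the config populator by default; it turns the template
# task config into the frozen config that downstream components consume.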
DEFAULT_START_AT_COMPONENT = "config_populator"
class KfpOrchestrator:
"""
    Orchestration of Kubeflow Pipelines for GiGL.

    Methods:
        compile: Compiles the Kubeflow pipeline.
        run: Runs the Kubeflow pipeline.
        upload: Uploads the pipeline to KFP.
        wait_for_completion: Waits for the pipeline run to complete.
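
    Example:
        A minimal end-to-end sketch; the image names and bucket URIs below are
        hypothetical placeholders (``UriFactory`` is available from
        ``gigl.common``)::

            compiled_path = KfpOrchestrator.compile(
                cuda_container_image="gcr.io/my-project/gigl-cuda:latest",
                cpu_container_image="gcr.io/my-project/gigl-cpu:latest",
                dataflow_container_image="gcr.io/my-project/gigl-dataflow:latest",
            )
            orchestrator = KfpOrchestrator()
            run = orchestrator.run(
                applied_task_identifier=AppliedTaskIdentifier("my-gigl-job"),
                task_config_uri=UriFactory.create_uri(
                    "gs://my-bucket/configs/task_config.yaml"
                ),
                resource_config_uri=UriFactory.create_uri(
                    "gs://my-bucket/configs/resource_config.yaml"
                ),
                compiled_pipeline_path=compiled_path,
            )
            orchestrator.wait_for_completion(run)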
"""
@classmethod
def compile(
cls,
cuda_container_image: str,
cpu_container_image: str,
dataflow_container_image: str,
dst_compiled_pipeline_path: Uri = DEFAULT_KFP_COMPILED_PIPELINE_DEST_PATH,
additional_job_args: Optional[dict[GiGLComponents, dict[str, str]]] = None,
tag: Optional[str] = None,
) -> Uri:
"""
        Compiles the GiGL Kubeflow pipeline.

        Args:
cuda_container_image (str): Container image for CUDA (see: containers/Dockerfile.cuda).
cpu_container_image (str): Container image for CPU.
dataflow_container_image (str): Container image for Dataflow.
dst_compiled_pipeline_path (Uri): Destination path for the compiled pipeline YAML file. Defaults to
:data:`~gigl.constants.DEFAULT_KFP_COMPILED_PIPELINE_DEST_PATH`.
additional_job_args (Optional[dict[GiGLComponents, dict[str, str]]]): Additional arguments to be passed into components, organized by component.
            tag (Optional[str]): Optional tag to include in the pipeline description.

        Returns:
Uri: The URI of the compiled pipeline.
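
        Example:
            A sketch of compiling and shipping the YAML to GCS (the image names
            and bucket are hypothetical placeholders). Non-local destinations are
            compiled to the local default path first and then uploaded::

                compiled_uri = KfpOrchestrator.compile(
                    cuda_container_image="gcr.io/my-project/gigl-cuda:latest",
                    cpu_container_image="gcr.io/my-project/gigl-cpu:latest",
                    dataflow_container_image="gcr.io/my-project/gigl-dataflow:latest",
                    dst_compiled_pipeline_path=UriFactory.create_uri(
                        "gs://my-bucket/pipelines/gigl_pipeline.yaml"
                    ),
                )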
"""
        # kfp's Compiler can only write to a local file, so compile to a local
        # path first; if the requested destination is remote (e.g. GCS), fall
        # back to the local default path and upload the bundle afterwards.
        local_pipeline_bundle_path: LocalUri = (
            dst_compiled_pipeline_path
            if isinstance(dst_compiled_pipeline_path, LocalUri)
            else DEFAULT_KFP_COMPILED_PIPELINE_DEST_PATH
        )
Path(local_pipeline_bundle_path.uri).parent.mkdir(parents=True, exist_ok=True)
logger.info(f"Compiling pipeline to {local_pipeline_bundle_path.uri}")
common_pipeline_component_configs = CommonPipelineComponentConfigs(
cuda_container_image=cuda_container_image,
cpu_container_image=cpu_container_image,
dataflow_container_image=dataflow_container_image,
additional_job_args=additional_job_args or {},
)
Compiler().compile(
generate_pipeline(
common_pipeline_component_configs=common_pipeline_component_configs,
tag=tag,
),
local_pipeline_bundle_path.uri,
)
logger.info(f"Compiled Kubeflow pipeline to {local_pipeline_bundle_path.uri}")
        if local_pipeline_bundle_path != dst_compiled_pipeline_path:
            logger.info(
                f"Uploading compiled pipeline to {dst_compiled_pipeline_path.uri}"
            )
file_loader = FileLoader()
file_loader.load_file(
file_uri_src=local_pipeline_bundle_path,
file_uri_dst=dst_compiled_pipeline_path,
)
return dst_compiled_pipeline_path
def run(
self,
applied_task_identifier: AppliedTaskIdentifier,
task_config_uri: Uri,
resource_config_uri: Uri,
start_at: str = DEFAULT_START_AT_COMPONENT,
stop_after: Optional[str] = None,
compiled_pipeline_path: Uri = DEFAULT_KFP_COMPILED_PIPELINE_DEST_PATH,
) -> aiplatform.PipelineJob:
"""
        Runs the GiGL Kubeflow pipeline.

        Args:
applied_task_identifier (AppliedTaskIdentifier): Identifier for the task.
task_config_uri (Uri): URI of the task configuration file.
resource_config_uri (Uri): URI of the resource configuration file.
start_at (str): Component to start the pipeline at. Defaults to 'config_populator'.
            stop_after (Optional[str]): Component to stop the pipeline after. Defaults to None, i.e. the entire pipeline is run.
            compiled_pipeline_path (Uri): Path to the compiled pipeline YAML file.

        Returns:
aiplatform.PipelineJob: The created pipeline job.
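
        Example:
            A sketch that runs only a slice of the pipeline; the bucket URIs are
            hypothetical placeholders, and component names follow
            :class:`GiGLComponents` (e.g. ``"trainer"``)::

                run = orchestrator.run(
                    applied_task_identifier=AppliedTaskIdentifier("my-gigl-job"),
                    task_config_uri=UriFactory.create_uri(
                        "gs://my-bucket/configs/frozen_task_config.yaml"
                    ),
                    resource_config_uri=UriFactory.create_uri(
                        "gs://my-bucket/configs/resource_config.yaml"
                    ),
                    start_at="trainer",
                    stop_after="trainer",
                )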
"""
check_if_kfp_pipeline_job_name_valid(str(applied_task_identifier))
file_loader = FileLoader()
assert file_loader.does_uri_exist(
compiled_pipeline_path
), f"Compiled pipeline path {compiled_pipeline_path} does not exist."
logger.info(f"Skipping pipeline compilation; will use {compiled_pipeline_path}")
run_keyword_args = {
"job_name": applied_task_identifier,
"start_at": start_at,
"template_or_frozen_config_uri": task_config_uri.uri,
"resource_config_uri": resource_config_uri.uri,
}
if stop_after is not None:
run_keyword_args["stop_after"] = stop_after
logger.info(f"Running pipeline with args: {run_keyword_args}")
resource_config = get_resource_config(resource_config_uri=resource_config_uri)
vertex_ai_service = VertexAIService(
project=resource_config.project,
location=resource_config.region,
service_account=resource_config.service_account_email,
staging_bucket=resource_config.temp_assets_regional_bucket_path.uri,
)
run = vertex_ai_service.run_pipeline(
display_name=str(applied_task_identifier),
template_path=compiled_pipeline_path,
run_keyword_args=run_keyword_args,
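            # Vertex AI job ids allow only lowercase letters, digits, and hyphens,
            # so underscores in the task identifier are swapped for hyphens.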
job_id=str(applied_task_identifier).replace("_", "-"),
)
return run
def wait_for_completion(self, run: Union[aiplatform.PipelineJob, str]):
"""
        Waits for the completion of a pipeline run.

        Args:
            run (Union[aiplatform.PipelineJob, str]): The pipeline job or its resource name.

        Returns:
None
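
        Example:
            A sketch; the resource name below is a hypothetical placeholder::

                orchestrator.wait_for_completion(run)
                # Or block on a run by its fully qualified resource name:
                orchestrator.wait_for_completion(
                    "projects/123/locations/us-central1/pipelineJobs/my-gigl-job"
                )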
"""
resource_name = run if isinstance(run, str) else run.resource_name
VertexAIService.wait_for_run_completion(resource_name)
logger.info(f"Pipeline run {resource_name} completed successfully.")