Source code for gigl.src.post_process.utils.component_runtime
from typing import Dict
# from gigl.common.services.kfp import KFPService
from gigl.common.types.wrappers.kfp_api import KfpTaskDetails
# TODO: This needs to update ot Vertex AI
# def get_task_details_from_kfp_pipeline(
# kfp_service: KFPService, experiment_name: str, kfp_run_name: str
# ) -> Dict[str, KfpTaskDetails]:
# pipeline_run_detail: Optional[
# ApiRunDetailWrapper
# ] = kfp_service.get_latest_run_with_name(
# kfp_run_name=kfp_run_name, experiment_name=experiment_name
# )
# assert pipeline_run_detail is not None
# return pipeline_run_detail.task_details_map
[docs]
def assert_component_runtimes_match_expected_parameters(
task_details_map: Dict[str, KfpTaskDetails],
component_name_runtime_hr: Dict[str, int],
) -> None:
for component_name, expected_runtime_hr in component_name_runtime_hr.items():
relevant_task = task_details_map.get(component_name)
if relevant_task is None:
raise ValueError(
f"Component {component_name} not found in pipeline runtime manifest: {task_details_map}"
)
t_start = relevant_task.started_at
t_finish = relevant_task.finished_at
runtime_sec = (t_finish - t_start).seconds
expected_runtime_sec = expected_runtime_hr * 3600
if runtime_sec > expected_runtime_sec:
raise ValueError(
f"Component {component_name} took longer than expected runtime of {expected_runtime_hr} hrs. Actual runtime was {t_finish- t_start}."
)