Source code for gigl.orchestration.kubeflow.utils.log_metrics

import kfp


def log_metrics_to_ui(
    task_config_uri: str,
    component_name: str,
    base_image: str,
) -> kfp.components.BaseComponent:
    """Publishes metrics for a component to the Vertex AI Pipeline UI.

    Wraps ``_log_eval_metrics_to_ui`` as a KFP component built on
    ``base_image`` and invokes it with the given task config and component
    name.

    Args:
        task_config_uri (str): URI to the task config.
        component_name (str): Name of the component to log metrics for.
            Forwarded to the wrapped function as its ``component`` argument.
        base_image (str): The Docker image to be used as the base image for
            the component.

    Returns:
        kfp.components.BaseComponent: The result of invoking the
        metrics-logging component with the arguments above.

        NOTE(review): this is the value returned by *calling* the component,
        which in KFP is usually a task object rather than the component
        itself -- confirm whether the annotation should be updated.
    """
    metrics_component = kfp.dsl.component(
        _log_eval_metrics_to_ui,
        base_image=base_image,
    )
    return metrics_component(
        task_config_uri=task_config_uri,
        component=component_name,
    )
def _log_eval_metrics_to_ui(
    task_config_uri: str,
    component: str,
    metrics: kfp.dsl.Output[kfp.dsl.Metrics],
) -> None:
    """Publishes a component's stored metrics to the Vertex AI Pipeline UI.

    Reads the GbmlConfig proto at ``task_config_uri``, picks the metrics file
    URI that belongs to ``component``, loads that file, parses it as JSON and
    logs every entry under its ``"metrics"`` key (``name`` / ``numberValue``)
    on the ``metrics`` output. If the metrics file cannot be found, a warning
    is logged and nothing is published.

    Args:
        task_config_uri (str): URI to the task config.
        component (str): Name of the component to log metrics for. Must equal
            the value of ``GiGLComponents.Trainer`` or
            ``GiGLComponents.PostProcessor``.
        metrics (Output[Metrics]): Metrics object to log metrics. Populated by
            the KFP SDK.

    Raises:
        ValueError: If ``component`` is neither the Trainer nor the
            PostProcessor component.
    """
    # Imports stay local to the function, and the working directory is added
    # to sys.path, so that the packages below resolve when this function is
    # containerized by KFP.
    import json
    import os
    import sys

    from google.api_core.exceptions import NotFound

    sys.path.append(os.getcwd())

    from gigl.common import UriFactory
    from gigl.common.logger import Logger
    from gigl.common.utils.proto_utils import ProtoUtils
    from gigl.src.common.constants.components import GiGLComponents
    from gigl.src.common.utils.file_loader import FileLoader
    from snapchat.research.gbml import gbml_config_pb2

    logger = Logger()

    gbml_config_pb = ProtoUtils().read_proto_from_yaml(
        uri=UriFactory.create_uri(task_config_uri),
        proto_cls=gbml_config_pb2.GbmlConfig,
    )
    logger.info(f"Read gbml config pb from {task_config_uri}")

    # Each supported component stores its metrics location in a different
    # field of the shared config.
    shared_config = gbml_config_pb.shared_config
    if component == GiGLComponents.Trainer.value:
        metrics_location = shared_config.trained_model_metadata.eval_metrics_uri
    elif component == GiGLComponents.PostProcessor.value:
        metrics_location = (
            shared_config.postprocessed_metadata.post_processor_log_metrics_uri
        )
    else:
        raise ValueError(f"Unknown component: {component}")
    eval_metrics_uri = UriFactory.create_uri(uri=metrics_location)
    logger.info(f"Fetching eval metrics from: {eval_metrics_uri.uri}")

    try:
        local_copy = FileLoader().load_to_temp_file(file_uri_src=eval_metrics_uri)
        with open(local_copy.name, "r") as metrics_file:
            metrics_str = metrics_file.read()
    except (NotFound, FileNotFoundError) as e:
        # A missing metrics file is tolerated: warn and publish nothing.
        logger.warning(
            f"Error loading metrics file: {e}, evaluation could have been skipped"
        )
        return
    logger.info(f"Got metrics_str: {metrics_str}")

    payload = json.loads(metrics_str)
    if "metrics" not in payload:
        return
    for entry in payload["metrics"]:
        metrics.log_metric(entry["name"], entry["numberValue"])