Source code for gigl.orchestration.kubeflow.utils.log_metrics
import kfp
[docs]
def log_metrics_to_ui(
    task_config_uri: str,
    component_name: str,
    base_image: str,
) -> kfp.components.BaseComponent:
    """Build and invoke the KFP component that publishes metrics to the Vertex AI Pipeline UI.

    Wraps ``_log_eval_metrics_to_ui`` with ``kfp.dsl.component`` using the
    supplied base image, then calls the resulting component with the task
    config URI and the component name.

    Args:
        task_config_uri (str): URI to the task config.
        component_name (str): Name of the component to log metrics for. It is
            forwarded to the wrapped function as its ``component`` argument.
        base_image (str): The Docker image to be used as the base image for
            the component.

    Returns:
        kfp.components.BaseComponent: The object obtained by invoking the
        metrics-logging component with the arguments above.
    """
    metrics_logger_component = kfp.dsl.component(
        _log_eval_metrics_to_ui,
        base_image=base_image,
    )
    invocation = metrics_logger_component(
        task_config_uri=task_config_uri,
        component=component_name,
    )
    return invocation
def _log_eval_metrics_to_ui(
    task_config_uri: str,
    component: str,
    metrics: kfp.dsl.Output[kfp.dsl.Metrics],
) -> None:
    """Publishes metrics for components to the Vertex AI Pipeline UI.

    Reads the GbmlConfig proto from ``task_config_uri``, selects the metrics
    file URI that corresponds to ``component``, loads that file, parses it as
    JSON and logs every entry under its top-level ``"metrics"`` key through
    ``metrics.log_metric``. If the metrics file cannot be found, a warning is
    logged and the function returns without logging anything.

    NOTE: every import is kept inside the function body. This function is
    handed to ``kfp.dsl.component``, so it is presumably executed in isolation
    inside the component container; keep it self-contained.

    Args:
        task_config_uri (str): URI to the task config.
        component (str): Name of the component to log metrics for. Must equal
            the value of ``GiGLComponents.Trainer`` or
            ``GiGLComponents.PostProcessor``.
        metrics (Output[Metrics]): Metrics object to log metrics. Populated by the KFP SDK.

    Raises:
        ValueError: If ``component`` is neither the Trainer nor the
            PostProcessor component.
    """
    import json
    import os
    import sys

    from google.api_core.exceptions import NotFound

    # This is required to resolve below packages when containerized by KFP.
    sys.path.append(os.getcwd())

    from gigl.common import UriFactory
    from gigl.common.logger import Logger
    from gigl.common.utils.proto_utils import ProtoUtils
    from gigl.src.common.constants.components import GiGLComponents
    from gigl.src.common.utils.file_loader import FileLoader
    from snapchat.research.gbml import gbml_config_pb2

    logger = Logger()
    proto_utils = ProtoUtils()
    gbml_config_pb = proto_utils.read_proto_from_yaml(
        uri=UriFactory.create_uri(task_config_uri), proto_cls=gbml_config_pb2.GbmlConfig
    )
    logger.info(f"Read gbml config pb from {task_config_uri}")

    # Each supported component stores its metrics file URI in a different
    # field of the shared config.
    if component == GiGLComponents.Trainer.value:
        eval_metrics_uri = UriFactory.create_uri(
            uri=gbml_config_pb.shared_config.trained_model_metadata.eval_metrics_uri
        )
    elif component == GiGLComponents.PostProcessor.value:
        eval_metrics_uri = UriFactory.create_uri(
            uri=gbml_config_pb.shared_config.postprocessed_metadata.post_processor_log_metrics_uri
        )
    else:
        raise ValueError(f"Unknown component: {component}")
    logger.info(f"Fetching eval metrics from: {eval_metrics_uri.uri}")

    file_loader = FileLoader()
    metrics_str: str
    try:
        # NOTE(review): the returned handle is never closed or removed here;
        # confirm whether FileLoader owns cleanup of the temp file.
        tfh = file_loader.load_to_temp_file(file_uri_src=eval_metrics_uri)
        # JSON is UTF-8 by specification; decode explicitly instead of relying
        # on the container's locale default encoding.
        with open(tfh.name, "r", encoding="utf-8") as f:
            metrics_str = f.read()
    except (NotFound, FileNotFoundError) as e:
        # A missing metrics file is tolerated: there is simply nothing to log.
        logger.warning(
            f"Error loading metrics file: {e}, evaluation could have been skipped"
        )
        return
    logger.info(f"Got metrics_str: {metrics_str}")

    metrics_json = json.loads(metrics_str)
    if "metrics" in metrics_json:
        # Each entry is read by its "name" and "numberValue" keys.
        for metric in metrics_json["metrics"]:
            metrics.log_metric(metric["name"], metric["numberValue"])