Source code for gigl.src.common.utils.metrics_service_provider

import os
from typing import Optional

from gigl.common import Uri
from gigl.common.logger import Logger
from gigl.common.metrics.base_metrics import NopMetricsPublisher
from gigl.common.metrics.metrics_interface import OpsMetricPublisher
from gigl.common.utils import os_utils
from gigl.common.utils.proto_utils import ProtoUtils
from snapchat.research.gbml.gbml_config_pb2 import GbmlConfig

[docs] logger = Logger()
_metrics_instance: Optional[OpsMetricPublisher] = None
[docs] JOB_NAME_GROUPING_ENV_KEY = "GBML_JOB_NAME"
[docs] def initialize_metrics(task_config_uri: Uri, service_name: str): global _metrics_instance os.environ[JOB_NAME_GROUPING_ENV_KEY] = service_name proto_utils = ProtoUtils() task_config: GbmlConfig = proto_utils.read_proto_from_yaml( uri=task_config_uri, proto_cls=GbmlConfig ) metrics_cls_path = task_config.metrics_config.metrics_cls_path metrics_args = task_config.metrics_config.metrics_args if not metrics_cls_path: logger.info("Custom metrics class not provided. Using No-op metrics") _metrics_instance = NopMetricsPublisher() return metrics_cls = os_utils.import_obj(metrics_cls_path) try: metrics_cls_instance: OpsMetricPublisher = metrics_cls(**metrics_args) assert isinstance(metrics_cls_instance, OpsMetricPublisher) _metrics_instance = metrics_cls_instance logger.info(f"Instantiated Custom Metrics Class from: {metrics_cls_path}") except Exception as e: logger.error(f"Could not instantiate class {metrics_cls_path}: {e}") raise e
[docs] def get_metrics_service_instance() -> Optional[OpsMetricPublisher]: if _metrics_instance is None: logger.warning( "initialize_metrics() was not called, using NopMetricsPulisher as default" ) raise RuntimeError( "Metrics instance is not initialized. Call initialize_metrics() before getting the instance." ) return _metrics_instance
[docs] def init_metrics_publisher_grouping_for_job(service_name: str) -> None: os.environ[JOB_NAME_GROUPING_ENV_KEY] = service_name