Source code for gigl.src.common.utils.metrics_service_provider
import os
from typing import Optional
from gigl.common import Uri
from gigl.common.logger import Logger
from gigl.common.metrics.base_metrics import NopMetricsPublisher
from gigl.common.metrics.metrics_interface import OpsMetricPublisher
from gigl.common.utils import os_utils
from gigl.common.utils.proto_utils import ProtoUtils
from snapchat.research.gbml.gbml_config_pb2 import GbmlConfig
_metrics_instance: Optional[OpsMetricPublisher] = None
[docs]
JOB_NAME_GROUPING_ENV_KEY = "GBML_JOB_NAME"
[docs]
def initialize_metrics(task_config_uri: Uri, service_name: str):
global _metrics_instance
os.environ[JOB_NAME_GROUPING_ENV_KEY] = service_name
proto_utils = ProtoUtils()
task_config: GbmlConfig = proto_utils.read_proto_from_yaml(
uri=task_config_uri, proto_cls=GbmlConfig
)
metrics_cls_path = task_config.metrics_config.metrics_cls_path
metrics_args = task_config.metrics_config.metrics_args
if not metrics_cls_path:
logger.info("Custom metrics class not provided. Using No-op metrics")
_metrics_instance = NopMetricsPublisher()
return
metrics_cls = os_utils.import_obj(metrics_cls_path)
try:
metrics_cls_instance: OpsMetricPublisher = metrics_cls(**metrics_args)
assert isinstance(metrics_cls_instance, OpsMetricPublisher)
_metrics_instance = metrics_cls_instance
logger.info(f"Instantiated Custom Metrics Class from: {metrics_cls_path}")
except Exception as e:
logger.error(f"Could not instantiate class {metrics_cls_path}: {e}")
raise e
[docs]
def get_metrics_service_instance() -> Optional[OpsMetricPublisher]:
if _metrics_instance is None:
logger.warning(
"initialize_metrics() was not called, using NopMetricsPulisher as default"
)
raise RuntimeError(
"Metrics instance is not initialized. Call initialize_metrics() before getting the instance."
)
return _metrics_instance
[docs]
def init_metrics_publisher_grouping_for_job(service_name: str) -> None:
os.environ[JOB_NAME_GROUPING_ENV_KEY] = service_name