Source code for gigl.common.metrics.decorators
import time
import traceback
from enum import Enum
from typing import Any, Callable, Optional, TypeVar, cast
from gigl.common.logger import Logger
from gigl.common.metrics.metrics_interface import OpsMetricPublisher
[docs]
class TimerRecordGranularity(Enum):
[docs]
F = TypeVar("F", bound=Callable[..., Any])
def __safely_flush_metrics(
get_metrics_service_instance_fn: Optional[
Callable[[], Optional[OpsMetricPublisher]]
]
) -> None:
if get_metrics_service_instance_fn is not None:
metrics_instance = get_metrics_service_instance_fn()
if metrics_instance is not None:
metrics_instance.flush_metrics()
[docs]
def flushes_metrics(
get_metrics_service_instance_fn: Optional[
Callable[[], Optional[OpsMetricPublisher]]
]
) -> Callable[[F], F]:
"""
Decorator for flushing metrics after function execution.
Always catches any raised exceptions by decorated function and flushes metrics
before reraising the exception.
:return: wrapped result
"""
def inner(func: F) -> F:
def wrap(*args: Any, **kwargs: Any) -> Any:
try:
result = func(*args, **kwargs)
except Exception as e:
logger.info(
f"Exception raised, will flush metrics for: {func.__name__} and re-raise exception"
)
logger.error(f"Exception: {e}")
logger.error(traceback.format_exc())
__safely_flush_metrics(
get_metrics_service_instance_fn=get_metrics_service_instance_fn
) # Flush metrics before re-raising exception
logger.error(f"Post flushing metrics")
raise e
__safely_flush_metrics(
get_metrics_service_instance_fn=get_metrics_service_instance_fn
)
return result
return cast(F, wrap)
return inner
[docs]
def profileit(
metric_name: str,
get_metrics_service_instance_fn: Optional[
Callable[[], Optional[OpsMetricPublisher]]
],
record_granularity: TimerRecordGranularity = TimerRecordGranularity.SECONDS,
) -> Callable[[F], F]:
"""
performance profiling decorator
:param name: name of block being profiled
:return: wrapped result
"""
def inner(func: F) -> F:
def wrap(*args: Any, **kwargs: Any) -> Any:
raised_exception: Optional[Exception] = None
started_at = time.time()
try:
result = func(*args, **kwargs)
except Exception as e:
raised_exception = e
spanned_time_s = time.time() - started_at
if record_granularity == TimerRecordGranularity.MILLISECONDS:
spanned_time_formatted = int(spanned_time_s * 1000)
elif record_granularity == TimerRecordGranularity.SECONDS:
spanned_time_formatted = int(spanned_time_s)
else:
raise TypeError(
f"Unsupported record_granularity provided: {record_granularity}"
)
metrics_instance = None
if get_metrics_service_instance_fn is not None:
metrics_instance = get_metrics_service_instance_fn()
if metrics_instance is not None:
metrics_instance.add_timer(metric_name, spanned_time_formatted)
if raised_exception is not None:
raise raised_exception
return result
return cast(F, wrap)
return inner