Source code for gigl.common.utils.retry

import time
from functools import wraps
from typing import Callable, Optional, Tuple, Type, TypeVar, Union
from xmlrpc.client import Boolean

from gigl.common.logger import Logger
from gigl.src.common.utils.timeout import TimedOutException, timeout

[docs] T = TypeVar("T") # return type
[docs] logger = Logger()
[docs] class RetriesFailedException(Exception): pass
class __RetriableTimeoutException(Exception): pass
[docs] def retry( exception_to_check: Union[Type, Tuple[Type, ...]] = Exception, tries: int = 5, delay_s: int = 3, backoff: int = 2, max_delay_s: Optional[int] = None, fn_execution_timeout_s: Optional[int] = None, deadline_s: Optional[int] = None, should_throw_original_exception: Boolean = False, ) -> Callable[[Callable[..., T]], Callable[..., T]]: """ Decorator that can be added around a function to retry incase it fails i.e. throws some exceptions Args: exception_to_check (Union[Type, Tuple[Type, ...]]): the exception to check. Could also be a tuple of exceptions to check. Defaults to Exception. i.e. catches everything tries (Optional[int]): [description]. number of times to try (not retry) before giving up. Defaults to 5. delay_s (Optional[int]): [description]. initial delay between retries in seconds. Defaults to 3. backoff (Optional[int]): [description]. backoff multiplier e.g. value of 2 will double the delay each retry. Defaults to 2. max_delay_s (Optional[int]): [description]. maximum delay between retries in seconds. Defaults to None. fn_execution_timeout_s (Optional[int]): Maximum time given before a single function execution should time out. Defaults to None. deadline_s (Optional[int]): [description]. Total time in seconds to spend retrying, fails if exceeds this time. Note this timeout can also stop the first execution, so ensure to provide a lot of extra room so retries can actually take place. If timeout occurs, src.common.utils.timeout.TimedOutException is raised. Defaults to None. should_throw_original_exception (Optional[bool]): Defaults to False. """ def deco_retry(f) -> Callable[..., T]: @wraps(f) def f_retry(*args, **kwargs) -> T: # type: ignore[type-var] mtries, mdelay = tries, delay_s def fn(*args, **kwargs) -> T: if fn_execution_timeout_s is not None: timeout_individual_fn_call_decorator = timeout( seconds=fn_execution_timeout_s, exception_thrown_on_timeout=__RetriableTimeoutException, ) return timeout_individual_fn_call_decorator(f)(*args, **kwargs) return f(*args, **kwargs) acceptable_exceptions: Tuple[Type[Exception], ...] = ( exception_to_check if isinstance(exception_to_check, tuple) else (exception_to_check,) ) acceptable_exceptions = acceptable_exceptions + ( __RetriableTimeoutException, ) ret_val: T while mtries >= 0: try: ret_val = fn(*args, **kwargs) break except TimedOutException: raise except acceptable_exceptions as e: if mtries == 0: # Failed for the final time logger.exception(f"Failed for the last time: {e}") if should_throw_original_exception: raise # Reraise original exception raise RetriesFailedException( f"Retry failed, permanently failing {f.__module__}:{f.__name__}, see logs for {e}" ) msg = f"{e}, Retrying {f.__module__}:{f.__name__} in {mdelay} seconds..." logger.warning(msg) time.sleep(mdelay) mtries -= 1 mdelay = ( min(mdelay * backoff, max_delay_s) if max_delay_s else mdelay * backoff ) return ret_val if deadline_s is not None: global_retry_timeout_decorator = timeout(seconds=deadline_s) return global_retry_timeout_decorator(f_retry) return f_retry return deco_retry