hyb
2025-12-23 10f3a1daddfbc7fa3dd2069197d83e8b6ef19176
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import abc
import socket
from time import sleep
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generic,
    Iterable,
    Optional,
    Tuple,
    Type,
    TypeVar,
)
 
from redis.exceptions import ConnectionError, TimeoutError
 
# Result type of the operation handed to a retry call; the retry helper
# returns whatever that callable returns.
T = TypeVar("T")
# Exception type a retry policy is parameterized over; restricted to
# Exception subclasses and declared covariant.
E = TypeVar("E", bound=Exception, covariant=True)
 
if TYPE_CHECKING:
    from redis.backoff import AbstractBackoff
 
 
class AbstractRetry(Generic[E], abc.ABC):
    """Shared state for retry policies: a backoff, a retry budget and error types."""

    # Exception classes that trigger a retry.
    _supported_errors: Tuple[Type[E], ...]

    def __init__(
        self,
        backoff: "AbstractBackoff",
        retries: int,
        supported_errors: Tuple[Type[E], ...],
    ):
        """
        Store the retry configuration.

        `backoff`: the `Backoff` object associated with this policy.
        `retries`: maximum number of retries; can be negative to retry forever.
        `supported_errors`: the error types which trigger a retry.
        """
        self._backoff = backoff
        self._retries = retries
        self._supported_errors = supported_errors

    @abc.abstractmethod
    def __eq__(self, other: Any) -> bool:
        # Concrete subclasses must provide their own equality.
        return NotImplemented

    def __hash__(self) -> int:
        # The error types are hashed as a frozenset, so their order in the
        # tuple does not affect the hash.
        error_types = frozenset(self._supported_errors)
        return hash((self._backoff, self._retries, error_types))

    def update_supported_errors(self, specified_errors: Iterable[Type[E]]) -> None:
        """
        Add the specified error types to the supported errors, dropping duplicates.
        """
        combined = self._supported_errors + tuple(specified_errors)
        self._supported_errors = tuple(set(combined))

    def get_retries(self) -> int:
        """
        Return the configured number of retries.
        """
        return self._retries

    def update_retries(self, value: int) -> None:
        """
        Replace the configured number of retries with `value`.
        """
        self._retries = value
 
 
class Retry(AbstractRetry[Exception]):
    """Blocking retry policy that re-runs a callable when a supported error occurs."""

    # Defining __eq__ in a class body resets __hash__ to None, so the base
    # implementation is re-attached explicitly.
    __hash__ = AbstractRetry.__hash__

    def __init__(
        self,
        backoff: "AbstractBackoff",
        retries: int,
        supported_errors: Tuple[Type[Exception], ...] = (
            ConnectionError,
            TimeoutError,
            socket.timeout,
        ),
    ):
        """
        Build a retry policy; by default connection errors, timeouts and
        socket timeouts trigger a retry.
        """
        super().__init__(backoff, retries, supported_errors)

    def __eq__(self, other: Any) -> bool:
        """
        Two `Retry` objects are equal when their backoff, retry count and
        set of supported errors (order-insensitive) all match.
        """
        if isinstance(other, Retry):
            return (
                self._backoff == other._backoff
                and self._retries == other._retries
                and set(self._supported_errors) == set(other._supported_errors)
            )
        return NotImplemented

    def call_with_retry(
        self,
        do: Callable[[], T],
        fail: Callable[[Exception], Any],
        is_retryable: Optional[Callable[[Exception], bool]] = None,
    ) -> T:
        """
        Run `do` until it succeeds and return its result, or raise the last
        error once the retry budget is used up.

        `do`: the operation to call. Expects no argument.
        `fail`: the failure handler, called with the error that was just
        thrown before deciding whether to retry.
        `is_retryable`: optional predicate; when given and it returns a falsy
        value for the error, the error is re-raised immediately without
        calling `fail`.

        Only errors matching the supported error types are handled; anything
        else propagates unchanged.
        """
        self._backoff.reset()
        failed_attempts = 0
        while True:
            try:
                return do()
            except self._supported_errors as error:
                if is_retryable and not is_retryable(error):
                    raise
                failed_attempts += 1
                fail(error)
                # A negative retry count disables the limit entirely.
                exhausted = self._retries >= 0 and failed_attempts > self._retries
                if exhausted:
                    raise error
                delay = self._backoff.compute(failed_attempts)
                if delay > 0:
                    sleep(delay)