hyb
2026-01-09 4cb426cb3ae31e772a09d4ade5b2f0242aaeefa0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import asyncio
import logging
import threading
from typing import Any, Callable, Coroutine
 
 
class BackgroundScheduler:
    """
    Schedules background tasks execution either in separate thread or in the running event loop.

    Thread based tasks (``run_once`` / ``run_recurring``) each get their own
    daemon thread running a private event loop. ``run_recurring_async`` reuses
    the caller's running loop instead. ``stop()`` is idempotent and prevents
    any further scheduling.
    """

    def __init__(self):
        # Handle of the most recently scheduled timer; stop() cancels it.
        self._next_timer = None
        # Event loops owned by worker threads that have not finished yet.
        self._event_loops = []
        # Guards _stopped, _event_loops and _next_timer.
        self._lock = threading.Lock()
        self._stopped = False

    def __del__(self):
        self.stop()

    def stop(self):
        """
        Stop all scheduled tasks and clean up resources.

        Safe to call more than once; only the first call has an effect.
        """
        with self._lock:
            if self._stopped:
                return
            self._stopped = True

            if self._next_timer is not None:
                self._next_timer.cancel()
                self._next_timer = None

            # Ask every worker loop to stop. The request is queued even when
            # the loop is not running yet: a thread that has not reached
            # run_forever() will then exit on its first iteration instead of
            # running forever.
            for loop in self._event_loops:
                if loop.is_closed():
                    continue
                try:
                    loop.call_soon_threadsafe(loop.stop)
                except RuntimeError:
                    # Loop got closed between the check and the call.
                    pass

            self._event_loops.clear()

    def run_once(self, delay: float, callback: Callable, *args):
        """
        Runs callable task once after certain delay in seconds.

        The task runs in a dedicated daemon thread which terminates as soon as
        the callback has been executed. Does nothing if the scheduler has
        already been stopped.
        """
        self._run_in_thread(self._call_later_once, delay, callback, *args)

    def run_recurring(self, interval: float, callback: Callable, *args):
        """
        Runs recurring callable task with given interval in seconds.

        The task runs in a dedicated daemon thread until ``stop()`` is called.
        Does nothing if the scheduler has already been stopped.
        """
        self._run_in_thread(self._call_later_recurring, interval, callback, *args)

    async def run_recurring_async(
        self, interval: float, coro: Callable[..., Coroutine[Any, Any, Any]], *args
    ):
        """
        Runs recurring coroutine with given interval in seconds in the current event loop.
        To be used only from an async context. No additional threads are created.
        """
        with self._lock:
            if self._stopped:
                return

        loop = asyncio.get_running_loop()
        wrapped = _async_to_sync_wrapper(loop, coro, *args)

        def tick():
            with self._lock:
                if self._stopped:
                    return
            # Schedule the coroutine
            wrapped()
            # Schedule next tick (no-op once the scheduler is stopped)
            self._call_later(loop, interval, tick)

        # Schedule first tick
        self._call_later(loop, interval, tick)

    def _run_in_thread(
        self, schedule_cb: Callable, delay: float, callback: Callable, *args
    ):
        """
        Creates a private event loop, registers it and runs it in a daemon thread.

        ``schedule_cb`` is invoked inside the new loop as
        ``schedule_cb(loop, delay, callback, *args)``.
        """
        # The stopped check and the registration happen under one lock
        # acquisition, so stop() either sees this loop or prevents its creation.
        with self._lock:
            if self._stopped:
                return
            loop = asyncio.new_event_loop()
            self._event_loops.append(loop)

        # Run loop in a separate thread to unblock main thread.
        thread = threading.Thread(
            target=_start_event_loop_in_thread,
            args=(loop, schedule_cb, delay, callback, *args),
            daemon=True,
        )
        try:
            thread.start()
        except Exception:
            # The thread never ran, so nobody else will close the loop.
            self._forget_loop(loop)
            loop.close()
            raise

    def _forget_loop(self, loop: asyncio.AbstractEventLoop):
        """Removes ``loop`` from the list of loops tracked for ``stop()``."""
        with self._lock:
            try:
                self._event_loops.remove(loop)
            except ValueError:
                # Already removed, e.g. by stop().
                pass

    def _call_later(
        self, loop: asyncio.AbstractEventLoop, delay: float, callback: Callable, *args
    ):
        """
        Schedules ``callback(*args)`` on ``loop`` after ``delay`` seconds,
        unless the scheduler has been stopped.
        """
        # Scheduling under the lock guarantees that stop() either cancels this
        # timer or prevents it from being created.
        with self._lock:
            if self._stopped:
                return
            self._next_timer = loop.call_later(delay, callback, *args)

    def _call_later_once(
        self,
        loop: asyncio.AbstractEventLoop,
        delay: float,
        callback: Callable,
        *args,
    ):
        """Schedules a one-shot execution of ``callback`` on ``loop``."""
        self._call_later(loop, delay, self._execute_once, loop, callback, *args)

    def _execute_once(
        self, loop: asyncio.AbstractEventLoop, callback: Callable, *args
    ):
        """
        Executes a one-shot task and then shuts its private loop down, so the
        worker thread does not stay alive after the task is done.
        """
        try:
            with self._lock:
                stopped = self._stopped
            if not stopped:
                callback(*args)
        finally:
            # Unregister first so stop() never touches a loop that is closing.
            self._forget_loop(loop)
            loop.stop()

    def _call_later_recurring(
        self,
        loop: asyncio.AbstractEventLoop,
        interval: float,
        callback: Callable,
        *args,
    ):
        """Schedules the first execution of a recurring task on ``loop``."""
        self._call_later(
            loop, interval, self._execute_recurring, loop, interval, callback, *args
        )

    def _execute_recurring(
        self,
        loop: asyncio.AbstractEventLoop,
        interval: float,
        callback: Callable,
        *args,
    ):
        """
        Executes recurring callable task with given interval in seconds.

        Exceptions raised by the callback do not break the recurrence; they
        are reported at DEBUG level only.
        """
        with self._lock:
            if self._stopped:
                return

        try:
            callback(*args)
        except Exception:
            # Best effort: a failing callback must not kill the schedule.
            logging.getLogger(__name__).debug(
                "Recurring background task raised an exception", exc_info=True
            )

        # No-op when the scheduler got stopped while the callback was running.
        self._call_later(
            loop, interval, self._execute_recurring, loop, interval, callback, *args
        )
 
 
def _start_event_loop_in_thread(
    event_loop: asyncio.AbstractEventLoop, call_soon_cb: Callable, *args
):
    """
    Starts event loop in a thread and schedule callback as soon as event loop is ready.
    Used to be able to schedule tasks using loop.call_later.

    Blocks until the loop is stopped. Afterwards every task still pending on
    the loop is cancelled and the loop is closed.

    :param event_loop: Event loop to run in the calling thread; closed on return.
    :param call_soon_cb: Callable invoked inside the loop as
        ``call_soon_cb(event_loop, *args)``.
    :param args: Extra positional arguments forwarded to ``call_soon_cb``.
    :return: None
    """
    asyncio.set_event_loop(event_loop)
    event_loop.call_soon(call_soon_cb, event_loop, *args)
    try:
        event_loop.run_forever()
    finally:
        try:
            # Clean up pending tasks
            pending = asyncio.all_tasks(event_loop)
            # Only gather when there is something to wait for: gather() with
            # no awaitables and no running loop is a deprecated code path.
            if pending:
                for task in pending:
                    task.cancel()
                # Run loop once more to process cancellations
                event_loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True)
                )
        except Exception:
            # Best effort cleanup; the loop is closed below regardless.
            logging.getLogger(__name__).debug(
                "Failed to cancel pending tasks of background event loop",
                exc_info=True,
            )
        finally:
            event_loop.close()
 
 
def _async_to_sync_wrapper(loop, coro_func, *args, **kwargs):
    """
    Wraps an asynchronous function so it can be used with loop.call_later.

    Every call of the returned function creates a new coroutine from
    ``coro_func`` and schedules it on ``loop`` without waiting for it.

    :param loop: The event loop in which the coroutine will be executed.
    :param coro_func: The coroutine function to wrap.
    :param args: Positional arguments to pass to the coroutine function.
    :param kwargs: Keyword arguments to pass to the coroutine function.
    :return: A regular function suitable for loop.call_later.
    """
    # The event loop only keeps weak references to tasks, so a task nobody
    # else references may be garbage collected before it finishes. Hold a
    # strong reference until the task is done.
    in_flight = set()

    def wrapped():
        # Schedule the coroutine in the event loop
        task = asyncio.ensure_future(coro_func(*args, **kwargs), loop=loop)
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)

    return wrapped