|
1 | 1 | """Utilities for timers, interval trackers, etc."""
|
2 | 2 |
|
3 | 3 | import asyncio
|
| 4 | +import itertools |
4 | 5 | import logging
|
5 | 6 | import time
|
6 | 7 | from typing import Union
|
@@ -29,50 +30,68 @@ def __init__(
|
29 | 30 | self.logger = logging.getLogger(logger)
|
30 | 31 |
|
31 | 32 | def fastforward(self):
|
32 |
| - """Reset the timer so that the next call to `has_interval_elapsed` will return True. |
| 33 | + """Force the interval timer to consider the current interval as already elapsed. |
33 | 34 |
|
34 |
| - This effectively skips the current interval and forces the timer to indicate |
35 |
| - that the interval has elapsed on the next check. |
| 35 | + This resets the internal timer to a state where the next call to `has_interval_elapsed` |
| 36 | + will immediately return `True`, as if the interval has already passed. |
36 | 37 | """
|
37 | 38 | self._last_time = float("-inf")
|
38 | 39 |
|
39 |
| - async def wait_until_interval(self, frequency: float = 1.0) -> None: |
| 40 | + @staticmethod |
| 41 | + def _is_nth(i: int, nth: int) -> bool: |
| 42 | + return nth > 0 and i % nth == 0 |
| 43 | + |
| 44 | + async def wait_until_interval( |
| 45 | + self, |
| 46 | + frequency: float = 1.0, |
| 47 | + log_every_nth: int = 60, |
| 48 | + ) -> None: |
40 | 49 | """Wait asynchronously until the specified interval has elapsed.
|
41 | 50 |
|
42 | 51 | This method checks the elapsed time every `frequency` seconds,
|
43 | 52 | allowing cooperative multitasking during the wait.
|
44 | 53 | """
|
45 | 54 | if self.logger:
|
46 | 55 | self.logger.debug(
|
47 |
| - f"Waiting until {self.seconds}s has elapsed since the last iteration..." |
| 56 | + f"Waiting for {self.seconds}s interval before proceeding..." |
48 | 57 | )
|
49 |
| - while not self.has_interval_elapsed(): |
| 58 | + |
| 59 | + for i in itertools.count(): |
| 60 | + if self.has_interval_elapsed(do_log=self._is_nth(i, log_every_nth)): |
| 61 | + return |
50 | 62 | await asyncio.sleep(frequency)
|
51 | 63 |
|
52 |
| - def wait_until_interval_sync(self, frequency: float = 1.0) -> None: |
| 64 | + def wait_until_interval_sync( |
| 65 | + self, |
| 66 | + frequency: float = 1.0, |
| 67 | + log_every_nth: int = 60, |
| 68 | + ) -> None: |
53 | 69 | """Wait until the specified interval has elapsed.
|
54 | 70 |
|
55 | 71 | This method checks the elapsed time every `frequency` seconds,
|
56 | 72 | blocking until the interval has elapsed.
|
57 | 73 | """
|
58 | 74 | if self.logger:
|
59 | 75 | self.logger.debug(
|
60 |
| - f"Waiting until {self.seconds}s has elapsed since the last iteration..." |
| 76 | + f"Waiting for {self.seconds}s interval before proceeding..." |
61 | 77 | )
|
62 |
| - while not self.has_interval_elapsed(): |
| 78 | + |
| 79 | + for i in itertools.count(): |
| 80 | + if self.has_interval_elapsed(do_log=self._is_nth(i, log_every_nth)): |
| 81 | + return |
63 | 82 | time.sleep(frequency)
|
64 | 83 |
|
65 |
| - def has_interval_elapsed(self) -> bool: |
| 84 | + def has_interval_elapsed(self, do_log: bool = True) -> bool: |
66 | 85 | """Check if the specified time interval has elapsed since the last expiration.
|
67 | 86 |
|
68 | 87 | If the interval has elapsed, the internal timer is reset to the current time.
|
69 | 88 | """
|
70 | 89 | diff = time.monotonic() - self._last_time
|
71 | 90 | if diff >= self.seconds:
|
72 | 91 | self._last_time = time.monotonic()
|
73 |
| - if self.logger: |
| 92 | + if self.logger and do_log: |
74 | 93 | self.logger.debug(
|
75 |
| - f"At least {self.seconds}s have elapsed (actually {diff}s)." |
| 94 | + f"Interval elapsed: {self.seconds}s (actual: {diff:.3f}s)." |
76 | 95 | )
|
77 | 96 | return True
|
78 | 97 | return False
|
0 commit comments