Skip to content

Commit 4e10e59

Browse files
committed
Add DelayedVariable class
1 parent 1eaa7e9 commit 4e10e59

File tree

4 files changed

+104
-7
lines changed

4 files changed

+104
-7
lines changed

docs/variables_expressions.rst

+6
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ Equally, they can be retrieved via the :meth:`VariableField.field` method.
8686
Use :class:`shc.misc.UpdateExchange` to split up NamedTuple-based value updates in a stateless way:
8787
It provides an equal way for subscribing to fields of the NamedTuple via the :meth:`shc.misc.UpdateExchange.field` method but does not store the latest value and does not suppress value updates with unchanged values.
8888

89+
DelayedVariable
90+
^^^^^^^^^^^^^^^
91+
92+
.. autoclass:: shc.variables.DelayedVariable
93+
94+
8995
.. _expressions:
9096

9197
Expressions

shc/timer.py

+2
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,8 @@ class RateLimitedSubscription(Subscribable[T], Generic[T]):
716716
A transparent wrapper for `Subscribable` objects, that delays and drops values to make sure that a given maximum
717717
rate of new values is not exceeded.
718718
719+
See also :class:`shc.variables.DelayedVariable` for a similar (but slightly different) behaviour.
720+
719721
:param wrapped: The Subscribable object to be wrapped
720722
:param min_interval: The minimal allowed interval between published values in seconds
721723
"""

shc/variables.py

+60-7
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
# specific language governing permissions and limitations under the License.
1111

1212
import asyncio
13+
import datetime
1314
import logging
1415
import warnings
1516
from typing import Generic, Type, Optional, List, Any, Union, Dict
1617

18+
from . import timer
1719
from .base import Writable, T, Readable, Subscribable, UninitializedError, Reading
1820
from .expressions import ExpressionWrapper
1921

@@ -46,7 +48,7 @@ def __init__(self, type_: Type[T], name: Optional[str] = None, initial_value: Op
4648
self._value: Optional[T] = initial_value
4749
self._variable_fields: Dict[str, "VariableField"] = {}
4850

49-
# Create VariableFields for each typeannotated field of the type if it is typing.NamedTuple-based.
51+
# Create VariableFields for each type-annotated field of the type if it is typing.NamedTuple-based.
5052
if issubclass(type_, tuple) and type_.__annotations__:
5153
for name, field_type in type_.__annotations__.items():
5254
variable_field = VariableField(self, name, field_type)
@@ -73,11 +75,16 @@ async def _write(self, value: T, origin: List[Any]) -> None:
7375
self._value = value
7476
if old_value != value: # if a single field is different, the full value will also be different
7577
logger.info("New value %s for Variable %s from %s", value, self, origin[:1])
76-
self._publish(value, origin)
77-
for name, field in self._variable_fields.items():
78-
field._recursive_publish(getattr(value, name),
79-
None if old_value is None else getattr(old_value, name),
80-
origin)
78+
self._do_all_publish(old_value, origin)
79+
80+
def _do_all_publish(self, old_value: Optional[T], origin: List[Any]) -> None:
81+
logger.debug("Publishing value %s for Variable %s", self._value, self)
82+
assert self._value is not None
83+
self._publish(self._value, origin)
84+
for name, field in self._variable_fields.items():
85+
field._recursive_publish(getattr(self._value, name),
86+
None if old_value is None else getattr(old_value, name),
87+
origin)
8188

8289
async def read(self) -> T:
8390
if self._value is None:
@@ -96,7 +103,7 @@ def EX(self) -> ExpressionWrapper:
96103

97104
def __repr__(self) -> str:
98105
if self.name:
99-
return "<Variable \"{}\">".format(self.name)
106+
return "<{} \"{}\">".format(self.__class__.__name__, self.name)
100107
else:
101108
return super().__repr__()
102109

@@ -158,3 +165,49 @@ async def read(self) -> T:
158165
@property
159166
def EX(self) -> ExpressionWrapper:
160167
return ExpressionWrapper(self)
168+
169+
170+
class DelayedVariable(Variable[T], Generic[T]):
171+
"""
172+
A Variable object, which delays the updates to avoid publishing half-updated values
173+
174+
This is achieved by delaying the publishing of a newly received value by a configurable amount of time
175+
(`publish_delay`). If more value updates are received while a previous update publishing is still pending, the
176+
latest value will be published at the originally scheduled publishing time. There will be no publishing of the
177+
intermediate values. The next value update received after the publishing will be delayed by the configured delay
178+
time again, resulting in a maximum update interval of the specified delay time.
179+
180+
This is similar (but slightly different) to the behaviour of :class:`shc.misc.RateLimitedSubscription`.
181+
182+
:param type_: The Variable's value type (used for its ``.type`` attribute, i.e. for the *Connectable* type
183+
checking mechanism)
184+
:param name: An optional name of the variable. Used for logging and future displaying purposes.
185+
:param initial_value: An optional initial value for the Variable. If not provided and no default provider is
186+
set via :meth:`set_provider`, the Variable is initialized with a None value and any :meth:`read` request
187+
will raise an :exc:`shc.base.UninitializedError` until the first value update is received.
188+
:param publish_delay: Amount of time to delay the publishing of a new value.
189+
"""
190+
def __init__(self, type_: Type[T], name: Optional[str] = None, initial_value: Optional[T] = None,
191+
publish_delay: datetime.timedelta = datetime.timedelta(seconds=0.25)):
192+
super().__init__(type_, name, initial_value)
193+
self._publish_delay = publish_delay
194+
self._pending_publish_task: Optional[asyncio.Task] = None
195+
self._latest_origin: List[Any] = []
196+
197+
async def _write(self, value: T, origin: List[Any]) -> None:
198+
old_value = self._value
199+
self._value = value
200+
self._latest_origin = origin
201+
if old_value != value: # if a single field is different, the full value will also be different
202+
logger.info("New value %s for Variable %s from %s", value, self, origin[:1])
203+
if not self._pending_publish_task:
204+
self._pending_publish_task = asyncio.create_task(self._wait_and_publish(old_value))
205+
timer.timer_supervisor.add_temporary_task(self._pending_publish_task)
206+
207+
async def _wait_and_publish(self, old_value: Optional[T]) -> None:
208+
try:
209+
await asyncio.sleep(self._publish_delay.total_seconds())
210+
except asyncio.CancelledError:
211+
pass
212+
self._do_all_publish(old_value, self._latest_origin)
213+
self._pending_publish_task = None

test/test_variables.py

+36
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import datetime
23
import unittest
34
import unittest.mock
45
import warnings
@@ -292,6 +293,41 @@ async def test_concurrent_field_update_publishing(self) -> None:
292293
self.assertEqual(writable1._write.call_args[0][0], writable3._write.call_args[0][0])
293294

294295

296+
class DelayedVariableTest(unittest.TestCase):
297+
@async_test
298+
async def test_simple(self):
299+
var = variables.DelayedVariable(int, name="A test variable", publish_delay=datetime.timedelta(seconds=0.02))
300+
subscriber = ExampleWritable(int)
301+
var.subscribe(subscriber)
302+
303+
await var.write(5, [])
304+
self.assertEqual(5, await var.read())
305+
await asyncio.sleep(0)
306+
await var.write(42, [self])
307+
self.assertEqual(42, await var.read())
308+
await asyncio.sleep(0.025)
309+
subscriber._write.assert_called_once_with(42, [self, var])
310+
311+
@async_test
312+
async def test_field_update(self):
313+
var = variables.DelayedVariable(ExampleTupleType,
314+
name="A test variable",
315+
initial_value=ExampleTupleType(0, 0.0),
316+
publish_delay=datetime.timedelta(seconds=0.02))
317+
field_subscriber = ExampleWritable(int)
318+
subscriber = ExampleWritable(ExampleTupleType)
319+
var.subscribe(subscriber)
320+
var.field('a').subscribe(field_subscriber)
321+
322+
await var.field('a').write(21, [self])
323+
await asyncio.sleep(0)
324+
await var.field('b').write(3.1416, [self])
325+
self.assertEqual(ExampleTupleType(21, 3.1416), await var.read())
326+
await asyncio.sleep(0.025)
327+
subscriber._write.assert_called_once_with(ExampleTupleType(21, 3.1416), [self, var.field('b'), var])
328+
field_subscriber._write.assert_called_once_with(21, [self, var.field('b'), var.field('a')])
329+
330+
295331
class MyPyPluginTest(unittest.TestCase):
296332
def test_mypy_plugin_variable(self) -> None:
297333
asset_dir = Path(__file__).parent / 'assets' / 'mypy_plugin_test'

0 commit comments

Comments
 (0)