20
20
21
21
from . import conversion
22
22
23
- S = TypeVar ('S' )
24
- T = TypeVar ('T' )
25
- T_co = TypeVar (' T_co' )
26
- T_con = TypeVar (' T_con' )
23
+ S = TypeVar ("S" )
24
+ T = TypeVar ("T" )
25
+ T_co = TypeVar (" T_co" )
26
+ T_con = TypeVar (" T_con" )
27
27
LogicHandler = Callable [[T , List [Any ]], Awaitable [None ]]
28
28
29
29
logger = logging .getLogger (__name__ )
30
30
31
- magicOriginVar : contextvars .ContextVar [List [Any ]] = contextvars .ContextVar (' shc_origin' )
31
+ magicOriginVar : contextvars .ContextVar [List [Any ]] = contextvars .ContextVar (" shc_origin" )
32
32
MAX_CONCURRENT_UPDATE_DELAY = 0.02
33
33
34
- C = TypeVar ('C' , bound = "Connectable" )
34
+ C = TypeVar ("C" , bound = "Connectable" )
35
35
36
36
37
37
class ResetOriginSentinel :
@@ -45,14 +45,18 @@ class Connectable(Generic[T], metaclass=abc.ABCMeta):
45
45
"""
46
46
:cvar type: The type of the values, this object is supposed to handle
47
47
"""
48
+
48
49
type : Type [T ]
49
50
50
- def connect (self : C , other : "Connectable[S]" ,
51
- send : Optional [bool ] = None ,
52
- receive : Optional [bool ] = None ,
53
- read : Optional [bool ] = None ,
54
- provide : Optional [bool ] = None ,
55
- convert : Union [bool , Tuple [Callable [[T ], S ], Callable [[S ], T ]]] = False ) -> C :
51
+ def connect (
52
+ self : C ,
53
+ other : "Connectable[S]" ,
54
+ send : Optional [bool ] = None ,
55
+ receive : Optional [bool ] = None ,
56
+ read : Optional [bool ] = None ,
57
+ provide : Optional [bool ] = None ,
58
+ convert : Union [bool , Tuple [Callable [[T ], S ], Callable [[S ], T ]]] = False ,
59
+ ) -> C :
56
60
"""
57
61
Subscribe self to other and set as default_provider and vice versa (depending on the two object's capabilities,
58
62
optionalities and given parameters).
@@ -83,43 +87,62 @@ def connect(self: C, other: "Connectable[S]",
83
87
if isinstance (other , ConnectableWrapper ):
84
88
# If other object is not connectable itself but wraps one or more connectable objects (like, for example, a
85
89
# `web.widgets.ValueButtonGroup`), let it use its special implementation of `connect()`.
86
- other .connect (self , send = receive , receive = send , read = provide , provide = read ,
87
- convert = ((convert [1 ], convert [0 ]) if isinstance (convert , tuple ) else convert ))
90
+ other .connect (
91
+ self ,
92
+ send = receive ,
93
+ receive = send ,
94
+ read = provide ,
95
+ provide = read ,
96
+ convert = ((convert [1 ], convert [0 ]) if isinstance (convert , tuple ) else convert ),
97
+ )
88
98
else :
89
99
self ._connect_with (self , other , send , provide , convert [0 ] if isinstance (convert , tuple ) else convert )
90
100
self ._connect_with (other , self , receive , read , convert [1 ] if isinstance (convert , tuple ) else convert )
91
101
return self
92
102
93
103
@staticmethod
94
- def _connect_with (source : "Connectable" , target : "Connectable" , send : Optional [bool ], provide : Optional [bool ],
95
- convert : Union [bool , Callable ]):
104
+ def _connect_with (
105
+ source : "Connectable" ,
106
+ target : "Connectable" ,
107
+ send : Optional [bool ],
108
+ provide : Optional [bool ],
109
+ convert : Union [bool , Callable ],
110
+ ):
96
111
if isinstance (source , Subscribable ) and isinstance (target , Writable ) and (send or send is None ):
97
112
source .subscribe (target , convert = convert )
98
113
elif send and not isinstance (source , Subscribable ):
99
114
raise TypeError ("Cannot subscribe {} to {}, since the latter is not Subscribable" .format (target , source ))
100
115
elif send and not isinstance (target , Writable ):
101
116
raise TypeError ("Cannot subscribe {} to {}, since the former is not Writable" .format (target , source ))
102
- if isinstance (source , Readable ) and isinstance (target , Reading ) \
103
- and (provide or (provide is None and not target .is_reading_optional )):
117
+ if (
118
+ isinstance (source , Readable )
119
+ and isinstance (target , Reading )
120
+ and (provide or (provide is None and not target .is_reading_optional ))
121
+ ):
104
122
target .set_provider (source , convert = convert )
105
123
elif provide and not isinstance (source , Readable ):
106
- raise TypeError ("Cannot use {} as read provider for {}, since the former is not Readable"
107
- .format (source , target ))
124
+ raise TypeError (
125
+ "Cannot use {} as read provider for {}, since the former is not Readable" .format (source , target )
126
+ )
108
127
elif provide and not isinstance (target , Reading ):
109
- raise TypeError ("Cannot use {} as read provider for {}, since the latter is not Reading"
110
- .format (source , target ))
128
+ raise TypeError (
129
+ "Cannot use {} as read provider for {}, since the latter is not Reading" .format (source , target )
130
+ )
111
131
112
132
113
133
class ConnectableWrapper (Connectable [T ], Generic [T ], metaclass = abc .ABCMeta ):
114
134
type : Type [T ]
115
135
116
136
@abc .abstractmethod
117
- def connect (self : C , other : "Connectable" ,
118
- send : Optional [bool ] = None ,
119
- receive : Optional [bool ] = None ,
120
- read : Optional [bool ] = None ,
121
- provide : Optional [bool ] = None ,
122
- convert : Union [bool , Tuple [Callable [[T ], Any ], Callable [[Any ], T ]]] = False ) -> C :
137
+ def connect (
138
+ self : C ,
139
+ other : "Connectable" ,
140
+ send : Optional [bool ] = None ,
141
+ receive : Optional [bool ] = None ,
142
+ read : Optional [bool ] = None ,
143
+ provide : Optional [bool ] = None ,
144
+ convert : Union [bool , Tuple [Callable [[T ], Any ], Callable [[Any ], T ]]] = False ,
145
+ ) -> C :
123
146
pass
124
147
125
148
@@ -208,8 +231,15 @@ def __init__(self, *args, **kwargs):
208
231
self ._triggers : List [Tuple [LogicHandler , bool ]] = []
209
232
self ._pending_updates : Dict [int , Dict [asyncio .Task , Optional [int ]]] = {}
210
233
211
- async def __publish_write (self , subscriber : Writable [S ], converter : Optional [Callable [[T_co ], S ]], value : T_co ,
212
- origin : List [Any ], use_pending : bool , wait_and_check : Optional [int ] = None ):
234
+ async def __publish_write (
235
+ self ,
236
+ subscriber : Writable [S ],
237
+ converter : Optional [Callable [[T_co ], S ]],
238
+ value : T_co ,
239
+ origin : List [Any ],
240
+ use_pending : bool ,
241
+ wait_and_check : Optional [int ] = None ,
242
+ ):
213
243
try :
214
244
if wait_and_check is not None :
215
245
await asyncio .sleep (random .random () * 0.02 )
@@ -222,8 +252,14 @@ async def __publish_write(self, subscriber: Writable[S], converter: Optional[Cal
222
252
if use_pending :
223
253
del self ._pending_updates [id (subscriber )][asyncio .current_task ()] # type: ignore
224
254
225
- async def __publish_trigger (self , target : LogicHandler , value : T_co ,
226
- origin : List [Any ], use_pending : bool , wait_and_check : Optional [int ] = None ):
255
+ async def __publish_trigger (
256
+ self ,
257
+ target : LogicHandler ,
258
+ value : T_co ,
259
+ origin : List [Any ],
260
+ use_pending : bool ,
261
+ wait_and_check : Optional [int ] = None ,
262
+ ):
227
263
try :
228
264
if wait_and_check is not None :
229
265
await asyncio .sleep (random .random () * 0.02 )
@@ -272,28 +308,42 @@ def _publish(self, value: T_co, origin: List[Any]):
272
308
for subscriber , converter in self ._subscribers :
273
309
prev_step = id (origin [- 1 ]) if origin else None
274
310
first_step = origin [0 ] if origin else None
275
- reset_origin = (any (o != prev_step for o in self ._pending_updates [id (subscriber )].values ())
276
- or first_step is RESET_ORIGIN_SENTINEL )
311
+ reset_origin = (
312
+ any (o != prev_step for o in self ._pending_updates [id (subscriber )].values ())
313
+ or first_step is RESET_ORIGIN_SENTINEL
314
+ )
277
315
if reset_origin :
278
316
logger .info ("Resetting origin from %s to %s; value=%s; origin=%s" , self , subscriber , value , origin )
279
317
if reset_origin or not any (s is subscriber for s in origin ):
280
318
task = asyncio .create_task (
281
- self .__publish_write (subscriber , converter , value ,
282
- [RESET_ORIGIN_SENTINEL ] if reset_origin else origin ,
283
- use_pending = True ,
284
- wait_and_check = self ._publish_count if reset_origin else None ))
319
+ self .__publish_write (
320
+ subscriber ,
321
+ converter ,
322
+ value ,
323
+ [RESET_ORIGIN_SENTINEL ] if reset_origin else origin ,
324
+ use_pending = True ,
325
+ wait_and_check = self ._publish_count if reset_origin else None ,
326
+ )
327
+ )
285
328
self ._pending_updates [id (subscriber )][task ] = prev_step
286
329
for target , sync in self ._triggers :
287
330
reset_origin = False
288
331
if sync :
289
332
prev_step = id (origin [- 1 ]) if origin else None
290
333
first_step = origin [0 ] if origin else None
291
- reset_origin = (any (o != prev_step for o in self ._pending_updates [id (target )].values ())
292
- or first_step is RESET_ORIGIN_SENTINEL )
334
+ reset_origin = (
335
+ any (o != prev_step for o in self ._pending_updates [id (target )].values ())
336
+ or first_step is RESET_ORIGIN_SENTINEL
337
+ )
293
338
task = asyncio .create_task (
294
- self .__publish_trigger (target , value , [RESET_ORIGIN_SENTINEL ] if reset_origin else origin ,
295
- use_pending = sync ,
296
- wait_and_check = (self ._publish_count if reset_origin else None )))
339
+ self .__publish_trigger (
340
+ target ,
341
+ value ,
342
+ [RESET_ORIGIN_SENTINEL ] if reset_origin else origin ,
343
+ use_pending = sync ,
344
+ wait_and_check = (self ._publish_count if reset_origin else None ),
345
+ )
346
+ )
297
347
if sync :
298
348
self ._pending_updates [id (target )][task ] = prev_step
299
349
@@ -322,13 +372,14 @@ async def _publish_and_wait(self, value: T_co, origin: List[Any]):
322
372
for target , sync in self ._triggers :
323
373
if not sync :
324
374
asyncio .create_task (self .__publish_trigger (target , value , origin , False ))
325
- sync_jobs = [self .__publish_write (subscriber , converter , value , origin , False )
326
- for subscriber , converter in self ._subscribers
327
- if not any (s is subscriber for s in origin )]
375
+ sync_jobs = [
376
+ self .__publish_write (subscriber , converter , value , origin , False )
377
+ for subscriber , converter in self ._subscribers
378
+ if not any (s is subscriber for s in origin )
379
+ ]
328
380
sync_jobs .extend (
329
- self .__publish_trigger (target , value , origin , False )
330
- for target , sync in self ._triggers
331
- if sync )
381
+ self .__publish_trigger (target , value , origin , False ) for target , sync in self ._triggers if sync
382
+ )
332
383
if len (sync_jobs ) == 1 :
333
384
await sync_jobs [0 ]
334
385
elif sync_jobs :
@@ -358,8 +409,11 @@ def subscribe(self, subscriber: Writable[S], convert: Union[Callable[[T_co], S],
358
409
elif convert :
359
410
converter = conversion .get_converter (self .type , subscriber .type )
360
411
else :
361
- raise TypeError ("Type mismatch of subscriber {} ({}) for {} ({})"
362
- .format (repr (subscriber ), subscriber .type .__name__ , repr (self ), self .type .__name__ ))
412
+ raise TypeError (
413
+ "Type mismatch of subscriber {} ({}) for {} ({})" .format (
414
+ repr (subscriber ), subscriber .type .__name__ , repr (self ), self .type .__name__
415
+ )
416
+ )
363
417
self ._subscribers .append ((subscriber , converter ))
364
418
if self ._stateful_publishing :
365
419
self ._pending_updates [id (subscriber )] = {}
@@ -431,8 +485,11 @@ def set_provider(self, provider: Readable[S], convert: Union[Callable[[S], T_con
431
485
elif convert :
432
486
converter = conversion .get_converter (provider .type , self .type )
433
487
else :
434
- raise TypeError ("Type mismatch of Readable {} ({}) as provider for {} ({})"
435
- .format (repr (provider ), provider .type .__name__ , repr (self ), self .type .__name__ ))
488
+ raise TypeError (
489
+ "Type mismatch of Readable {} ({}) as provider for {} ({})" .format (
490
+ repr (provider ), provider .type .__name__ , repr (self ), self .type .__name__
491
+ )
492
+ )
436
493
self ._default_provider = (provider , converter )
437
494
438
495
async def _from_provider (self ) -> Optional [T_con ]:
@@ -454,9 +511,7 @@ async def _from_provider(self) -> Optional[T_con]:
454
511
return convert (val ) if convert else val
455
512
456
513
457
- LogicHandlerOptionalParams = Union [LogicHandler ,
458
- Callable [[T ], Awaitable [None ]],
459
- Callable [[], Awaitable [None ]]]
514
+ LogicHandlerOptionalParams = Union [LogicHandler , Callable [[T ], Awaitable [None ]], Callable [[], Awaitable [None ]]]
460
515
461
516
462
517
def handler (reset_origin = False , allow_recursion = False ) -> Callable [[LogicHandlerOptionalParams ], LogicHandler ]:
@@ -483,6 +538,7 @@ def handler(reset_origin=False, allow_recursion=False) -> Callable[[LogicHandler
483
538
passed values and/or the `origin` list itself to prevent infinite feedback loops via `write` calls or calls to
484
539
other logic handlers – especiaally when used together with `reset_origin`.
485
540
"""
541
+
486
542
def decorator (f : LogicHandlerOptionalParams ) -> LogicHandler :
487
543
num_args = _count_function_args (f )
488
544
@@ -508,13 +564,13 @@ async def wrapper(value, origin: Optional[List[Any]] = None) -> None:
508
564
magicOriginVar .reset (token )
509
565
except Exception as e :
510
566
logger .error ("Error while executing handler %s():" , f .__name__ , exc_info = e )
567
+
511
568
return wrapper
569
+
512
570
return decorator
513
571
514
572
515
- BlockingLogicHandlerOptionalParams = Union [Callable [[T , List [Any ]], None ],
516
- Callable [[T ], None ],
517
- Callable [[], None ]]
573
+ BlockingLogicHandlerOptionalParams = Union [Callable [[T , List [Any ]], None ], Callable [[T ], None ], Callable [[], None ]]
518
574
519
575
520
576
def blocking_handler () -> Callable [[BlockingLogicHandlerOptionalParams ], LogicHandler ]:
@@ -531,6 +587,7 @@ def blocking_handler() -> Callable[[BlockingLogicHandlerOptionalParams], LogicHa
531
587
include special measures for preparing and passing the `origin` list or avoiding recursive execution. Still, it
532
588
takes care of the correct number of arguments (zero to two) for calling the function.
533
589
"""
590
+
534
591
def decorator (f : BlockingLogicHandlerOptionalParams ) -> LogicHandler :
535
592
num_args = _count_function_args (f )
536
593
@@ -549,7 +606,9 @@ async def wrapper(value, origin: Optional[List[Any]] = None) -> None:
549
606
await loop .run_in_executor (None , f , * args ) # type: ignore
550
607
except Exception as e :
551
608
logger .error ("Error while executing handler %s():" , f .__name__ , exc_info = e )
609
+
552
610
return wrapper
611
+
553
612
return decorator
554
613
555
614
0 commit comments