-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreset_counter_block.py
34 lines (27 loc) · 1.14 KB
/
reset_counter_block.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from nio.block import input
from nio.properties import VersionProperty
from .counter_block import Counter
@input('count', label='Count', default=True, order=1)
@input('reset', label='Reset', order=2)
class ResettableCounter(Counter):
""" The same as the counter block but with an input to reset a
cumulative count """
version = VersionProperty("0.2.0")
def process_signals(self, signals, input_id='count'):
if input_id == 'reset':
self.notify_signals(self.for_each_group(
self.reset_group_from_signals, signals))
else:
super().process_signals(signals)
def reset_group_from_signals(self, signals, key):
""" Reset the groups inlcuded in the list of incoming signals.
A method that takes signals as the first argument for use in
the group by mixin's for_each_group method.
"""
try:
return self.reset_group(key)
except KeyError:
# resetting a group that hasn't been counted yet,
# create it with a count of 0 and try again
self._cumulative_count[key] = 0
return self.reset_group(key)