-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathasync_cb_coro.py
135 lines (109 loc) · 3.49 KB
/
async_cb_coro.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# async_cb_coro.py
#
# An example of how to implement coroutine based concurrency layered
# on top of a callback-based scheduler.
import time
from collections import deque
import heapq
# Callback based scheduler (from earlier)
class Scheduler:
def __init__(self):
self.ready = deque() # Functions ready to execute
self.current = None
self.sleeping = [] # Sleeping functions
self.sequence = 0
def call_soon(self, func):
self.ready.append(func)
def call_later(self, delay, func):
self.sequence += 1
deadline = time.time() + delay # Expiration time
# Priority queue
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def run(self):
while self.ready or self.sleeping:
if not self.ready:
# Find the nearest deadline
deadline, _, func = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft()
func()
# Coroutine-based functions
def new_task(self, coro):
self.ready.append(Task(coro)) # Wrapped coroutine
async def sleep(self, delay):
self.call_later(delay, self.current)
self.current = None
await switch() # Switch to a new task
# Class that wraps a coroutine--making it look like a callback
class Task:
def __init__(self, coro):
self.coro = coro # "Wrapped coroutine"
# Make it look like a callback
def __call__(self):
try:
# Driving the coroutine as before
sched.current = self
self.coro.send(None)
if sched.current:
sched.ready.append(self)
except StopIteration:
pass
class Awaitable:
def __await__(self):
yield
def switch():
return Awaitable()
sched = Scheduler() # Background scheduler object
# ----------------
class AsyncQueue:
def __init__(self):
self.items = deque()
self.waiting = deque()
async def put(self, item):
self.items.append(item)
if self.waiting:
sched.ready.append(self.waiting.popleft())
async def get(self):
if not self.items:
self.waiting.append(sched.current) # Put myself to sleep
sched.current = None # "Disappear"
await switch() # Switch to another task
return self.items.popleft()
# Coroutine-based tasks
async def producer(q, count):
for n in range(count):
print('Producing', n)
await q.put(n)
await sched.sleep(1)
print('Producer done')
await q.put(None) # "Sentinel" to shut down
async def consumer(q):
while True:
item = await q.get()
if item is None:
break
print('Consuming', item)
print('Consumer done')
q = AsyncQueue()
sched.new_task(producer(q, 10))
sched.new_task(consumer(q))
# Call-back based tasks
def countdown(n):
if n > 0:
print('Down', n)
# time.sleep(4) # Blocking call (nothing else can run)
sched.call_later(4, lambda: countdown(n-1))
def countup(stop):
def _run(x):
if x < stop:
print('Up', x)
# time.sleep(1)
sched.call_later(1, lambda: _run(x+1))
_run(0)
sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()