Skip to content

Commit 4264096

Browse files
committed
CABI refactor: improve call_and_handle_blocking interface
1 parent 11fe9a6 commit 4264096

File tree

2 files changed

+62
-56
lines changed

2 files changed

+62
-56
lines changed

design/mvp/CanonicalABI.md

+41-38
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ class EventCode(IntEnum):
304304

305305
EventTuple = tuple[EventCode, int]
306306
EventCallback = Callable[[], EventTuple]
307-
OnBlockCallback = Callable[[Awaitable], any]
307+
OnBlockCallback = Callable[[Awaitable], Any]
308308
```
309309
The `CallState` enum describes the linear sequence of states that an async call
310310
necessarily transitions through: [`STARTING`](Async.md#backpressure), `STARTED`,
@@ -340,45 +340,48 @@ async def default_on_block(f):
340340
await current_task.acquire()
341341
return v
342342

343-
async def call_and_handle_blocking(callee):
344-
blocked = asyncio.Future()
343+
class Blocked: pass
344+
345+
async def call_and_handle_blocking(callee, *args) -> Blocked|Any:
346+
blocked_or_result = asyncio.Future[Blocked|Any]()
345347
async def on_block(f):
346-
if not blocked.done():
347-
blocked.set_result(True)
348+
if not blocked_or_result.done():
349+
blocked_or_result.set_result(Blocked())
348350
else:
349351
current_task.release()
350352
v = await f
351353
await current_task.acquire()
352354
return v
353355
async def do_call():
354-
await callee(on_block)
355-
if not blocked.done():
356-
blocked.set_result(False)
356+
result = await callee(*args, on_block)
357+
if not blocked_or_result.done():
358+
blocked_or_result.set_result(result)
357359
else:
358360
current_task.release()
359361
asyncio.create_task(do_call())
360-
return await blocked
362+
return await blocked_or_result
361363
```
362364
Talking through this little Python pretzel of control flow:
363365
1. `call_and_handle_blocking` starts by running `do_call` in a fresh Python
364366
task and then immediately `await`ing a future that will be resolved by
365367
`do_call`. Since `current_task` isn't `release()`d or `acquire()`d as part
366368
of this process, the net effect is to directly transfer control flow from
367369
`call_and_handle_blocking` to `do_call` task without allowing other tasks to
368-
run (as if by `cont.new` + `resume` in [stack-switching]).
370+
run (as if by the `cont.new` + `resume` instructions of [stack-switching]).
369371
2. `do_call` passes the local `on_block` closure to `callee`, which the
370-
Canonical ABI ensures will be called whenever there is a need to block.
371-
3. If `on_block` is called, the first time it resolves `blocking`. Because
372-
the `current_task` lock is not `release()`d or `acquire()`d as part of this
373-
process, the net effect is to directly transfer control flow from `do_call`
374-
back to `call_and_handle_blocking` without allowing other tasks to run (as
375-
if by `suspend` in [stack-switching]).
372+
Canonical ABI ensures will be called whenever there is a need to block on
373+
I/O (represented by the future `f`).
374+
3. If `on_block` is called, the first time it is called it will signal that
375+
the `callee` has `Blocked` before `await`ing the future. Because the
376+
`current_task` lock is not `release()`d , control flow is transferred
377+
directly from `on_block` to `call_and_handle_blocking` without allowing any
378+
other tasks to execute (as if by the `suspend` instruction of
379+
[stack-switching]).
376380
4. If `on_block` is called more than once, there is no longer a caller to
377-
directly switch to, so the `current_task` lock is `release()`d, just like
378-
in `default_on_block`, so that the Python async scheduler can pick another
379-
task to switch to.
381+
directly switch to, so the `current_task` lock is `release()`d so that the
382+
Python async scheduler can pick another task to switch to.
380383
5. If `do_call` finishes without `on_block` ever having been called, it
381-
resolves `blocking` to `False` to communicate this fact to the caller.
384+
resolves `blocking` to the (not-`Blocking`) return value of `callee`.
382385

383386
With these tricky primitives defined, the rest of the logic below can simply
384387
use `on_block` when there is a need to block and `call_and_handle_blocking`
@@ -616,7 +619,7 @@ tree.
616619
class Subtask(CallContext):
617620
ft: FuncType
618621
flat_args: CoreValueIter
619-
flat_results: Optional[list[any]]
622+
flat_results: Optional[list[Any]]
620623
state: CallState
621624
lenders: list[ResourceHandle]
622625
notify_supertask: bool
@@ -2147,25 +2150,25 @@ async def canon_lower(opts, ft, callee, task, flat_args):
21472150
async def do_call(on_block):
21482151
await callee(task, subtask.on_start, subtask.on_return, on_block)
21492152
[] = subtask.finish()
2150-
if await call_and_handle_blocking(do_call):
2151-
subtask.notify_supertask = True
2152-
task.need_to_drop += 1
2153-
i = task.inst.async_subtasks.add(subtask)
2154-
flat_results = [pack_async_result(i, subtask.state)]
2155-
else:
2156-
flat_results = [0]
2153+
match await call_and_handle_blocking(do_call):
2154+
case Blocked():
2155+
subtask.notify_supertask = True
2156+
task.need_to_drop += 1
2157+
i = task.inst.async_subtasks.add(subtask)
2158+
flat_results = [pack_async_result(i, subtask.state)]
2159+
case None:
2160+
flat_results = [0]
21572161
return flat_results
21582162
```
2159-
In the asynchronous case, `Task.call_and_handle_blocking` returns `True` if the
2160-
call to `do_call` blocks. In this blocking case, the `Subtask` is added to
2161-
stored in an instance-wide table and given an `i32` index that is later
2162-
returned by `task.wait` to indicate that the subtask made progress. The
2163-
`need_to_drop` increment is matched by a decrement in `canon_subtask_drop` and
2164-
ensures that all subtasks of a supertask are allowed to complete before the
2165-
supertask completes. The `notify_supertask` flag is set to tell `Subtask`
2166-
methods (below) to asynchronously notify the supertask of progress. Lastly,
2167-
the current state of the subtask is eagerly returned to the caller, packed
2168-
with the `i32` subtask index:
2163+
In the asynchronous case, if `do_call` blocks before `Subtask.finish`
2164+
(signalled by `callee` calling `on_block`), the `Subtask` is added to an
2165+
instance-wide table and given an `i32` index that is later returned by
2166+
`task.wait` to signal subtask's progress. The `need_to_drop` increment is
2167+
matched by a decrement in `canon_subtask_drop` and ensures that all subtasks
2168+
of a supertask are allowed to complete before the supertask completes. The
2169+
`notify_supertask` flag is set to tell `Subtask` methods (below) to
2170+
asynchronously notify the supertask of progress. Lastly, the current progress
2171+
of the subtask is returned to the caller, packed with the `i32` subtask index:
21692172
```python
21702173
def pack_async_result(i, state):
21712174
assert(0 < i < 2**30)

design/mvp/canonical-abi/definitions.py

+21-18
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from __future__ import annotations
88
from dataclasses import dataclass
99
from functools import partial
10-
from typing import Optional, Callable, Awaitable, Literal, MutableMapping, TypeVar, Generic
10+
from typing import Any, Optional, Callable, Awaitable, Literal, MutableMapping, TypeVar, Generic
1111
from enum import IntEnum
1212
import math
1313
import struct
@@ -304,7 +304,7 @@ class EventCode(IntEnum):
304304

305305
EventTuple = tuple[EventCode, int]
306306
EventCallback = Callable[[], EventTuple]
307-
OnBlockCallback = Callable[[Awaitable], any]
307+
OnBlockCallback = Callable[[Awaitable], Any]
308308

309309
current_task = asyncio.Lock()
310310
asyncio.run(current_task.acquire())
@@ -315,24 +315,26 @@ async def default_on_block(f):
315315
await current_task.acquire()
316316
return v
317317

318-
async def call_and_handle_blocking(callee):
319-
blocked = asyncio.Future()
318+
class Blocked: pass
319+
320+
async def call_and_handle_blocking(callee, *args) -> Blocked|Any:
321+
blocked_or_result = asyncio.Future[Blocked|Any]()
320322
async def on_block(f):
321-
if not blocked.done():
322-
blocked.set_result(True)
323+
if not blocked_or_result.done():
324+
blocked_or_result.set_result(Blocked())
323325
else:
324326
current_task.release()
325327
v = await f
326328
await current_task.acquire()
327329
return v
328330
async def do_call():
329-
await callee(on_block)
330-
if not blocked.done():
331-
blocked.set_result(False)
331+
result = await callee(*args, on_block)
332+
if not blocked_or_result.done():
333+
blocked_or_result.set_result(result)
332334
else:
333335
current_task.release()
334336
asyncio.create_task(do_call())
335-
return await blocked
337+
return await blocked_or_result
336338

337339
class Task(CallContext):
338340
ft: FuncType
@@ -457,7 +459,7 @@ def exit(self):
457459
class Subtask(CallContext):
458460
ft: FuncType
459461
flat_args: CoreValueIter
460-
flat_results: Optional[list[any]]
462+
flat_results: Optional[list[Any]]
461463
state: CallState
462464
lenders: list[ResourceHandle]
463465
notify_supertask: bool
@@ -1454,13 +1456,14 @@ async def canon_lower(opts, ft, callee, task, flat_args):
14541456
async def do_call(on_block):
14551457
await callee(task, subtask.on_start, subtask.on_return, on_block)
14561458
[] = subtask.finish()
1457-
if await call_and_handle_blocking(do_call):
1458-
subtask.notify_supertask = True
1459-
task.need_to_drop += 1
1460-
i = task.inst.async_subtasks.add(subtask)
1461-
flat_results = [pack_async_result(i, subtask.state)]
1462-
else:
1463-
flat_results = [0]
1459+
match await call_and_handle_blocking(do_call):
1460+
case Blocked():
1461+
subtask.notify_supertask = True
1462+
task.need_to_drop += 1
1463+
i = task.inst.async_subtasks.add(subtask)
1464+
flat_results = [pack_async_result(i, subtask.state)]
1465+
case None:
1466+
flat_results = [0]
14641467
return flat_results
14651468

14661469
def pack_async_result(i, state):

0 commit comments

Comments
 (0)