Skip to content

Commit 82c2ef6

Browse files
committed
Allow stream.{read,write}s of length 0 to query/signal readiness
1 parent a52fc75 commit 82c2ef6

File tree

4 files changed

+92
-32
lines changed

4 files changed

+92
-32
lines changed

design/mvp/Async.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,11 @@ These built-ins can either return immediately if >0 elements were able to be
327327
written or read immediately (without blocking) or return a sentinel "blocked"
328328
value indicating that the read or write will execute concurrently. The readable
329329
and writable ends of streams and futures can then be [waited](#waiting) on to
330-
make progress.
330+
make progress. Notification of progress signals *completion* of a read or write
331+
(i.e., the bytes have already been copied into the buffer). Additionally,
332+
*readiness* (to perform a read or write in the future) can be queried and
333+
signalled by performing a `0`-length read or write (see the [Stream State]
334+
section in the Canonical ABI explainer for details).
331335

332336
The `T` element type of streams and futures is optional, such that `future` and
333337
`stream` can be written in WIT without a trailing `<T>`. In this case, the

design/mvp/CanonicalABI.md

+42-15
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,8 @@ class BufferGuestImpl(Buffer):
364364
length: int
365365

366366
def __init__(self, t, cx, ptr, length):
367-
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
368-
if t:
367+
trap_if(length > Buffer.MAX_LENGTH)
368+
if t and length > 0:
369369
trap_if(ptr != align_to(ptr, alignment(t)))
370370
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
371371
self.cx = cx
@@ -1104,9 +1104,12 @@ class ReadableStreamGuestImpl(ReadableStream):
11041104
self.reset_pending()
11051105

11061106
def reset_pending(self):
1107-
self.pending_buffer = None
1108-
self.pending_on_partial_copy = None
1109-
self.pending_on_copy_done = None
1107+
self.set_pending(None, None, None)
1108+
1109+
def set_pending(self, buffer, on_partial_copy, on_copy_done):
1110+
self.pending_buffer = buffer
1111+
self.pending_on_partial_copy = on_partial_copy
1112+
self.pending_on_copy_done = on_copy_done
11101113
```
11111114
The `impl` field records the component instance that created this stream and is
11121115
used by `lower_stream` below.
@@ -1168,20 +1171,44 @@ but in the opposite direction. Both are implemented by a single underlying
11681171
if self.closed_:
11691172
return 'done'
11701173
elif not self.pending_buffer:
1171-
self.pending_buffer = buffer
1172-
self.pending_on_partial_copy = on_partial_copy
1173-
self.pending_on_copy_done = on_copy_done
1174+
self.set_pending(buffer, on_partial_copy, on_copy_done)
11741175
return 'blocked'
11751176
else:
1176-
ncopy = min(src.remain(), dst.remain())
1177-
assert(ncopy > 0)
1178-
dst.write(src.read(ncopy))
11791177
if self.pending_buffer.remain() > 0:
1180-
self.pending_on_partial_copy(self.reset_pending)
1178+
if buffer.remain() > 0:
1179+
dst.write(src.read(min(src.remain(), dst.remain())))
1180+
if self.pending_buffer.remain() > 0:
1181+
self.pending_on_partial_copy(self.reset_pending)
1182+
else:
1183+
self.reset_and_notify_pending()
1184+
return 'done'
11811185
else:
1182-
self.reset_and_notify_pending()
1183-
return 'done'
1184-
```
1186+
if buffer.remain() > 0 or buffer is dst:
1187+
self.reset_and_notify_pending()
1188+
self.set_pending(buffer, on_partial_copy, on_copy_done)
1189+
return 'blocked'
1190+
else:
1191+
return 'done'
1192+
```
1193+
The meaning of a `read` or `write` when the length is `0` is that the caller is
1194+
querying the "readiness" of the other side. When a `0`-length read/write
1195+
rendezvous with a non-`0`-length read/write, only the `0`-length read/write
1196+
completes; the non-`0`-length read/write is kept pending (and ready for a
1197+
subsequent rendezvous).
1198+
1199+
In the corner case where a `0`-length read *and* write rendezvous, only the
1200+
*writer* is notified of readiness. To avoid livelock, the Canonical ABI
1201+
requires that a writer *must* (eventually) follow a completed `0`-length write
1202+
with a non-`0`-length write that is allowed to block (allowing the reader end
1203+
to run and rendezvous with its own non-`0`-length read). To implement a
1204+
traditional `O_NONBLOCK` `write()` or `sendmsg()` API, a writer can use a
1205+
buffering scheme in which, after `select()` (or a similar API) signals a file
1206+
descriptor is ready to write, the next `O_NONBLOCK` `write()`/`sendmsg()` on
1207+
that file descriptor copies to an internal buffer and suceeds, issuing an
1208+
`async` `stream.write` in the background and waiting for completion before
1209+
signalling readiness again. Note that buffering only occurs when streaming
1210+
between two components using non-blocking I/O; if either side is the host or a
1211+
component using blocking or completion-based I/O, no buffering is necessary.
11851212

11861213
Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
11871214
are actually stored in the `waitables` table. The classes are almost entirely

design/mvp/canonical-abi/definitions.py

+22-14
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,8 @@ class BufferGuestImpl(Buffer):
311311
length: int
312312

313313
def __init__(self, t, cx, ptr, length):
314-
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
315-
if t:
314+
trap_if(length > Buffer.MAX_LENGTH)
315+
if t and length > 0:
316316
trap_if(ptr != align_to(ptr, alignment(t)))
317317
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
318318
self.cx = cx
@@ -660,9 +660,12 @@ def __init__(self, t, inst):
660660
self.reset_pending()
661661

662662
def reset_pending(self):
663-
self.pending_buffer = None
664-
self.pending_on_partial_copy = None
665-
self.pending_on_copy_done = None
663+
self.set_pending(None, None, None)
664+
665+
def set_pending(self, buffer, on_partial_copy, on_copy_done):
666+
self.pending_buffer = buffer
667+
self.pending_on_partial_copy = on_partial_copy
668+
self.pending_on_copy_done = on_copy_done
666669

667670
def reset_and_notify_pending(self):
668671
pending_on_copy_done = self.pending_on_copy_done
@@ -696,19 +699,24 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst):
696699
if self.closed_:
697700
return 'done'
698701
elif not self.pending_buffer:
699-
self.pending_buffer = buffer
700-
self.pending_on_partial_copy = on_partial_copy
701-
self.pending_on_copy_done = on_copy_done
702+
self.set_pending(buffer, on_partial_copy, on_copy_done)
702703
return 'blocked'
703704
else:
704-
ncopy = min(src.remain(), dst.remain())
705-
assert(ncopy > 0)
706-
dst.write(src.read(ncopy))
707705
if self.pending_buffer.remain() > 0:
708-
self.pending_on_partial_copy(self.reset_pending)
706+
if buffer.remain() > 0:
707+
dst.write(src.read(min(src.remain(), dst.remain())))
708+
if self.pending_buffer.remain() > 0:
709+
self.pending_on_partial_copy(self.reset_pending)
710+
else:
711+
self.reset_and_notify_pending()
712+
return 'done'
709713
else:
710-
self.reset_and_notify_pending()
711-
return 'done'
714+
if buffer.remain() > 0 or buffer is dst:
715+
self.reset_and_notify_pending()
716+
self.set_pending(buffer, on_partial_copy, on_copy_done)
717+
return 'blocked'
718+
else:
719+
return 'done'
712720

713721
class StreamEnd(Waitable):
714722
stream: ReadableStream

design/mvp/canonical-abi/run_tests.py

+23-2
Original file line numberDiff line numberDiff line change
@@ -1457,8 +1457,19 @@ async def core_func1(task, args):
14571457
assert(mem1[retp+0] == wsi)
14581458
assert(mem1[retp+4] == 4)
14591459

1460+
[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
1461+
assert(ret == definitions.BLOCKED)
1462+
14601463
fut4.set_result(None)
14611464

1465+
[event] = await canon_waitable_set_wait(False, mem1, task, seti, retp)
1466+
assert(event == EventCode.STREAM_WRITE)
1467+
assert(mem1[retp+0] == wsi)
1468+
assert(mem1[retp+4] == 0)
1469+
1470+
[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
1471+
assert(ret == 0)
1472+
14621473
[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
14631474
[] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi)
14641475
[] = await canon_waitable_set_drop(task, seti)
@@ -1498,6 +1509,9 @@ async def core_func2(task, args):
14981509
fut2.set_result(None)
14991510
await task.on_block(fut3)
15001511

1512+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1513+
assert(ret == 0)
1514+
15011515
mem2[0:8] = bytes(8)
15021516
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
15031517
assert(ret == 2)
@@ -1508,9 +1522,16 @@ async def core_func2(task, args):
15081522

15091523
await task.on_block(fut4)
15101524

1511-
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
1525+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1526+
assert(ret == definitions.BLOCKED)
1527+
1528+
[event] = await canon_waitable_set_wait(False, mem2, task, seti, retp)
1529+
assert(event == EventCode.STREAM_READ)
1530+
assert(mem2[retp+0] == rsi)
1531+
p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False)
15121532
errctxi = 1
1513-
assert(ret == (definitions.CLOSED | errctxi))
1533+
assert(p2 == (definitions.CLOSED | errctxi))
1534+
15141535
[] = await canon_stream_close_readable(U8Type(), task, rsi, 0)
15151536
[] = await canon_waitable_set_drop(task, seti)
15161537
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)

0 commit comments

Comments
 (0)