diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs index 626883902fa..5d0c0516b06 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -52,8 +52,33 @@ internal static partial class FFI #endif ; + public static void Check(this Errno status) + { + if (status == Errno.OK) + { + return; + } + throw status switch + { + Errno.NOT_IN_TRANSACTION => new NotInTransactionException(), + Errno.BSATN_DECODE_ERROR => new BsatnDecodeException(), + Errno.NO_SUCH_TABLE => new NoSuchTableException(), + Errno.NO_SUCH_INDEX => new NoSuchIndexException(), + Errno.NO_SUCH_ITER => new NoSuchIterException(), + Errno.NO_SUCH_CONSOLE_TIMER => new NoSuchLogStopwatch(), + Errno.NO_SUCH_BYTES => new NoSuchBytesException(), + Errno.NO_SPACE => new NoSpaceException(), + Errno.BUFFER_TOO_SMALL => new BufferTooSmallException(), + Errno.UNIQUE_ALREADY_EXISTS => new UniqueConstraintViolationException(), + Errno.SCHEDULE_AT_DELAY_TOO_LONG => new ScheduleAtDelayTooLongException(), + Errno.INDEX_NOT_UNIQUE => new IndexNotUniqueException(), + Errno.NO_SUCH_ROW => new NoSuchRowException(), + _ => new UnknownException(status), + }; + } + [NativeMarshalling(typeof(Marshaller))] - public struct CheckedStatus + internal struct CheckedStatus { // This custom marshaller takes care of checking the status code // returned from the host and throwing an exception if it's not 0. @@ -69,27 +94,8 @@ internal static class Marshaller { public static CheckedStatus ConvertToManaged(Errno status) { - if (status == 0) - { - return default; - } - throw status switch - { - Errno.NOT_IN_TRANSACTION => new NotInTransactionException(), - Errno.BSATN_DECODE_ERROR => new BsatnDecodeException(), - Errno.NO_SUCH_TABLE => new NoSuchTableException(), - Errno.NO_SUCH_INDEX => new NoSuchIndexException(), - Errno.NO_SUCH_ITER => new NoSuchIterException(), - Errno.NO_SUCH_CONSOLE_TIMER => new NoSuchLogStopwatch(), - Errno.NO_SUCH_BYTES => new NoSuchBytesException(), - Errno.NO_SPACE => new NoSpaceException(), - Errno.BUFFER_TOO_SMALL => new BufferTooSmallException(), - Errno.UNIQUE_ALREADY_EXISTS => new UniqueConstraintViolationException(), - Errno.SCHEDULE_AT_DELAY_TOO_LONG => new ScheduleAtDelayTooLongException(), - Errno.INDEX_NOT_UNIQUE => new IndexNotUniqueException(), - Errno.NO_SUCH_ROW => new NoSuchRowException(), - _ => new UnknownException(status), - }; + status.Check(); + return default; } } } diff --git a/crates/bindings-csharp/Runtime/Internal/IIndex.cs b/crates/bindings-csharp/Runtime/Internal/IIndex.cs index 5abc12620e4..e6e2956011d 100644 --- a/crates/bindings-csharp/Runtime/Internal/IIndex.cs +++ b/crates/bindings-csharp/Runtime/Internal/IIndex.cs @@ -41,7 +41,7 @@ out ReadOnlySpan rend } protected IEnumerable DoFilter(Bounds bounds) - where Bounds : IBTreeIndexBounds => new RawTableIter(indexId, bounds).Parse(); + where Bounds : IBTreeIndexBounds => new RawTableIter(indexId, bounds); protected uint DoDelete(Bounds bounds) where Bounds : IBTreeIndexBounds diff --git a/crates/bindings-csharp/Runtime/Internal/ITable.cs b/crates/bindings-csharp/Runtime/Internal/ITable.cs index 7a1bca01a1e..dec1ec9af51 100644 --- a/crates/bindings-csharp/Runtime/Internal/ITable.cs +++ b/crates/bindings-csharp/Runtime/Internal/ITable.cs @@ -1,95 +1,71 @@ namespace SpacetimeDB.Internal; +using System.Buffers; +using System.Collections; using SpacetimeDB.BSATN; -internal abstract class RawTableIterBase +internal abstract class RawTableIterBase : IEnumerable where T : IStructuralReadWrite, new() { - public sealed class Enumerator(FFI.RowIter handle) : IDisposable - { - byte[] buffer = new byte[0x20_000]; - public byte[] Current { get; private set; } = []; + protected abstract void IterStart(out FFI.RowIter handle); - public bool MoveNext() + public IEnumerator GetEnumerator() + { + IterStart(out var handle); + // Initial buffer size to match Rust one (see `DEFAULT_BUFFER_CAPACITY` in `bindings/src/lib.rs`). + // Use pool to reduce GC pressure between iterations. + var buffer = ArrayPool.Shared.Rent(0x10_000); + try { - if (handle == FFI.RowIter.INVALID) - { - return false; - } - - uint buffer_len; - while (true) + while (handle != FFI.RowIter.INVALID) { - buffer_len = (uint)buffer.Length; + var buffer_len = (uint)buffer.Length; var ret = FFI.row_iter_bsatn_advance(handle, buffer, ref buffer_len); - if (ret == Errno.EXHAUSTED) - { - handle = FFI.RowIter.INVALID; - } // On success, the only way `buffer_len == 0` is for the iterator to be exhausted. // This happens when the host iterator was empty from the start. System.Diagnostics.Debug.Assert(!(ret == Errno.OK && buffer_len == 0)); switch (ret) { - // Iterator advanced and may also be `EXHAUSTED`. - // When `OK`, we'll need to advance the iterator in the next call to `MoveNext`. - // In both cases, copy over the row data to `Current` from the scratch `buffer`. - case Errno.EXHAUSTED - or Errno.OK: - Current = new byte[buffer_len]; - Array.Copy(buffer, 0, Current, 0, buffer_len); - return buffer_len != 0; - // Couldn't find the iterator, error! - case Errno.NO_SUCH_ITER: - throw new NoSuchIterException(); + // Iterator is exhausted. + // Treat in the same way as OK, just tell the next iteration to stop. + case Errno.EXHAUSTED: + handle = FFI.RowIter.INVALID; + goto case Errno.OK; + // We got a chunk of rows, parse all of them before moving to the next chunk. + case Errno.OK: + { + using var stream = new MemoryStream(buffer, 0, (int)buffer_len); + using var reader = new BinaryReader(stream); + while (stream.Position < stream.Length) + { + yield return IStructuralReadWrite.Read(reader); + } + break; + } // The scratch `buffer` is too small to fit a row / chunk. // Grow `buffer` and try again. // The `buffer_len` will have been updated with the necessary size. case Errno.BUFFER_TOO_SMALL: - buffer = new byte[buffer_len]; - continue; + ArrayPool.Shared.Return(buffer); + buffer = ArrayPool.Shared.Rent((int)buffer_len); + break; default: - throw new UnknownException(ret); + ret.Check(); + break; } } } - - public void Dispose() + finally { if (handle != FFI.RowIter.INVALID) { FFI.row_iter_bsatn_close(handle); - handle = FFI.RowIter.INVALID; } - } - - public void Reset() - { - throw new NotImplementedException(); + ArrayPool.Shared.Return(buffer); } } - protected abstract void IterStart(out FFI.RowIter handle); - - // Note: using the GetEnumerator() duck-typing protocol instead of IEnumerable to avoid extra boxing. - public Enumerator GetEnumerator() - { - IterStart(out var handle); - return new(handle); - } - - public IEnumerable Parse() - { - foreach (var chunk in this) - { - using var stream = new MemoryStream(chunk); - using var reader = new BinaryReader(stream); - while (stream.Position < stream.Length) - { - yield return IStructuralReadWrite.Read(reader); - } - } - } + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); } public interface ITableView @@ -136,7 +112,7 @@ protected static ulong DoCount() return count; } - protected static IEnumerable DoIter() => new RawTableIter(tableId).Parse(); + protected static IEnumerable DoIter() => new RawTableIter(tableId); protected static T DoInsert(T row) { diff --git a/crates/bindings-csharp/Runtime/Internal/Module.cs b/crates/bindings-csharp/Runtime/Internal/Module.cs index 9d62a2e14b1..e3708c2b2f0 100644 --- a/crates/bindings-csharp/Runtime/Internal/Module.cs +++ b/crates/bindings-csharp/Runtime/Internal/Module.cs @@ -95,13 +95,12 @@ public static void RegisterTable() moduleDef.RegisterTable(View.MakeTableDesc(typeRegistrar)); } - private static byte[] Consume(this BytesSource source) + private static MemoryStream Consume(this BytesSource source, ref byte[] buffer) { if (source == BytesSource.INVALID) { - return []; + return new(); } - var buffer = new byte[0x20_000]; var written = 0U; while (true) { @@ -114,8 +113,7 @@ private static byte[] Consume(this BytesSource source) { // Host side source exhausted, we're done. case Errno.EXHAUSTED: - Array.Resize(ref buffer, (int)written); - return buffer; + return new(buffer, 0, (int)written); // Wrote the entire spare capacity. // Need to reserve more space in the buffer. case Errno.OK when written == buffer.Length: @@ -126,11 +124,8 @@ private static byte[] Consume(this BytesSource source) // The host will likely not trigger this branch (current host doesn't), // but a module should be prepared for it. case Errno.OK: + ret.Check(); break; - case Errno.NO_SUCH_BYTES: - throw new NoSuchBytesException(); - default: - throw new UnknownException(ret); } } } @@ -164,6 +159,13 @@ public static void __describe_module__(BytesSink description) } } + // Note: `__call_reducer__` can't be invoked in parallel because we don't support multithreading in Wasm, + // nor is it supposed to be invoked recursively. + // + // This means we can reuse the same argument buffer for all `__call_reducer__` invocations - + // unlike in e.g. iterators, where multiple iterators can easily exist at the same time. + private static byte[] reducerArgsBuffer = new byte[0x10_000]; + public static Errno __call_reducer__( uint id, ulong sender_0, @@ -190,7 +192,7 @@ BytesSink error var ctx = newContext!(senderIdentity, senderAddress, random, time); - using var stream = new MemoryStream(args.Consume()); + using var stream = args.Consume(ref reducerArgsBuffer); using var reader = new BinaryReader(stream); reducers[(int)id].Invoke(reader, ctx); if (stream.Position != stream.Length)