Skip to content

Commit

Permalink
Fix handling of multi-tables
Browse files Browse the repository at this point in the history
Unfortunately, none of our tests currently cover this, but while working on the V9 upgrade, I noticed that this code still relies on `type(Row)` as a unique table identifier.

That no longer holds with multi-tables as several tables can share the same `Row` type. In that case, subscription updates would be grouped incorrectly and always applied to the same first table that uses `Row` for its data storage.

This PR fixes that by using the table handle itself as a key (compared by reference).

If transaction updates are already grouped uniquely by table, it should be possible to simplify this code much further, but I'm not sure if such a guarantee exists, so I'm leaving that untouched.
  • Loading branch information
RReverser committed Jan 29, 2025
1 parent f47ebe8 commit 9696cd3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 52 deletions.
2 changes: 0 additions & 2 deletions src/ClientCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,5 @@ public void AddTable<Row>(string name, IRemoteTableHandle table)
Log.Error($"We don't know that this table is: {name}");
return null;
}

internal IEnumerable<IRemoteTableHandle> GetTables() => tables.Values;
}
}
82 changes: 32 additions & 50 deletions src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using SpacetimeDB.BSATN;
using SpacetimeDB.Internal;
using SpacetimeDB.ClientApi;
using Thread = System.Threading.Thread;
using System.Diagnostics;

namespace SpacetimeDB
{
Expand Down Expand Up @@ -225,7 +225,7 @@ struct ProcessedMessage
struct PreProcessedMessage
{
public ProcessedMessage processed;
public Dictionary<Type, HashSet<byte[]>>? subscriptionInserts;
public Dictionary<IRemoteTableHandle, HashSet<byte[]>>? subscriptionInserts;
}

private readonly BlockingCollection<UnprocessedMessage> _messageQueue =
Expand Down Expand Up @@ -350,37 +350,33 @@ void PreProcessMessages()
{
var tableName = update.TableName;
var table = clientDB.GetTable(tableName);
if (table == null)
{
Log.Error($"Unknown table name: {tableName}");
continue;
}
if (table == null) continue;
yield return (table, update);
}
}

(List<DbOp>, Dictionary<System.Type, HashSet<byte[]>>) PreProcessLegacySubscription(InitialSubscription initSub)
(List<DbOp>, Dictionary<IRemoteTableHandle, HashSet<byte[]>>) PreProcessLegacySubscription(InitialSubscription initSub)
{
var dbOps = new List<DbOp>();
// This is all of the inserts
int cap = initSub.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows);
// FIXME: shouldn't this be `new(initSub.DatabaseUpdate.Tables.Length)` ?
Dictionary<System.Type, HashSet<byte[]>> subscriptionInserts = new(capacity: cap);
Dictionary<IRemoteTableHandle, HashSet<byte[]>> subscriptionInserts = new(capacity: cap);

HashSet<byte[]> GetInsertHashSet(System.Type tableType, int tableSize)
HashSet<byte[]> GetInsertHashSet(IRemoteTableHandle table, int tableSize)
{
if (!subscriptionInserts.TryGetValue(tableType, out var hashSet))
if (!subscriptionInserts.TryGetValue(table, out var hashSet))
{
hashSet = new HashSet<byte[]>(capacity: tableSize, comparer: ByteArrayComparer.Instance);
subscriptionInserts[tableType] = hashSet;
subscriptionInserts[table] = hashSet;
}
return hashSet;
}

// First apply all of the state
foreach (var (table, update) in GetTables(initSub.DatabaseUpdate))
{
var hashSet = GetInsertHashSet(table.ClientTableType, (int)update.NumRows);
var hashSet = GetInsertHashSet(table, (int)update.NumRows);

PreProcessInsertOnlyTable(table, update, dbOps, hashSet);
}
Expand All @@ -391,22 +387,27 @@ HashSet<byte[]> GetInsertHashSet(System.Type tableType, int tableSize)
/// TODO: the dictionary is here for backwards compatibility and can be removed
/// once we get rid of legacy subscriptions.
/// </summary>
(List<DbOp>, Dictionary<System.Type, HashSet<byte[]>>) PreProcessSubscribeApplied(SubscribeApplied subscribeApplied)
(List<DbOp>, Dictionary<IRemoteTableHandle, HashSet<byte[]>>) PreProcessSubscribeApplied(SubscribeApplied subscribeApplied)
{
var table = clientDB.GetTable(subscribeApplied.Rows.TableName) ?? throw new Exception($"Unknown table name: {subscribeApplied.Rows.TableName}");
var dbOps = new List<DbOp>();
HashSet<byte[]> inserts = new();
HashSet<byte[]> inserts = new(comparer: ByteArrayComparer.Instance);

PreProcessInsertOnlyTable(table, subscribeApplied.Rows.TableRows, dbOps, inserts);

var result = new Dictionary<System.Type, HashSet<byte[]>>();
result[table.ClientTableType] = inserts;
var result = new Dictionary<IRemoteTableHandle, HashSet<byte[]>>
{
[table] = inserts
};

return (dbOps, result);
}

void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, List<DbOp> dbOps, HashSet<byte[]> inserts)
{
// In debug mode, make sure we use a byte array comparer in HashSet and not a reference-equal `byte[]` by accident.
Debug.Assert(ReferenceEquals(inserts.Comparer, ByteArrayComparer.Instance));

foreach (var cqu in update.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);
Expand All @@ -430,7 +431,6 @@ void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, Lis
dbOps.Add(op);
}
}

}


Expand Down Expand Up @@ -472,8 +472,10 @@ List<DbOp> PreProcessDatabaseUpdate(DatabaseUpdate updates)
{
var dbOps = new List<DbOp>();

// All row updates that have a primary key, this contains inserts, deletes and updates
var primaryKeyChanges = new Dictionary<(System.Type tableType, object primaryKeyValue), DbOp>();
// All row updates that have a primary key, this contains inserts, deletes and updates.
// TODO: is there any guarantee that transaction update contains each table only once, aka updates are already grouped by table?
// If so, we could simplify this and other methods by moving the dictionary inside the main loop and using only the primary key as key.
var primaryKeyChanges = new Dictionary<(IRemoteTableHandle table, object primaryKeyValue), DbOp>();

// First apply all of the state
foreach (var (table, update) in GetTables(updates))
Expand All @@ -487,26 +489,20 @@ List<DbOp> PreProcessDatabaseUpdate(DatabaseUpdate updates)
if (pk != null)
{
// Compound key that we use for lookup.
// Consists of type of the table (for faster comparison that string names) + actual primary key of the row.
var key = (table.ClientTableType, pk);
// Consists of the table handle (for faster comparison than string names) + actual primary key of the row.
var key = (table, pk);

if (primaryKeyChanges.TryGetValue(key, out var oldOp))
{
if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null))
if (oldOp.insert is not null)
{
Log.Warn($"Update with the same primary key was applied multiple times! tableName={update.TableName}");
// TODO(jdetter): Is this a correctable error? This would be a major error on the
// SpacetimeDB side.
continue;
}

var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op);
op = new DbOp
{
table = insertOp.table,
delete = deleteOp.delete,
insert = insertOp.insert,
};
op.delete = oldOp.delete;
}
primaryKeyChanges[key] = op;
}
Expand All @@ -522,26 +518,20 @@ List<DbOp> PreProcessDatabaseUpdate(DatabaseUpdate updates)
if (pk != null)
{
// Compound key that we use for lookup.
// Consists of type of the table (for faster comparison that string names) + actual primary key of the row.
var key = (table.ClientTableType, pk);
// Consists of the table handle (for faster comparison than string names) + actual primary key of the row.
var key = (table, pk);

if (primaryKeyChanges.TryGetValue(key, out var oldOp))
{
if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null))
if (oldOp.delete is not null)
{
Log.Warn($"Update with the same primary key was applied multiple times! tableName={update.TableName}");
// TODO(jdetter): Is this a correctable error? This would be a major error on the
// SpacetimeDB side.
continue;
}

var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op);
op = new DbOp
{
table = insertOp.table,
delete = deleteOp.delete,
insert = insertOp.insert,
};
op.insert = oldOp.insert;
}
primaryKeyChanges[key] = op;
}
Expand Down Expand Up @@ -580,7 +570,7 @@ PreProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed)
ReducerEvent<Reducer>? reducerEvent = default;

// This is all of the inserts, used for updating the stale but un-cleared client cache.
Dictionary<System.Type, HashSet<byte[]>>? subscriptionInserts = null;
Dictionary<IRemoteTableHandle, HashSet<byte[]>>? subscriptionInserts = null;

switch (message)
{
Expand Down Expand Up @@ -652,16 +642,8 @@ ProcessedMessage CalculateStateDiff(PreProcessedMessage preProcessedMessage)
// the client cache.
if (preProcessedMessage.subscriptionInserts is { } subscriptionInserts)
{
foreach (var table in clientDB.GetTables())
foreach (var (table, hashSet) in subscriptionInserts)
{
if (!subscriptionInserts.TryGetValue(table.ClientTableType, out var hashSet))
{
// We don't know if the user is waiting for subscriptions on other tables.
// Leave the stale data for untouched tables in the cache; this is
// the best we can do.
continue;
}

foreach (var (rowBytes, oldValue) in table.IterEntries().Where(kv => !hashSet.Contains(kv.Key)))
{
processed.dbOps.Add(new DbOp
Expand Down

0 comments on commit 9696cd3

Please sign in to comment.