Skip to content

Commit

Permalink
Merge branch 'jgilles/unsubscribe-fix' of https://github.com/clockwor…
Browse files Browse the repository at this point in the history
…klabs/com.clockworklabs.spacetimedbsdk into jgilles/unsubscribe-fix
  • Loading branch information
rekhoff committed Feb 21, 2025
2 parents ff10925 + 2fde31b commit 3854092
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 10 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/check-pr-base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ permissions: read-all

jobs:
check_base_ref:
name: Only release branches may merge into master
name: Release branch restriction
runs-on: ubuntu-latest
steps:
- id: not_based_on_master
if: |
github.event_name == 'pull_request' &&
github.event.pull_request.base.ref == 'master' &&
github.event.pull_request.base.ref == 'release/latest' &&
! startsWith(github.event.pull_request.head.ref, 'release/')
run: |
echo 'Only `release/*` branches are allowed to merge into `master`.'
echo 'Maybe your PR should be merging into `staging`?'
echo 'Only `release/*` branches are allowed to merge into the release branch `release/latest`.'
echo 'Maybe you want to change your PR base to `master`?'
exit 1
38 changes: 32 additions & 6 deletions src/MultiDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public static MultiDictionary<TKey, TValue> FromEnumerable(IEnumerable<KeyValueP

/// <summary>
/// Add a key-value-pair to the multidictionary.
/// If the key is already present, its associated value must satisfy value.Equals(item.Value).
/// If the key is already present, its associated value must satisfy
/// keyComparer.Equals(value, item.Value).
/// </summary>
/// <param name="item"></param>
/// <returns>Whether the key is entirely new to the dictionary. If it was already present, we assert that the old value is equal to the new value.</returns>
Expand Down Expand Up @@ -136,7 +137,7 @@ public bool Equals(MultiDictionary<TKey, TValue> other)
if (other.RawDict.TryGetValue(key, out var otherVM))
{
var (otherValue, otherMultiplicity) = otherVM;
if (!(value != null && value.Equals(otherValue) && multiplicity == otherMultiplicity))
if (!(ValueComparer.Equals(value, otherValue) && multiplicity == otherMultiplicity))
{
return false;
}
Expand Down Expand Up @@ -290,11 +291,18 @@ public override string ToString()
}

/// <summary>
/// A delta between two multidictionaries.
/// A bulk change to a multidictionary. Allows both adding and removing rows.
///
/// Can be applied to a multidictionary, and also inspected before application to see
/// what rows will be deleted. (This is used for OnBeforeDelete.)
///
/// Curiously, the order of operations applied to a MultiDictionaryDelta does not matter.
/// No matter the order of Add and Remove calls on a delta, when the Delta is applied,
/// the result will be the same, as long as the Add and Remove *counts* for each KeyValuePair are
/// the same.
/// (This means that this is a "conflict-free replicated data type", unlike MultiDictionary.)
/// (MultiDictionary would also be "conflict-free" if it didn't support Remove.)
///
/// The delta may include value updates.
/// A value can be updated multiple times, but each update must set the result to the same value.
/// When applying a delta, if the target multidictionary has multiple copies of (key, value) pair,
Expand All @@ -305,12 +313,12 @@ public override string ToString()
/// </summary>
/// <typeparam name="TKey"></typeparam>
/// <typeparam name="TValue"></typeparam>
internal struct MultiDictionaryDelta<TKey, TValue>
internal struct MultiDictionaryDelta<TKey, TValue> : IEquatable<MultiDictionaryDelta<TKey, TValue>>
{
/// <summary>
/// For each key, track its NEW value (or old value, but only if we have never seen the new value).
/// Also track the number of times it has been removed and inserted.
/// We keep these separate so that we can track that a KVP has been removed enough times (in case
/// We keep these separate so that we can debug-assert that a KVP has been removed enough times (in case
/// there are multiple copies of the KVP in the map we get applied to.)
/// </summary>
readonly Dictionary<TKey, (TValue Value, uint Removes, uint Inserts)> RawDict;
Expand All @@ -333,7 +341,8 @@ public MultiDictionaryDelta(IEqualityComparer<TKey> keyComparer, IEqualityCompar

/// <summary>
/// Add a key-value-pair to the multidictionary.
/// If the key is already present, its associated value must satisfy value.Equals(item.Value).
/// If the key is already present, its associated value must satisfy
/// keyComparer.Equals(value, item.Value).
/// </summary>
/// <param name="item"></param>
public void Add(TKey key, TValue value)
Expand Down Expand Up @@ -397,6 +406,23 @@ public override string ToString()
return result.ToString();
}

public bool Equals(MultiDictionaryDelta<TKey, TValue> other)
{
foreach (var item in RawDict)
{
var (key, my) = item;
if (other.RawDict.TryGetValue(key, out var their))
{
if (!(ValueComparer.Equals(my.Value, their.Value) && my.Inserts == their.Inserts && my.Removes == their.Removes))
{
return false;
}
}
}

return true;
}

public readonly IEnumerable<KeyValuePair<TKey, (TValue Value, uint Removes, uint Inserts)>> Entries
{
get
Expand Down
4 changes: 4 additions & 0 deletions src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ void PreProcessTable(IRemoteTableHandle table, TableUpdate update, ProcessedData
foreach (var cqu in update.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);

// Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once
// to the table, it doesn't matter that we call Add before Remove here.

foreach (var bin in BsatnRowListIter(qu.Inserts))
{
var obj = Decode(table, bin, out var pk);
Expand Down
37 changes: 37 additions & 0 deletions tests~/MultiDictionaryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,43 @@ public void Removals()
});
}

// Check that MultiDictionaryDelta is in fact a CRDT.
[Fact]
public void ShuffleDelta()
{
ListWithRemovals(Gen.Byte[1, 10], Gen.Byte[1, 10], EqualityComparer<byte>.Default).Sample((list, removals) =>
{
var m1 = new MultiDictionaryDelta<byte, byte>(EqualityComparer<byte>.Default, EqualityComparer<byte>.Default);
var m2 = new MultiDictionaryDelta<byte, byte>(EqualityComparer<byte>.Default, EqualityComparer<byte>.Default);
var listRemovals = list.Zip(removals).ToList();
foreach (var (kvp, remove) in listRemovals)
{
if (remove)
{
m1.Remove(kvp.Key, kvp.Value);
}
else
{
m1.Add(kvp.Key, kvp.Value);
}
}
Gen.Shuffle(listRemovals);
foreach (var (kvp, remove) in listRemovals)
{
if (remove)
{
m2.Remove(kvp.Key, kvp.Value);
}
else
{
m2.Add(kvp.Key, kvp.Value);
}
}

Assert.Equal(m1, m2);
});
}

// Note: this does not check proper batch updates yet, since I wasn't sure how to randomly generate them properly.
[Fact]
public void ChunkedRemovals()
Expand Down

0 comments on commit 3854092

Please sign in to comment.