Skip to content

Commit

Permalink
Merge branch 'main' into request/SQS-DataAdapter
Browse files Browse the repository at this point in the history
  • Loading branch information
jamescarter-le authored Feb 16, 2024
2 parents b8aface + 374ab20 commit 5087395
Show file tree
Hide file tree
Showing 18 changed files with 302 additions and 290 deletions.
310 changes: 68 additions & 242 deletions .github/fabricbot.json

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.35.4" />
<PackageVersion Include="Microsoft.Build" Version="17.7.2" />
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<PackageVersion Include="Microsoft.CodeAnalysis" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Workspaces.Common" Version="4.8.0" />
<PackageVersion Include="Microsoft.CodeAnalysis" Version="4.5.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.5.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.5.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.5.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Workspaces.Common" Version="4.5.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.11.0-beta1.23472.1" />
<PackageVersion Include="Microsoft.DotNet.PlatformAbstractions" Version="3.1.6" />
<PackageVersion Include="Microsoft.AspNetCore.Connections.Abstractions" Version="8.0.0" />
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Analyzers/AliasClashAttributeAnalyzer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public partial class AliasClashAttributeAnalyzer : DiagnosticAnalyzer

public const string RuleId = "ORLEANS0011";

private static readonly DiagnosticDescriptor Rule = new(
private static readonly DiagnosticDescriptor Rule = new DiagnosticDescriptor(
id: RuleId,
category: "Usage",
defaultSeverity: DiagnosticSeverity.Error,
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Analyzers/IdClashAttributeAnalyzer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class IdClashAttributeAnalyzer : DiagnosticAnalyzer

public const string RuleId = "ORLEANS0012";

private static readonly DiagnosticDescriptor Rule = new(
private static readonly DiagnosticDescriptor Rule = new DiagnosticDescriptor(
id: RuleId,
category: "Usage",
defaultSeverity: DiagnosticSeverity.Error,
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Analyzers/IncorrectAttributeUseAnalyzer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class IncorrectAttributeUseAnalyzer : DiagnosticAnalyzer
{
public const string RuleId = "ORLEANS0013";

private static readonly DiagnosticDescriptor Rule = new(
private static readonly DiagnosticDescriptor Rule = new DiagnosticDescriptor(
id: RuleId,
category: "Usage",
defaultSeverity: DiagnosticSeverity.Error,
Expand Down
20 changes: 5 additions & 15 deletions src/Orleans.Core/Lifecycle/MigrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ public bool TryGetValue<T>(string key, out T? value)
IEnumerator<string> IEnumerable<string>.GetEnumerator() => new Enumerator(this);
IEnumerator IEnumerable.GetEnumerator() => new Enumerator(this);

private sealed class Enumerator : IEnumerator<string>, IEnumerator
private sealed class Enumerator(MigrationContext context) : IEnumerator<string>, IEnumerator
{
private Dictionary<string, (int Offset, int Length)>.KeyCollection.Enumerator _value;
public Enumerator(MigrationContext context) => _value = context._indices.Keys.GetEnumerator();
private Dictionary<string, (int Offset, int Length)>.KeyCollection.Enumerator _value = context._indices.Keys.GetEnumerator();

public string Current => _value.Current;
object IEnumerator.Current => Current;
public void Dispose() => _value.Dispose();
Expand All @@ -133,18 +133,8 @@ public void Reset()
}
}

internal sealed class SerializationHooks
internal sealed class SerializationHooks(SerializerSessionPool serializerSessionPool)
{
private readonly SerializerSessionPool _serializerSessionPool;

public SerializationHooks(SerializerSessionPool serializerSessionPool)
{
_serializerSessionPool = serializerSessionPool;
}

public void OnDeserializing(MigrationContext context)
{
context._sessionPool = _serializerSessionPool;
}
public void OnDeserializing(MigrationContext context) => context._sessionPool = serializerSessionPool;
}
}
25 changes: 24 additions & 1 deletion src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Tasks.Sources;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Orleans.Internal;
using Orleans.Runtime.Internal;
using Orleans.Runtime.Scheduler;

Expand Down Expand Up @@ -52,7 +53,7 @@ internal interface IActivationMigrationManager
/// <summary>
/// Migrates grain activations to target hosts and handles migration requests from other hosts.
/// </summary>
internal class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager
internal class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager, ILifecycleParticipant<ISiloLifecycle>
{
private const int MaxBatchSize = 1_000;
private readonly ConcurrentDictionary<SiloAddress, (Task PumpTask, Channel<MigrationWorkItem> WorkItemChannel)> _workers = new();
Expand Down Expand Up @@ -305,6 +306,28 @@ private void RemoveWorker(SiloAddress targetSilo)
}
}

private Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
private async Task StopAsync(CancellationToken cancellationToken)
{
var workerTasks = new List<Task>();
foreach (var (_, value) in _workers)
{
value.WorkItemChannel.Writer.TryComplete();
workerTasks.Add(value.PumpTask);
}

await Task.WhenAll(workerTasks).WithCancellation(cancellationToken);
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(
nameof(ActivationMigrationManager),
ServiceLifecycleStage.RuntimeGrainServices,
ct => this.RunOrQueueTask(() => StartAsync(ct)),
ct => this.RunOrQueueTask(() => StopAsync(ct)));
}

private class MigrationWorkItem : IValueTaskSource
{
private ManualResetValueTaskSourceCore<int> _core = new() { RunContinuationsAsynchronously = true };
Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ internal static void AddDefaultServices(ISiloBuilder builder)
services.AddSingleton<MigrationContext.SerializationHooks>();
services.AddSingleton<ActivationMigrationManager>();
services.AddFromExisting<IActivationMigrationManager, ActivationMigrationManager>();
services.AddFromExisting<ILifecycleParticipant<ISiloLifecycle>, ActivationMigrationManager>();

ApplyConfiguration(builder);
}
Expand Down
3 changes: 2 additions & 1 deletion src/Orleans.Serialization/Buffers/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ internal Writer(TBufferWriter output, Span<byte> span, SerializerSession session
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Dispose()
{
// Avoid boxing the struct, for better perf and codegen.
if (typeof(TBufferWriter).IsValueType)
{
if (Output is IDisposable)
Expand Down Expand Up @@ -548,4 +549,4 @@ private void WriteVarUInt64Slow(ulong value)
BinaryPrimitives.WriteUInt16LittleEndian(_currentSpan[sizeof(ulong)..], (ushort)upper);
}
}
}
}
11 changes: 7 additions & 4 deletions src/Orleans.Serialization/Codecs/ByteArrayCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,11 @@ public static Memory<byte> DeepCopy(Memory<byte> input, CopyContext copyContext)
/// Serializer for <see cref="PooledBuffer"/> instances.
/// </summary>
[RegisterSerializer]
public sealed class PooledBufferCodec : IValueSerializer<PooledBuffer>
public sealed class PooledBufferCodec : IFieldCodec<PooledBuffer>
{
public void Serialize<TBufferWriter>(ref Writer<TBufferWriter> writer, scoped ref PooledBuffer value) where TBufferWriter : IBufferWriter<byte>
public void WriteField<TBufferWriter>(ref Writer<TBufferWriter> writer, uint fieldIdDelta, Type expectedType, PooledBuffer value) where TBufferWriter : IBufferWriter<byte>
{
writer.WriteFieldHeader(fieldIdDelta, expectedType, typeof(PooledBuffer), WireType.LengthPrefixed);
writer.WriteVarUInt32((uint)value.Length);
foreach (var segment in value)
{
Expand All @@ -311,11 +312,12 @@ public void Serialize<TBufferWriter>(ref Writer<TBufferWriter> writer, scoped re
// Senders must not use the value after sending.
// Receivers must dispose of the value after use.
value.Reset();
value = default;
}

public void Deserialize<TInput>(ref Reader<TInput> reader, scoped ref PooledBuffer value)
public PooledBuffer ReadValue<TInput>(ref Reader<TInput> reader, Field field)
{
field.EnsureWireType(WireType.LengthPrefixed);
var value = new PooledBuffer();
const int MaxSpanLength = 4096;
var length = (int)reader.ReadVarUInt32();
while (length > 0)
Expand All @@ -328,6 +330,7 @@ public void Deserialize<TInput>(ref Reader<TInput> reader, scoped ref PooledBuff
}

Debug.Assert(length == 0);
return value;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public void Configure(ISiloBuilder builder, string name, IConfigurationSection c
if (!string.IsNullOrEmpty(serviceKey))
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetKeyedService<IConnectionMultiplexer>(serviceKey);
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.ConfigurationOptions = new ConfigurationOptions();
}
else
{
Expand Down Expand Up @@ -58,8 +59,9 @@ public void Configure(IClientBuilder builder, string name, IConfigurationSection
if (!string.IsNullOrEmpty(serviceKey))
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetKeyedService<IConnectionMultiplexer>(serviceKey);
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.ConfigurationOptions = new ConfigurationOptions();
}
else
{
Expand Down
50 changes: 37 additions & 13 deletions src/Redis/Orleans.Clustering.Redis/Storage/RedisMembershipTable.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#nullable enable
using System;
using System.Threading.Tasks;
using Orleans.Runtime;
Expand All @@ -8,6 +9,7 @@
using Microsoft.Extensions.Options;
using System.Globalization;
using System.Text;
using System.Diagnostics.CodeAnalysis;

namespace Orleans.Clustering.Redis
{
Expand All @@ -19,8 +21,8 @@ internal class RedisMembershipTable : IMembershipTable, IDisposable
private readonly ClusterOptions _clusterOptions;
private readonly JsonSerializerSettings _jsonSerializerSettings;
private readonly RedisKey _clusterKey;
private IConnectionMultiplexer _muxer;
private IDatabase _db;
private IConnectionMultiplexer _muxer = null!;
private IDatabase _db = null!;

public RedisMembershipTable(IOptions<RedisClusteringOptions> redisOptions, IOptions<ClusterOptions> clusterOptions)
{
Expand Down Expand Up @@ -69,13 +71,18 @@ private async Task<UpsertResult> UpsertRowInternal(MembershipEntry entry, TableV
{
tx.HashSetAsync(_clusterKey, TableVersionKey, SerializeVersion(tableVersion)).Ignore();
}

var versionCondition = tx.AddCondition(Condition.HashEqual(_clusterKey, TableVersionKey, SerializeVersion(Predeccessor(tableVersion))));

ConditionResult insertCondition = null;
ConditionResult? insertCondition;
if (allowInsertOnly)
{
insertCondition = tx.AddCondition(Condition.HashNotExists(_clusterKey, rowKey));
}
else
{
insertCondition = null;
}

tx.HashSetAsync(_clusterKey, rowKey, Serialize(entry)).Ignore();

Expand All @@ -91,9 +98,9 @@ private async Task<UpsertResult> UpsertRowInternal(MembershipEntry entry, TableV
return UpsertResult.Conflict;
}

if (!insertCondition.WasSatisfied)
if (insertCondition is not null && !insertCondition.WasSatisfied)
{
return UpsertResult.Failure;
return UpsertResult.Conflict;
}

return UpsertResult.Failure;
Expand All @@ -105,15 +112,32 @@ public async Task<MembershipTableData> ReadAll()
var tableVersionRow = all.SingleOrDefault(h => TableVersionKey.Equals(h.Name, StringComparison.Ordinal));
TableVersion tableVersion = GetTableVersionFromRow(tableVersionRow.Value);

var data = all.Where(h => !TableVersionKey.Equals(h.Name, StringComparison.Ordinal) && h.Value.HasValue)
.Select(x => Tuple.Create(Deserialize(x.Value), tableVersion.VersionEtag))
var data = all.Where(x => !TableVersionKey.Equals(x.Name, StringComparison.Ordinal) && x.Value.HasValue)
.Select(x => Tuple.Create(Deserialize(x.Value!), tableVersion.VersionEtag))
.ToList();
return new MembershipTableData(data, tableVersion);
}

private static TableVersion GetTableVersionFromRow(RedisValue tableVersionRow)
{
return tableVersionRow.HasValue ? DeserializeVersion(tableVersionRow) : DefaultTableVersion;
if (TryGetValueString(tableVersionRow, out var value))
{
return DeserializeVersion(value);
}

return DefaultTableVersion;
}

private static bool TryGetValueString(RedisValue key, [NotNullWhen(true)] out string? value)
{
if (key.HasValue)
{
value = key.ToString();
return true;
}

value = null;
return false;
}

public async Task<MembershipTableData> ReadRow(SiloAddress key)
Expand All @@ -128,9 +152,9 @@ public async Task<MembershipTableData> ReadRow(SiloAddress key)

TableVersion tableVersion = GetTableVersionFromRow(await tableVersionRowTask);
var entryRow = await entryRowTask;
if (entryRow.HasValue)
if (TryGetValueString(entryRow, out var entryValueString))
{
var entry = Deserialize(entryRow);
var entry = Deserialize(entryValueString);
return new MembershipTableData(Tuple.Create(entry, tableVersion.VersionEtag), tableVersion);
}
else
Expand All @@ -151,13 +175,13 @@ public async Task UpdateIAmAlive(MembershipEntry entry)
}

var entryRow = await entryRowTask;
if (!entryRow.HasValue)
if (!TryGetValueString(entryRow, out var entryRowValue))
{
throw new RedisClusteringException($"Could not find a value for the key {key}");
}

TableVersion tableVersion = GetTableVersionFromRow(await tableVersionRowTask).Next();
var existingEntry = Deserialize(entryRow);
var existingEntry = Deserialize(entryRowValue);

// Update only the IAmAliveTime property.
existingEntry.IAmAliveTime = entry.IAmAliveTime;
Expand Down Expand Up @@ -226,7 +250,7 @@ private string Serialize(MembershipEntry value)

private MembershipEntry Deserialize(string json)
{
return JsonConvert.DeserializeObject<MembershipEntry>(json, _jsonSerializerSettings);
return JsonConvert.DeserializeObject<MembershipEntry>(json, _jsonSerializerSettings)!;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ public void Configure(ISiloBuilder builder, string name, IConfigurationSection c
if (!string.IsNullOrEmpty(serviceKey))
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetKeyedService<IConnectionMultiplexer>(serviceKey);
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.ConfigurationOptions = new ConfigurationOptions();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public void Configure(ISiloBuilder builder, string name, IConfigurationSection c
if (!string.IsNullOrEmpty(serviceKey))
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetKeyedService<IConnectionMultiplexer>(serviceKey);
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.ConfigurationOptions = new ConfigurationOptions();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public void Configure(ISiloBuilder builder, string name, IConfigurationSection c
if (!string.IsNullOrEmpty(serviceKey))
{
// Get a connection multiplexer instance by name.
var multiplexer = services.GetKeyedService<IConnectionMultiplexer>(serviceKey);
var multiplexer = services.GetRequiredKeyedService<IConnectionMultiplexer>(serviceKey);
options.CreateMultiplexer = _ => Task.FromResult(multiplexer);
options.ConfigurationOptions = new ConfigurationOptions();
}
else
{
Expand Down
Loading

0 comments on commit 5087395

Please sign in to comment.