From d27ad15a089e4c96d9bc4523c0b3965fa8c1c0e0 Mon Sep 17 00:00:00 2001 From: Sam Fields Date: Tue, 26 Dec 2023 22:55:58 -0500 Subject: [PATCH 1/2] Add injectable CustomStorage implementation --- ...BadCustomStorageProviderConfigException.cs | 26 ++++++ .../CustomStorage/CustomStorageHelpers.cs | 46 +++++++++++ .../CustomStorageProviderAttribute.cs | 10 +++ .../CustomStorage/ICustomStorageFactory.cs | 11 +++ .../CustomStorage/ICustomStorageInterface.cs | 15 +++- .../CustomStorage/LogConsistencyProvider.cs | 16 ++-- .../CustomStorage/LogViewAdaptor.cs | 27 ++++--- .../TestGrains/LogTestGrainVariations.cs | 79 ++++++++++++++++++- .../GeoClusterTests/BasicLogTestGrainTests.cs | 16 +++- 9 files changed, 223 insertions(+), 23 deletions(-) create mode 100644 src/Orleans.EventSourcing/CustomStorage/BadCustomStorageProviderConfigException.cs create mode 100644 src/Orleans.EventSourcing/CustomStorage/CustomStorageHelpers.cs create mode 100644 src/Orleans.EventSourcing/CustomStorage/CustomStorageProviderAttribute.cs create mode 100644 src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs diff --git a/src/Orleans.EventSourcing/CustomStorage/BadCustomStorageProviderConfigException.cs b/src/Orleans.EventSourcing/CustomStorage/BadCustomStorageProviderConfigException.cs new file mode 100644 index 0000000000..6f6070a942 --- /dev/null +++ b/src/Orleans.EventSourcing/CustomStorage/BadCustomStorageProviderConfigException.cs @@ -0,0 +1,26 @@ +using System; +using Orleans; +using Orleans.Runtime; + +namespace OrleansEventSourcing.CustomStorage; + +/// +/// Exception thrown whenever a grain call is attempted with a bad / missing custom storage provider configuration settings for that grain. +/// +[GenerateSerializer, Serializable] +public sealed class BadCustomStorageProviderConfigException : OrleansException +{ + public BadCustomStorageProviderConfigException() + { + } + + public BadCustomStorageProviderConfigException(string msg) + : base(msg) + { + } + + public BadCustomStorageProviderConfigException(string msg, Exception exc) + : base(msg, exc) + { + } +} \ No newline at end of file diff --git a/src/Orleans.EventSourcing/CustomStorage/CustomStorageHelpers.cs b/src/Orleans.EventSourcing/CustomStorage/CustomStorageHelpers.cs new file mode 100644 index 0000000000..d7670dd2ea --- /dev/null +++ b/src/Orleans.EventSourcing/CustomStorage/CustomStorageHelpers.cs @@ -0,0 +1,46 @@ +using System; +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.DependencyInjection; +using Orleans.EventSourcing.CustomStorage; +using Orleans.Runtime; + +namespace OrleansEventSourcing.CustomStorage; + +public static class CustomStorageHelpers +{ + public static ICustomStorageInterface GetCustomStorage(object hostGrain, GrainId grainId, IServiceProvider services) + where TState : class, new() + where TDelta : class + { + ArgumentNullException.ThrowIfNull(hostGrain); + + if (hostGrain is ICustomStorageInterface hostGrainCustomStorage) + { + return hostGrainCustomStorage; + } + + var grainType = hostGrain.GetType(); + var attrs = grainType.GetCustomAttributes(typeof(CustomStorageProviderAttribute), true); + var attr = attrs.Length > 0 ? (CustomStorageProviderAttribute)attrs[0] : null; + var storageFactory = attr != null + ? services.GetKeyedService(attr.ProviderName) + : services.GetService(); + + if (storageFactory == null) + { + ThrowMissingProviderException(grainType, attr?.ProviderName); + } + + return storageFactory.CreateCustomStorage(grainId); + } + + [DoesNotReturn] + private static void ThrowMissingProviderException(Type grainType, string name) + { + var grainTypeName = grainType.FullName; + var errMsg = string.IsNullOrEmpty(name) + ? $"No default custom storage provider found loading grain type {grainTypeName} and grain does not implement ICustomStorageInterface." + : $"No custom storage provider named \"{name}\" found loading grain type {grainTypeName} and grain does not implement ICustomStorageInterface."; + throw new BadCustomStorageProviderConfigException(errMsg); + } +} \ No newline at end of file diff --git a/src/Orleans.EventSourcing/CustomStorage/CustomStorageProviderAttribute.cs b/src/Orleans.EventSourcing/CustomStorage/CustomStorageProviderAttribute.cs new file mode 100644 index 0000000000..8a965de958 --- /dev/null +++ b/src/Orleans.EventSourcing/CustomStorage/CustomStorageProviderAttribute.cs @@ -0,0 +1,10 @@ +using System; +using Orleans.Providers; + +namespace OrleansEventSourcing.CustomStorage; + +[AttributeUsage(AttributeTargets.Class)] +public class CustomStorageProviderAttribute : Attribute +{ + public string ProviderName { get; set; } = ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME; +} \ No newline at end of file diff --git a/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs new file mode 100644 index 0000000000..418257f92d --- /dev/null +++ b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs @@ -0,0 +1,11 @@ +using Orleans.EventSourcing.CustomStorage; +using Orleans.Runtime; + +namespace OrleansEventSourcing.CustomStorage; + +public interface ICustomStorageFactory +{ + public ICustomStorageInterface CreateCustomStorage(GrainId grainId) + where TState : class, new() + where TDelta : class; +} \ No newline at end of file diff --git a/src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs index 83f880b42d..0b81331f12 100644 --- a/src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs +++ b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Orleans.EventSourcing.CustomStorage @@ -18,11 +19,21 @@ public interface ICustomStorageInterface Task> ReadStateFromStorage(); /// - /// Applies the given array of deltas to storage, and returns true, if the version in storage matches the expected version. + /// Applies the given array of deltas to storage, and returns true, if the version in storage matches the expected version. /// Otherwise, does nothing and returns false. If successful, the version of storage must be increased by the number of deltas. /// /// true if the deltas were applied, false otherwise Task ApplyUpdatesToStorage(IReadOnlyList updates, int expectedversion); + + /// + /// Attempt to retrieve a segment of the log, possibly from storage. Throws if + /// the log cannot be read, which depends on the providers used and how they are configured. + /// + /// the start position + /// the end position + /// + Task> RetrieveLogSegment(int fromVersion, int toVersion) => + throw new NotSupportedException(); } } diff --git a/src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs b/src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs index ad2801f69e..0def0171fe 100644 --- a/src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs +++ b/src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs @@ -3,20 +3,22 @@ using System; using Microsoft.Extensions.Options; using Microsoft.Extensions.DependencyInjection; +using OrleansEventSourcing.CustomStorage; namespace Orleans.EventSourcing.CustomStorage { /// - /// A log-consistency provider that relies on grain-specific custom code for + /// A log-consistency provider that relies on grain-specific custom code for /// reading states from storage, and appending deltas to storage. /// Grains that wish to use this provider must implement the /// interface, to define how state is read and how deltas are written. /// If the provider attribute "PrimaryCluster" is supplied in the provider configuration, then only the specified cluster - /// accesses storage, and other clusters may not issue updates. + /// accesses storage, and other clusters may not issue updates. /// public class LogConsistencyProvider : ILogViewAdaptorFactory { private readonly CustomStorageLogConsistencyOptions options; + private readonly IServiceProvider serviceProvider; /// /// Specifies a clusterid of the primary cluster from which to access storage exclusively, null if @@ -26,10 +28,11 @@ public class LogConsistencyProvider : ILogViewAdaptorFactory /// public bool UsesStorageProvider => false; - - public LogConsistencyProvider(CustomStorageLogConsistencyOptions options) + + public LogConsistencyProvider(CustomStorageLogConsistencyOptions options, IServiceProvider serviceProvider) { this.options = options; + this.serviceProvider = serviceProvider; } /// @@ -37,7 +40,8 @@ public ILogViewAdaptor MakeLogViewAdaptor(ILogView where TView : class, new() where TEntry : class { - return new CustomStorageAdaptor(hostgrain, initialstate, services, PrimaryCluster); + var customStorage = CustomStorageHelpers.GetCustomStorage(hostgrain, services.GrainId, serviceProvider); + return new CustomStorageAdaptor(hostgrain, initialstate, services, PrimaryCluster, customStorage); } } @@ -46,7 +50,7 @@ public static class LogConsistencyProviderFactory public static ILogViewAdaptorFactory Create(IServiceProvider services, string name) { var optionsMonitor = services.GetRequiredService>(); - return ActivatorUtilities.CreateInstance(services, optionsMonitor.Get(name)); + return ActivatorUtilities.CreateInstance(services, optionsMonitor.Get(name), services); } } } \ No newline at end of file diff --git a/src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs b/src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs index 0571722b4a..338de96e40 100644 --- a/src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs +++ b/src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs @@ -4,13 +4,12 @@ using System.Linq; using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using Orleans.Storage; using Orleans.EventSourcing.Common; namespace Orleans.EventSourcing.CustomStorage { /// - /// A log consistency adaptor that uses the user-provided storage interface . + /// A log consistency adaptor that uses the user-provided storage interface . /// This interface must be implemented by any grain that uses this log view adaptor. /// /// log view type @@ -23,15 +22,15 @@ internal class CustomStorageAdaptor : PrimaryBasedLogViewAd /// Initialize a new instance of CustomStorageAdaptor class /// public CustomStorageAdaptor(ILogViewAdaptorHost host, TLogView initialState, - ILogConsistencyProtocolServices services, string primaryCluster) + ILogConsistencyProtocolServices services, string primaryCluster, ICustomStorageInterface customStorage) : base(host, initialState, services) { - if (!(host is ICustomStorageInterface)) - throw new BadProviderConfigException("Must implement ICustomStorageInterface for CustomStorageLogView provider"); + this.customStorage = customStorage; this.primaryCluster = primaryCluster; } private readonly string primaryCluster; + private readonly ICustomStorageInterface customStorage; private TLogView cached; private int version; @@ -114,7 +113,7 @@ protected override async Task ReadAsync() try { // read from storage - var result = await ((ICustomStorageInterface)Host).ReadStateFromStorage(); + var result = await customStorage.ReadStateFromStorage(); version = result.Key; cached = result.Value; @@ -152,7 +151,7 @@ protected override async Task WriteAsync() try { - writesuccessful = await ((ICustomStorageInterface) Host).ApplyUpdatesToStorage(updates, version); + writesuccessful = await customStorage.ApplyUpdatesToStorage(updates, version); LastPrimaryIssue.Resolve(Host, Services); } @@ -197,7 +196,7 @@ protected override async Task WriteAsync() try { - var result = await ((ICustomStorageInterface)Host).ReadStateFromStorage(); + var result = await customStorage.ReadStateFromStorage(); version = result.Key; cached = result.Value; @@ -225,6 +224,12 @@ protected override async Task WriteAsync() return writesuccessful ? updates.Count : 0; } + /// + public override Task> RetrieveLogSegment(int fromVersion, int toVersion) + { + return customStorage.RetrieveLogSegment(fromVersion, toVersion); + } + /// /// Describes a connection issue that occurred when updating the primary storage. /// @@ -279,7 +284,7 @@ public override string ToString() return string.Format("v{0} ({1} updates)", Version, Updates.Count); } } - + private readonly SortedList notifications = new SortedList(); /// @@ -309,7 +314,7 @@ protected override void ProcessNotifications() var updatenotification = notifications.ElementAt(0).Value; notifications.RemoveAt(0); - // Apply all operations in pending + // Apply all operations in pending foreach (var u in updatenotification.Updates) try { @@ -328,7 +333,7 @@ protected override void ProcessNotifications() Services.Log(LogLevel.Trace, "unprocessed notifications in queue: {0}", notifications.Count); base.ProcessNotifications(); - + } [Conditional("DEBUG")] diff --git a/test/Grains/TestGrains/LogTestGrainVariations.cs b/test/Grains/TestGrains/LogTestGrainVariations.cs index 8a279f75ed..d1c6841e7e 100644 --- a/test/Grains/TestGrains/LogTestGrainVariations.cs +++ b/test/Grains/TestGrains/LogTestGrainVariations.cs @@ -1,5 +1,8 @@ +using Orleans.EventSourcing.CustomStorage; using Orleans.Providers; +using Orleans.Runtime; using Orleans.Serialization; +using OrleansEventSourcing.CustomStorage; using UnitTests.GrainInterfaces; namespace TestGrains @@ -48,7 +51,7 @@ private ILogTestGrain GetStorageGrain() } return storagegrain; } - + public Task ApplyUpdatesToStorage(IReadOnlyList updates, int expectedversion) { @@ -120,5 +123,79 @@ public Task> ReadStateFromStorage() } } + // use the explicitly specified "CustomStorage" log-consistency provider with a separate ICustomStorageInterface implementation + [LogConsistencyProvider(ProviderName = "CustomStorage")] + public class LogTestGrainSeparateCustomStorage : LogTestGrain + { + public class SeparateCustomStorageFactory : ICustomStorageFactory + { + private readonly DeepCopier deepCopier; + + public SeparateCustomStorageFactory(DeepCopier deepCopier) + { + this.deepCopier = deepCopier; + } + + public ICustomStorageInterface CreateCustomStorage(GrainId grainId) + where TState : class, new() + where TDelta : class + { + return new SeparateCustomStorage(deepCopier); + } + } + + public class SeparateCustomStorage : ICustomStorageInterface + where TState : class, new() + where TDelta : class + { + private readonly DeepCopier copier; + + // we use fake in-memory state as the storage + private TState state; + private int version; + + public SeparateCustomStorage(DeepCopier copier) + { + this.copier = copier; + } + + public Task> ReadStateFromStorage() + { + if (state == null) + { + state = new TState(); + version = 0; + } + return Task.FromResult(new KeyValuePair(version, this.copier.Copy(state))); + } + + public Task ApplyUpdatesToStorage(IReadOnlyList updates, int expectedversion) + { + if (state == null) + { + state = new TState(); + version = 0; + } + + if (expectedversion != version) + return Task.FromResult(false); + + foreach (var u in updates) + { + this.TransitionState(state, u); + version++; + } + + return Task.FromResult(true); + } + + protected virtual void TransitionState(TState state, object @event) + { + dynamic s = state; + dynamic e = @event; + s.Apply(e); + } + } + } } diff --git a/test/TesterInternal/GeoClusterTests/BasicLogTestGrainTests.cs b/test/TesterInternal/GeoClusterTests/BasicLogTestGrainTests.cs index bc4533b901..90fdae2934 100644 --- a/test/TesterInternal/GeoClusterTests/BasicLogTestGrainTests.cs +++ b/test/TesterInternal/GeoClusterTests/BasicLogTestGrainTests.cs @@ -1,3 +1,4 @@ +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Xunit; using Orleans.TestingHost; @@ -6,6 +7,8 @@ using Tester; using Orleans.Configuration; +using OrleansEventSourcing.CustomStorage; +using TestGrains; namespace Tests.GeoClusterTests { @@ -41,11 +44,13 @@ public void Configure(ISiloBuilder hostBuilder) { options.ConfigureTableServiceClient(TestDefaultConfiguration.DataConnectionString); })) - .AddMemoryGrainStorage("MemoryStore"); + .AddMemoryGrainStorage("MemoryStore"); + + hostBuilder.Services.AddSingleton(); } } } - + public BasicLogTestGrainTests(Fixture fixture) { this.fixture = fixture; @@ -78,6 +83,11 @@ public async Task CustomStorage() { await DoBasicLogTestGrainTest("TestGrains.LogTestGrainCustomStorage"); } + [SkippableFact] + public async Task SeparateCustomStorage() + { + await DoBasicLogTestGrainTest("TestGrains.LogTestGrainSeparateCustomStorage"); + } private int GetRandom() { @@ -93,7 +103,7 @@ private async Task DoBasicLogTestGrainTest(string grainClass, int phases = 100) private async Task ThreeCheckers(string grainClass, int phases) { - // Global + // Global async Task checker1() { int x = GetRandom(); From ecfee790444a9ed9253ce2928705e59a7913170a Mon Sep 17 00:00:00 2001 From: Sam Fields Date: Tue, 2 Jan 2024 22:13:42 -0500 Subject: [PATCH 2/2] Remove type requirements --- .../CustomStorage/ICustomStorageFactory.cs | 4 +--- test/Grains/TestGrains/LogTestGrainVariations.cs | 8 ++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs index 418257f92d..ec9b90ba19 100644 --- a/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs +++ b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs @@ -5,7 +5,5 @@ namespace OrleansEventSourcing.CustomStorage; public interface ICustomStorageFactory { - public ICustomStorageInterface CreateCustomStorage(GrainId grainId) - where TState : class, new() - where TDelta : class; + public ICustomStorageInterface CreateCustomStorage(GrainId grainId); } \ No newline at end of file diff --git a/test/Grains/TestGrains/LogTestGrainVariations.cs b/test/Grains/TestGrains/LogTestGrainVariations.cs index d1c6841e7e..19591d42d9 100644 --- a/test/Grains/TestGrains/LogTestGrainVariations.cs +++ b/test/Grains/TestGrains/LogTestGrainVariations.cs @@ -137,16 +137,12 @@ public SeparateCustomStorageFactory(DeepCopier deepCopier) } public ICustomStorageInterface CreateCustomStorage(GrainId grainId) - where TState : class, new() - where TDelta : class { return new SeparateCustomStorage(deepCopier); } } public class SeparateCustomStorage : ICustomStorageInterface - where TState : class, new() - where TDelta : class { private readonly DeepCopier copier; @@ -163,7 +159,7 @@ public Task> ReadStateFromStorage() { if (state == null) { - state = new TState(); + state = Activator.CreateInstance(); version = 0; } return Task.FromResult(new KeyValuePair(version, this.copier.Copy(state))); @@ -173,7 +169,7 @@ public Task ApplyUpdatesToStorage(IReadOnlyList updates, int expec { if (state == null) { - state = new TState(); + state = Activator.CreateInstance(); version = 0; }