diff --git a/src/Orleans.EventSourcing/CustomStorage/BadCustomStorageProviderConfigException.cs b/src/Orleans.EventSourcing/CustomStorage/BadCustomStorageProviderConfigException.cs
new file mode 100644
index 0000000000..6f6070a942
--- /dev/null
+++ b/src/Orleans.EventSourcing/CustomStorage/BadCustomStorageProviderConfigException.cs
@@ -0,0 +1,26 @@
+using System;
+using Orleans;
+using Orleans.Runtime;
+
+namespace OrleansEventSourcing.CustomStorage;
+
+///
+/// Exception thrown whenever a grain call is attempted with a bad / missing custom storage provider configuration settings for that grain.
+///
+[GenerateSerializer, Serializable]
+public sealed class BadCustomStorageProviderConfigException : OrleansException
+{
+ public BadCustomStorageProviderConfigException()
+ {
+ }
+
+ public BadCustomStorageProviderConfigException(string msg)
+ : base(msg)
+ {
+ }
+
+ public BadCustomStorageProviderConfigException(string msg, Exception exc)
+ : base(msg, exc)
+ {
+ }
+}
\ No newline at end of file
diff --git a/src/Orleans.EventSourcing/CustomStorage/CustomStorageHelpers.cs b/src/Orleans.EventSourcing/CustomStorage/CustomStorageHelpers.cs
new file mode 100644
index 0000000000..d7670dd2ea
--- /dev/null
+++ b/src/Orleans.EventSourcing/CustomStorage/CustomStorageHelpers.cs
@@ -0,0 +1,46 @@
+using System;
+using System.Diagnostics.CodeAnalysis;
+using Microsoft.Extensions.DependencyInjection;
+using Orleans.EventSourcing.CustomStorage;
+using Orleans.Runtime;
+
+namespace OrleansEventSourcing.CustomStorage;
+
+public static class CustomStorageHelpers
+{
+ public static ICustomStorageInterface GetCustomStorage(object hostGrain, GrainId grainId, IServiceProvider services)
+ where TState : class, new()
+ where TDelta : class
+ {
+ ArgumentNullException.ThrowIfNull(hostGrain);
+
+ if (hostGrain is ICustomStorageInterface hostGrainCustomStorage)
+ {
+ return hostGrainCustomStorage;
+ }
+
+ var grainType = hostGrain.GetType();
+ var attrs = grainType.GetCustomAttributes(typeof(CustomStorageProviderAttribute), true);
+ var attr = attrs.Length > 0 ? (CustomStorageProviderAttribute)attrs[0] : null;
+ var storageFactory = attr != null
+ ? services.GetKeyedService(attr.ProviderName)
+ : services.GetService();
+
+ if (storageFactory == null)
+ {
+ ThrowMissingProviderException(grainType, attr?.ProviderName);
+ }
+
+ return storageFactory.CreateCustomStorage(grainId);
+ }
+
+ [DoesNotReturn]
+ private static void ThrowMissingProviderException(Type grainType, string name)
+ {
+ var grainTypeName = grainType.FullName;
+ var errMsg = string.IsNullOrEmpty(name)
+ ? $"No default custom storage provider found loading grain type {grainTypeName} and grain does not implement ICustomStorageInterface."
+ : $"No custom storage provider named \"{name}\" found loading grain type {grainTypeName} and grain does not implement ICustomStorageInterface.";
+ throw new BadCustomStorageProviderConfigException(errMsg);
+ }
+}
\ No newline at end of file
diff --git a/src/Orleans.EventSourcing/CustomStorage/CustomStorageProviderAttribute.cs b/src/Orleans.EventSourcing/CustomStorage/CustomStorageProviderAttribute.cs
new file mode 100644
index 0000000000..8a965de958
--- /dev/null
+++ b/src/Orleans.EventSourcing/CustomStorage/CustomStorageProviderAttribute.cs
@@ -0,0 +1,10 @@
+using System;
+using Orleans.Providers;
+
+namespace OrleansEventSourcing.CustomStorage;
+
+[AttributeUsage(AttributeTargets.Class)]
+public class CustomStorageProviderAttribute : Attribute
+{
+ public string ProviderName { get; set; } = ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME;
+}
\ No newline at end of file
diff --git a/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs
new file mode 100644
index 0000000000..ec9b90ba19
--- /dev/null
+++ b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageFactory.cs
@@ -0,0 +1,9 @@
+using Orleans.EventSourcing.CustomStorage;
+using Orleans.Runtime;
+
+namespace OrleansEventSourcing.CustomStorage;
+
+public interface ICustomStorageFactory
+{
+ public ICustomStorageInterface CreateCustomStorage(GrainId grainId);
+}
\ No newline at end of file
diff --git a/src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs
index 83f880b42d..0b81331f12 100644
--- a/src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs
+++ b/src/Orleans.EventSourcing/CustomStorage/ICustomStorageInterface.cs
@@ -1,4 +1,5 @@
-using System.Collections.Generic;
+using System;
+using System.Collections.Generic;
using System.Threading.Tasks;
namespace Orleans.EventSourcing.CustomStorage
@@ -18,11 +19,21 @@ public interface ICustomStorageInterface
Task> ReadStateFromStorage();
///
- /// Applies the given array of deltas to storage, and returns true, if the version in storage matches the expected version.
+ /// Applies the given array of deltas to storage, and returns true, if the version in storage matches the expected version.
/// Otherwise, does nothing and returns false. If successful, the version of storage must be increased by the number of deltas.
///
/// true if the deltas were applied, false otherwise
Task ApplyUpdatesToStorage(IReadOnlyList updates, int expectedversion);
+
+ ///
+ /// Attempt to retrieve a segment of the log, possibly from storage. Throws if
+ /// the log cannot be read, which depends on the providers used and how they are configured.
+ ///
+ /// the start position
+ /// the end position
+ ///
+ Task> RetrieveLogSegment(int fromVersion, int toVersion) =>
+ throw new NotSupportedException();
}
}
diff --git a/src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs b/src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs
index ad2801f69e..0def0171fe 100644
--- a/src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs
+++ b/src/Orleans.EventSourcing/CustomStorage/LogConsistencyProvider.cs
@@ -3,20 +3,22 @@
using System;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.DependencyInjection;
+using OrleansEventSourcing.CustomStorage;
namespace Orleans.EventSourcing.CustomStorage
{
///
- /// A log-consistency provider that relies on grain-specific custom code for
+ /// A log-consistency provider that relies on grain-specific custom code for
/// reading states from storage, and appending deltas to storage.
/// Grains that wish to use this provider must implement the
/// interface, to define how state is read and how deltas are written.
/// If the provider attribute "PrimaryCluster" is supplied in the provider configuration, then only the specified cluster
- /// accesses storage, and other clusters may not issue updates.
+ /// accesses storage, and other clusters may not issue updates.
///
public class LogConsistencyProvider : ILogViewAdaptorFactory
{
private readonly CustomStorageLogConsistencyOptions options;
+ private readonly IServiceProvider serviceProvider;
///
/// Specifies a clusterid of the primary cluster from which to access storage exclusively, null if
@@ -26,10 +28,11 @@ public class LogConsistencyProvider : ILogViewAdaptorFactory
///
public bool UsesStorageProvider => false;
-
- public LogConsistencyProvider(CustomStorageLogConsistencyOptions options)
+
+ public LogConsistencyProvider(CustomStorageLogConsistencyOptions options, IServiceProvider serviceProvider)
{
this.options = options;
+ this.serviceProvider = serviceProvider;
}
///
@@ -37,7 +40,8 @@ public ILogViewAdaptor MakeLogViewAdaptor(ILogView
where TView : class, new()
where TEntry : class
{
- return new CustomStorageAdaptor(hostgrain, initialstate, services, PrimaryCluster);
+ var customStorage = CustomStorageHelpers.GetCustomStorage(hostgrain, services.GrainId, serviceProvider);
+ return new CustomStorageAdaptor(hostgrain, initialstate, services, PrimaryCluster, customStorage);
}
}
@@ -46,7 +50,7 @@ public static class LogConsistencyProviderFactory
public static ILogViewAdaptorFactory Create(IServiceProvider services, string name)
{
var optionsMonitor = services.GetRequiredService>();
- return ActivatorUtilities.CreateInstance(services, optionsMonitor.Get(name));
+ return ActivatorUtilities.CreateInstance(services, optionsMonitor.Get(name), services);
}
}
}
\ No newline at end of file
diff --git a/src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs b/src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs
index 0571722b4a..338de96e40 100644
--- a/src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs
+++ b/src/Orleans.EventSourcing/CustomStorage/LogViewAdaptor.cs
@@ -4,13 +4,12 @@
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
-using Orleans.Storage;
using Orleans.EventSourcing.Common;
namespace Orleans.EventSourcing.CustomStorage
{
///
- /// A log consistency adaptor that uses the user-provided storage interface .
+ /// A log consistency adaptor that uses the user-provided storage interface .
/// This interface must be implemented by any grain that uses this log view adaptor.
///
/// log view type
@@ -23,15 +22,15 @@ internal class CustomStorageAdaptor : PrimaryBasedLogViewAd
/// Initialize a new instance of CustomStorageAdaptor class
///
public CustomStorageAdaptor(ILogViewAdaptorHost host, TLogView initialState,
- ILogConsistencyProtocolServices services, string primaryCluster)
+ ILogConsistencyProtocolServices services, string primaryCluster, ICustomStorageInterface customStorage)
: base(host, initialState, services)
{
- if (!(host is ICustomStorageInterface))
- throw new BadProviderConfigException("Must implement ICustomStorageInterface for CustomStorageLogView provider");
+ this.customStorage = customStorage;
this.primaryCluster = primaryCluster;
}
private readonly string primaryCluster;
+ private readonly ICustomStorageInterface customStorage;
private TLogView cached;
private int version;
@@ -114,7 +113,7 @@ protected override async Task ReadAsync()
try
{
// read from storage
- var result = await ((ICustomStorageInterface)Host).ReadStateFromStorage();
+ var result = await customStorage.ReadStateFromStorage();
version = result.Key;
cached = result.Value;
@@ -152,7 +151,7 @@ protected override async Task WriteAsync()
try
{
- writesuccessful = await ((ICustomStorageInterface) Host).ApplyUpdatesToStorage(updates, version);
+ writesuccessful = await customStorage.ApplyUpdatesToStorage(updates, version);
LastPrimaryIssue.Resolve(Host, Services);
}
@@ -197,7 +196,7 @@ protected override async Task WriteAsync()
try
{
- var result = await ((ICustomStorageInterface)Host).ReadStateFromStorage();
+ var result = await customStorage.ReadStateFromStorage();
version = result.Key;
cached = result.Value;
@@ -225,6 +224,12 @@ protected override async Task WriteAsync()
return writesuccessful ? updates.Count : 0;
}
+ ///
+ public override Task> RetrieveLogSegment(int fromVersion, int toVersion)
+ {
+ return customStorage.RetrieveLogSegment(fromVersion, toVersion);
+ }
+
///
/// Describes a connection issue that occurred when updating the primary storage.
///
@@ -279,7 +284,7 @@ public override string ToString()
return string.Format("v{0} ({1} updates)", Version, Updates.Count);
}
}
-
+
private readonly SortedList notifications = new SortedList();
///
@@ -309,7 +314,7 @@ protected override void ProcessNotifications()
var updatenotification = notifications.ElementAt(0).Value;
notifications.RemoveAt(0);
- // Apply all operations in pending
+ // Apply all operations in pending
foreach (var u in updatenotification.Updates)
try
{
@@ -328,7 +333,7 @@ protected override void ProcessNotifications()
Services.Log(LogLevel.Trace, "unprocessed notifications in queue: {0}", notifications.Count);
base.ProcessNotifications();
-
+
}
[Conditional("DEBUG")]
diff --git a/test/Grains/TestGrains/LogTestGrainVariations.cs b/test/Grains/TestGrains/LogTestGrainVariations.cs
index 8a279f75ed..19591d42d9 100644
--- a/test/Grains/TestGrains/LogTestGrainVariations.cs
+++ b/test/Grains/TestGrains/LogTestGrainVariations.cs
@@ -1,5 +1,8 @@
+using Orleans.EventSourcing.CustomStorage;
using Orleans.Providers;
+using Orleans.Runtime;
using Orleans.Serialization;
+using OrleansEventSourcing.CustomStorage;
using UnitTests.GrainInterfaces;
namespace TestGrains
@@ -48,7 +51,7 @@ private ILogTestGrain GetStorageGrain()
}
return storagegrain;
}
-
+
public Task ApplyUpdatesToStorage(IReadOnlyList