diff --git a/Directory.Packages.props b/Directory.Packages.props
index b3c0c71cfe..44eeec7a94 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -54,7 +54,7 @@
-
+
diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs
index 53aaaa8c82..5c66811a63 100644
--- a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs
+++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs
@@ -3,6 +3,7 @@
using System.IO;
using System.Linq;
using System.Threading.Tasks;
+using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Amazon.Runtime;
@@ -19,10 +20,10 @@ namespace Orleans.Streaming.Kinesis
///
/// Queue adapter factory which allows the PersistentStreamProvider to use AWS Kinesis Data Streams as its backend persistent event queue.
///
- public class KinesisAdapterFactory : IQueueAdapterFactory, IQueueAdapter
+ internal class KinesisAdapterFactory : IQueueAdapterFactory, IQueueAdapter
{
private readonly KinesisStreamOptions _options;
- private readonly Serializer _serializer;
+ private readonly Serializer _serializer;
private readonly IStreamQueueCheckpointerFactory _checkpointerFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly IQueueAdapterCache _adapterCache;
@@ -36,7 +37,7 @@ public KinesisAdapterFactory(
string name,
KinesisStreamOptions options,
SimpleQueueCacheOptions cacheOptions,
- Serializer serializer,
+ Serializer serializer,
IStreamQueueCheckpointerFactory checkpointerFactory,
ILoggerFactory loggerFactory
)
@@ -69,10 +70,9 @@ public static KinesisAdapterFactory Create(IServiceProvider services, string nam
{
var streamsConfig = services.GetOptionsByName(name);
var cacheOptions = services.GetOptionsByName(name);
- var serializer = services.GetRequiredService();
- var logger = services.GetRequiredService();
- var grainFactory = services.GetRequiredService();
+ var serializer = services.GetRequiredService>();
var checkpointerFactory = services.GetRequiredKeyedService(name);
+ var logger = services.GetRequiredService();
var factory = ActivatorUtilities.CreateInstance(
services,
@@ -81,9 +81,7 @@ public static KinesisAdapterFactory Create(IServiceProvider services, string nam
cacheOptions,
serializer,
checkpointerFactory,
- logger,
- grainFactory,
- services
+ logger
);
return factory;
@@ -137,7 +135,7 @@ public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
);
}
- private AmazonKinesisClient CreateClient()
+ internal AmazonKinesisClient CreateClient()
{
if (_options.Service.StartsWith("http://", StringComparison.OrdinalIgnoreCase) ||
_options.Service.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
@@ -153,16 +151,16 @@ private AmazonKinesisClient CreateClient()
{
// AWS Kinesis instance (auth via explicit credentials)
var credentials = new BasicAWSCredentials(_options.AccessKey, _options.SecretKey);
- return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { RegionEndpoint = AWSUtils.GetRegionEndpoint(_options.Service) });
+ return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) });
}
else
{
// AWS Kinesis instance (implicit auth - EC2 IAM Roles etc)
- return new AmazonKinesisClient(new AmazonKinesisConfig { RegionEndpoint = AWSUtils.GetRegionEndpoint(_options.Service) });
+ return new AmazonKinesisClient(new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) });
}
}
- private async Task GetPartitionIdsAsync()
+ internal async Task GetPartitionIdsAsync()
{
var request = new ListShardsRequest
{
diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs
index 80fd270389..4ce3654716 100644
--- a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs
+++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs
@@ -10,14 +10,14 @@
namespace Orleans.Streaming.Kinesis
{
- public class KinesisAdapterReceiver : IQueueAdapterReceiver
+ internal class KinesisAdapterReceiver : IQueueAdapterReceiver
{
private readonly ILogger _logger;
private readonly AmazonKinesisClient _client;
private readonly string _streamName;
private readonly string _partition;
private readonly IStreamQueueCheckpointerFactory _checkpointerFactory;
- private readonly Serializer _serializer;
+ private readonly Serializer _serializer;
private IStreamQueueCheckpointer _checkpointer;
private string _shardIterator;
@@ -28,7 +28,7 @@ internal KinesisAdapterReceiver(
string streamName,
string partition,
IStreamQueueCheckpointerFactory checkpointerFactory,
- Serializer serializer,
+ Serializer serializer,
ILoggerFactory loggerFactory
)
{
diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs
index 2130934d08..86e13327a7 100644
--- a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs
+++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs
@@ -11,7 +11,7 @@ namespace Orleans.Streaming.Kinesis
{
[Serializable]
[Orleans.GenerateSerializer]
- public class KinesisBatchContainer : IBatchContainer, IComparable
+ internal class KinesisBatchContainer : IBatchContainer, IComparable
{
[JsonProperty]
[Id(0)]
@@ -23,13 +23,13 @@ public class KinesisBatchContainer : IBatchContainer, IComparable Serializer { get; set; }
[JsonProperty]
[Id(1)]
internal KinesisSequenceToken Token { get; }
- private KinesisBatchContainer(Record record, Serializer serializer, long sequenceId)
+ private KinesisBatchContainer(Record record, Serializer serializer, long sequenceId)
{
this.Serializer = serializer;
this._rawRecord = record.Data.ToArray();
@@ -37,6 +37,12 @@ private KinesisBatchContainer(Record record, Serializer serializer, long sequenc
Token = new KinesisSequenceToken(record.SequenceNumber, sequenceId, 0);
}
+ [GeneratedActivatorConstructor]
+ internal KinesisBatchContainer(Serializer serializer)
+ {
+ this.Serializer = serializer;
+ }
+
///
/// Stream identifier for the stream this batch is part of.
///
@@ -47,7 +53,7 @@ private KinesisBatchContainer(Record record, Serializer serializer, long sequenc
///
public StreamSequenceToken SequenceToken => Token;
- private Body GetPayload() => _payload ?? (_payload = this.Serializer.Deserialize(_rawRecord));
+ private Body GetPayload() => _payload ?? (_payload = this.Serializer.Deserialize(_rawRecord));
///
/// Gets events of a specific type from the batch.
@@ -77,7 +83,21 @@ public bool ImportRequestContext()
public int CompareTo(KinesisBatchContainer other)
=> Token.SequenceNumber.CompareTo(other.SequenceToken.SequenceNumber);
- internal static byte[] ToKinesisPayload(Serializer serializer, StreamId streamId, IEnumerable events, Dictionary requestContext)
+ [Serializable]
+ [GenerateSerializer]
+ internal class Body
+ {
+ [Id(0)]
+ public List