Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add CancellationToken to most storage interfaces #8584

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace Orleans.Clustering.DynamoDB
Expand All @@ -33,7 +34,7 @@ public DynamoDBGatewayListProvider(
this.MaxStaleness = gatewayOptions.Value.GatewayListRefreshPeriod;
}

public Task InitializeGatewayListProvider()
public Task InitializeGatewayListProvider(CancellationToken cancellationToken)
{
this.storage = new DynamoDBStorage(
this.logger,
Expand All @@ -58,10 +59,11 @@ public Task InitializeGatewayListProvider()
{
new AttributeDefinition { AttributeName = SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S },
new AttributeDefinition { AttributeName = SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }
});
},
cancellationToken: cancellationToken);
}

public async Task<IList<Uri>> GetGateways()
public async Task<IList<Uri>> GetGateways(CancellationToken cancellationToken)
{
var expressionValues = new Dictionary<string, AttributeValue>
{
Expand All @@ -82,7 +84,8 @@ public async Task<IList<Uri>> GetGateways()
IPAddress.Parse(gateway[SiloInstanceRecord.ADDRESS_PROPERTY_NAME].S),
int.Parse(gateway[SiloInstanceRecord.PROXY_PORT_PROPERTY_NAME].N),
int.Parse(gateway[SiloInstanceRecord.GENERATION_PROPERTY_NAME].N)).ToGatewayUri();
});
},
cancellationToken);

return records;
}
Expand All @@ -93,5 +96,8 @@ public bool IsUpdatable
{
get { return true; }
}

public Task InitializeGatewayListProvider() => InitializeGatewayListProvider(CancellationToken.None);
public Task<IList<Uri>> GetGateways() => GetGateways(CancellationToken.None);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Orleans.Clustering.DynamoDB
Expand All @@ -37,7 +38,16 @@ public DynamoDBMembershipTable(
this.clusterId = clusterOptions.Value.ClusterId;
}

public async Task InitializeMembershipTable(bool tryInitTableVersion)
public Task InitializeMembershipTable(bool tryInitTableVersion) => InitializeMembershipTable(tryInitTableVersion, CancellationToken.None);
public Task DeleteMembershipTableEntries(string clusterId) => DeleteMembershipTableEntries(clusterId, CancellationToken.None);
public Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate) => CleanupDefunctSiloEntries(beforeDate, CancellationToken.None);
public Task<MembershipTableData> ReadRow(SiloAddress key) => ReadRow(key, CancellationToken.None);
public Task<MembershipTableData> ReadAll() => ReadAll(CancellationToken.None);
public Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersion) => InsertRow(entry, tableVersion, CancellationToken.None);
public Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion) => UpdateRow(entry, etag, tableVersion, CancellationToken.None);
public Task UpdateIAmAlive(MembershipEntry entry) => UpdateIAmAlive(entry, CancellationToken.None);

public async Task InitializeMembershipTable(bool tryInitTableVersion, CancellationToken cancellationToken)
{
this.storage = new DynamoDBStorage(
this.logger,
Expand All @@ -53,7 +63,8 @@ public async Task InitializeMembershipTable(bool tryInitTableVersion)
this.options.UpdateIfExists);

logger.LogInformation((int)ErrorCode.MembershipBase, "Initializing AWS DynamoDB Membership Table");
await storage.InitializeTable(this.options.TableName,
await storage.InitializeTable(
this.options.TableName,
new List<KeySchemaElement>
{
new KeySchemaElement { AttributeName = SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME, KeyType = KeyType.HASH },
Expand All @@ -63,28 +74,29 @@ await storage.InitializeTable(this.options.TableName,
{
new AttributeDefinition { AttributeName = SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME, AttributeType = ScalarAttributeType.S },
new AttributeDefinition { AttributeName = SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME, AttributeType = ScalarAttributeType.S }
});
},
cancellationToken: cancellationToken);

// even if I am not the one who created the table,
// try to insert an initial table version if it is not already there,
// so we always have a first table version row, before this silo starts working.
if (tryInitTableVersion)
{
// ignore return value, since we don't care if I inserted it or not, as long as it is in there.
bool created = await TryCreateTableVersionEntryAsync();
bool created = await TryCreateTableVersionEntryAsync(cancellationToken);
if(created) logger.LogInformation("Created new table version row.");
}
}

private async Task<bool> TryCreateTableVersionEntryAsync()
private async Task<bool> TryCreateTableVersionEntryAsync(CancellationToken cancellationToken)
{
var keys = new Dictionary<string, AttributeValue>
{
{ $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(this.clusterId) },
{ $"{SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME}", new AttributeValue(SiloInstanceRecord.TABLE_VERSION_ROW) }
};

var versionRow = await storage.ReadSingleEntryAsync(this.options.TableName, keys, fields => new SiloInstanceRecord(fields));
var versionRow = await storage.ReadSingleEntryAsync(this.options.TableName, keys, fields => new SiloInstanceRecord(fields), cancellationToken);
if (versionRow != null)
{
return false;
Expand All @@ -99,7 +111,7 @@ private async Task<bool> TryCreateTableVersionEntryAsync()
$"attribute_not_exists({SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}) AND attribute_not_exists({SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME})";
try
{
await storage.PutEntryAsync(this.options.TableName, entry.GetFields(true), notExistConditionExpression);
await storage.PutEntryAsync(this.options.TableName, entry.GetFields(true), notExistConditionExpression, cancellationToken: cancellationToken);
}
catch (ConditionalCheckFailedException)
{
Expand Down Expand Up @@ -136,12 +148,17 @@ private bool TryCreateTableVersionRecord(int version, string etag, out SiloInsta
return true;
}

public async Task DeleteMembershipTableEntries(string clusterId)
public async Task DeleteMembershipTableEntries(string clusterId, CancellationToken cancellationToken)
{
try
{
var keys = new Dictionary<string, AttributeValue> { { $":{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(clusterId) } };
var records = await storage.QueryAsync(this.options.TableName, keys, $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", item => new SiloInstanceRecord(item));
var records = await storage.QueryAsync(
this.options.TableName,
keys,
$"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}",
item => new SiloInstanceRecord(item),
cancellationToken: cancellationToken);

var toDelete = new List<Dictionary<string, AttributeValue>>();
foreach (var record in records.results)
Expand All @@ -152,7 +169,7 @@ public async Task DeleteMembershipTableEntries(string clusterId)
List<Task> tasks = new List<Task>();
foreach (var batch in toDelete.BatchIEnumerable(MAX_BATCH_SIZE))
{
tasks.Add(storage.DeleteEntriesAsync(this.options.TableName, batch));
tasks.Add(storage.DeleteEntriesAsync(this.options.TableName, batch, cancellationToken));
}
await Task.WhenAll(tasks);
}
Expand All @@ -168,7 +185,7 @@ public async Task DeleteMembershipTableEntries(string clusterId)
}
}

public async Task<MembershipTableData> ReadRow(SiloAddress siloAddress)
public async Task<MembershipTableData> ReadRow(SiloAddress siloAddress, CancellationToken cancellationToken)
{
try
{
Expand All @@ -184,8 +201,11 @@ public async Task<MembershipTableData> ReadRow(SiloAddress siloAddress)
{ $"{SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME}", new AttributeValue(SiloInstanceRecord.TABLE_VERSION_ROW) }
};

var entries = await storage.GetEntriesTxAsync(this.options.TableName,
new[] {siloEntryKeys, versionEntryKeys}, fields => new SiloInstanceRecord(fields));
var entries = await storage.GetEntriesTxAsync(
this.options.TableName,
new[] { siloEntryKeys, versionEntryKeys },
fields => new SiloInstanceRecord(fields),
cancellationToken);

MembershipTableData data = Convert(entries.ToList());
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace("Read my entry {SiloAddress} Table: {TableData}", siloAddress.ToString(), data.ToString());
Expand All @@ -203,7 +223,7 @@ public async Task<MembershipTableData> ReadRow(SiloAddress siloAddress)
}
}

public async Task<MembershipTableData> ReadAll()
public async Task<MembershipTableData> ReadAll(CancellationToken cancellationToken)
{
try
{
Expand All @@ -213,21 +233,29 @@ public async Task<MembershipTableData> ReadAll()
{ $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(this.clusterId) },
{ $"{SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME}", new AttributeValue(SiloInstanceRecord.TABLE_VERSION_ROW) }
};
var versionRow = await this.storage.ReadSingleEntryAsync(this.options.TableName, versionEntryKeys,
fields => new SiloInstanceRecord(fields));
var versionRow = await this.storage.ReadSingleEntryAsync(
this.options.TableName,
versionEntryKeys,
fields => new SiloInstanceRecord(fields),
cancellationToken);
if (versionRow == null)
{
throw new KeyNotFoundException("No version row found for membership table");
}

var keys = new Dictionary<string, AttributeValue> { { $":{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(this.clusterId) } };
var records = await this.storage.QueryAllAsync(this.options.TableName, keys, $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", item => new SiloInstanceRecord(item));
var records = await this.storage.QueryAllAsync(
this.options.TableName,
keys,
$"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}",
item => new SiloInstanceRecord(item),
cancellationToken: cancellationToken);

if (records.Any(record => record.MembershipVersion > versionRow.MembershipVersion))
{
this.logger.LogWarning((int)ErrorCode.MembershipBase, "Found an inconsistency while reading all silo entries");
//not expecting this to hit often, but if it does, should put in a limit
return await this.ReadAll();
return await this.ReadAll(cancellationToken);
}

MembershipTableData data = Convert(records);
Expand All @@ -246,7 +274,7 @@ public async Task<MembershipTableData> ReadAll()
}
}

public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersion)
public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersion, CancellationToken cancellationToken)
{
try
{
Expand Down Expand Up @@ -289,7 +317,7 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
(versionEntryUpdate.UpdateExpression, versionEntryUpdate.ExpressionAttributeValues) =
this.storage.ConvertUpdate(versionEntry.GetFields(), conditionalValues);

await this.storage.WriteTxAsync(new[] {tableEntryInsert}, new[] {versionEntryUpdate});
await this.storage.WriteTxAsync(new[] {tableEntryInsert}, new[] {versionEntryUpdate}, cancellationToken: cancellationToken);

result = true;
}
Expand Down Expand Up @@ -323,7 +351,7 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
}
}

public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion)
public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion, CancellationToken cancellationToken)
{
try
{
Expand Down Expand Up @@ -380,7 +408,7 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
(versionEntryUpdate.UpdateExpression, versionEntryUpdate.ExpressionAttributeValues) =
this.storage.ConvertUpdate(versionEntry.GetFields(), versionConditionalValues);

await this.storage.WriteTxAsync(updates: new[] {siloEntryUpdate, versionEntryUpdate});
await this.storage.WriteTxAsync(updates: new[] {siloEntryUpdate, versionEntryUpdate}, cancellationToken: cancellationToken);
result = true;
}
catch (TransactionCanceledException canceledException)
Expand Down Expand Up @@ -415,15 +443,15 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
}
}

public async Task UpdateIAmAlive(MembershipEntry entry)
public async Task UpdateIAmAlive(MembershipEntry entry, CancellationToken cancellationToken)
{
try
{
if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("Merge entry = {Entry}", entry.ToString());
var siloEntry = ConvertPartial(entry);
var fields = new Dictionary<string, AttributeValue> { { SiloInstanceRecord.I_AM_ALIVE_TIME_PROPERTY_NAME, new AttributeValue(siloEntry.IAmAliveTime) } };
var expression = $"attribute_exists({SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}) AND attribute_exists({SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME})";
await this.storage.UpsertEntryAsync(this.options.TableName, siloEntry.GetKeys(),fields, expression);
await this.storage.UpsertEntryAsync(this.options.TableName, siloEntry.GetKeys(),fields, expression, cancellationToken: cancellationToken);
}
catch (Exception exc)
{
Expand Down Expand Up @@ -589,7 +617,7 @@ private SiloInstanceRecord ConvertPartial(MembershipEntry memEntry)
};
}

public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate, CancellationToken cancellationToken)
{
try
{
Expand All @@ -599,13 +627,18 @@ public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
};
var filter = $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}";

var records = await this.storage.QueryAllAsync(this.options.TableName, keys, filter, item => new SiloInstanceRecord(item));
var records = await this.storage.QueryAllAsync(
this.options.TableName,
keys,
filter,
item => new SiloInstanceRecord(item),
cancellationToken: cancellationToken);
var defunctRecordKeys = records.Where(r => SiloIsDefunct(r, beforeDate)).Select(r => r.GetKeys());

var tasks = new List<Task>();
foreach (var batch in defunctRecordKeys.BatchIEnumerable(MAX_BATCH_SIZE))
{
tasks.Add(this.storage.DeleteEntriesAsync(this.options.TableName, batch));
tasks.Add(this.storage.DeleteEntriesAsync(this.options.TableName, batch, cancellationToken));
}
await Task.WhenAll(tasks);
}
Expand Down
Loading