Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
vplauzon committed Aug 22, 2024
1 parent 2befcb9 commit 57a4b4c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
19 changes: 19 additions & 0 deletions code/KustoCopyConsole/Kusto/DbCommandClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ public DbCommandClient(
_databaseName = databaseName;
}

public async Task<int> GetExportCapacityAsync()
{
return await _commandQueue.RequestRunAsync(
KustoDbPriority.HighestPriority,
async () =>
{
var commandText = @"
.show capacity
| where Resource == 'DataExport'
| project Total";
var reader = await _provider.ExecuteControlCommandAsync(
_databaseName,
commandText);
var exportCapacity = (long)reader.ToDataSet().Tables[0].Rows[0][0];

return (int)exportCapacity;
});
}

public async Task<string> ExportBlockAsync(
IImmutableList<Uri> storageRoots,
string tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,15 @@ private async Task<ClusterCache> FetchClusterCacheAsync(
return new ClusterCache(DateTime.Now, exportRootUris, exportCapacity);
}

private Task<int> FetchExportCapacityAsync(RowItem blockItem, CancellationToken ct)
private async Task<int> FetchExportCapacityAsync(RowItem blockItem, CancellationToken ct)
{
throw new NotImplementedException();
var tableIdentity = blockItem.GetSourceTableIdentity();
var client = _dbClientFactory.GetDbCommandClient(
tableIdentity.ClusterUri,
tableIdentity.DatabaseName);
var exportCapacity = await client.GetExportCapacityAsync();

return exportCapacity;
}

private async Task<IImmutableList<Uri>> FetchExportRootUrisAsync(
Expand Down Expand Up @@ -345,7 +351,7 @@ private async Task OnExportingAsync(CancellationToken ct)
{
var clusterCache = await clusterQueue.GetClusterCacheAsync(ct);

while (clusterQueue.GetOperationIDCount() > clusterCache.ExportCapacity
while (clusterQueue.GetOperationIDCount() < clusterCache.ExportCapacity
&& clusterQueue.TryPeekExport(out var blockItem))
{
var tableIdentity = blockItem.GetSourceTableIdentity();
Expand Down

0 comments on commit 57a4b4c

Please sign in to comment.