diff --git a/code/KustoCopyConsole/Kusto/DbCommandClient.cs b/code/KustoCopyConsole/Kusto/DbCommandClient.cs index bc1c5ff..90284a4 100755 --- a/code/KustoCopyConsole/Kusto/DbCommandClient.cs +++ b/code/KustoCopyConsole/Kusto/DbCommandClient.cs @@ -23,6 +23,25 @@ public DbCommandClient( _databaseName = databaseName; } + public async Task GetExportCapacityAsync() + { + return await _commandQueue.RequestRunAsync( + KustoDbPriority.HighestPriority, + async () => + { + var commandText = @" +.show capacity +| where Resource == 'DataExport' +| project Total"; + var reader = await _provider.ExecuteControlCommandAsync( + _databaseName, + commandText); + var exportCapacity = (long)reader.ToDataSet().Tables[0].Rows[0][0]; + + return (int)exportCapacity; + }); + } + public async Task ExportBlockAsync( IImmutableList storageRoots, string tableName, diff --git a/code/KustoCopyConsole/Orchestration/SourceTableExportingOrchestration.cs b/code/KustoCopyConsole/Orchestration/SourceTableExportingOrchestration.cs index 66e8139..0da3251 100755 --- a/code/KustoCopyConsole/Orchestration/SourceTableExportingOrchestration.cs +++ b/code/KustoCopyConsole/Orchestration/SourceTableExportingOrchestration.cs @@ -179,9 +179,15 @@ private async Task FetchClusterCacheAsync( return new ClusterCache(DateTime.Now, exportRootUris, exportCapacity); } - private Task FetchExportCapacityAsync(RowItem blockItem, CancellationToken ct) + private async Task FetchExportCapacityAsync(RowItem blockItem, CancellationToken ct) { - throw new NotImplementedException(); + var tableIdentity = blockItem.GetSourceTableIdentity(); + var client = _dbClientFactory.GetDbCommandClient( + tableIdentity.ClusterUri, + tableIdentity.DatabaseName); + var exportCapacity = await client.GetExportCapacityAsync(); + + return exportCapacity; } private async Task> FetchExportRootUrisAsync( @@ -345,7 +351,7 @@ private async Task OnExportingAsync(CancellationToken ct) { var clusterCache = await clusterQueue.GetClusterCacheAsync(ct); - while (clusterQueue.GetOperationIDCount() > clusterCache.ExportCapacity + while (clusterQueue.GetOperationIDCount() < clusterCache.ExportCapacity && clusterQueue.TryPeekExport(out var blockItem)) { var tableIdentity = blockItem.GetSourceTableIdentity();