Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
vplauzon committed Jan 17, 2025
1 parent c0aca2e commit 5a752b0
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions code/KustoCopyConsole/Runner/AwaitExportedRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ namespace KustoCopyConsole.Runner
{
internal class AwaitExportedRunner : RunnerBase
{
#region Inner Types
private record ClusterBlocks(Uri ClusterUri, IEnumerable<BlockRowItem> BlockItems);
#endregion

public AwaitExportedRunner(
MainJobParameterization parameterization,
RowItemGateway rowItemGateway,
Expand All @@ -26,11 +30,59 @@ public async Task RunAsync(CancellationToken ct)
CleanCompletingExports();
while (!AllActivitiesCompleted())
{
var clusterBlocks = GetClusterBlocks();
var tasks = clusterBlocks
.Select(o => UpdateOperationsAsync(o.ClusterUri, o.BlockItems, ct))
.ToImmutableArray();

await Task.WhenAll(tasks);
// Sleep
await SleepAsync(ct);
}
}

private IEnumerable<ClusterBlocks> GetClusterBlocks()
{
var exportingBlocks = RowItemGateway.InMemoryCache
.ActivityMap
.Values
.Where(a => a.RowItem.State != ActivityState.Completed)
.SelectMany(a => a.IterationMap.Values)
.Where(i => i.RowItem.State != IterationState.Completed)
.SelectMany(i => i.BlockMap.Values)
.Select(b => b.RowItem)
.Where(b => b.State == BlockState.Exporting);
var clusterBlocks = exportingBlocks
.Select(b => new
{
RowItemGateway.InMemoryCache
.ActivityMap[b.ActivityName]
.RowItem
.SourceTable
.ClusterUri,
BlockItem = b
})
.GroupBy(o => o.ClusterUri, o => o.BlockItem)
.Select(g => new ClusterBlocks(g.Key, g.ToImmutableArray()));
return clusterBlocks;
}

private async Task UpdateOperationsAsync(
Uri clusterUri,
IEnumerable<BlockRowItem> blockItems,
CancellationToken ct)
{
var dbClient = DbClientFactory.GetDbCommandClient(clusterUri, string.Empty);
var operationIdMap = blockItems
.ToImmutableDictionary(b => b.OperationId);
var status = await dbClient.ShowOperationsAsync(
KustoPriority.HighestPriority,
operationIdMap.Keys,
ct);

throw new NotImplementedException();
}

private void CleanCompletingExports()
{
var completingBlocks = RowItemGateway.InMemoryCache
Expand Down

0 comments on commit 5a752b0

Please sign in to comment.