Skip to content

Commit 58d83c3

Browse files
committed
.
1 parent d746e57 commit 58d83c3

10 files changed

+99
-391
lines changed

code/KustoCopyConsole/Entity/InMemory/ActivityFlatHierarchy.cs

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@ namespace KustoCopyConsole.Entity.InMemory
1010
internal record ActivityFlatHierarchy(
1111
ActivityRowItem Activity,
1212
IterationRowItem Iteration,
13-
BlockRowItem BlockItem);
13+
TempTableRowItem? TempTable,
14+
BlockRowItem Block,
15+
IEnumerable<UrlRowItem> Urls);
1416
}

code/KustoCopyConsole/Entity/InMemory/RowItemInMemoryCache.cs

+5-2
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,15 @@ public IEnumerable<ActivityFlatHierarchy> GetActivityFlatHierarchy(
4242
.SelectMany(a => a.IterationMap.Values.Where(i => iterationPredicate(i)).Select(i => new
4343
{
4444
Activity = a,
45-
Iteration = i
45+
Iteration = i,
46+
TempTableItem = i.TempTable
4647
}))
4748
.SelectMany(o => o.Iteration.BlockMap.Values.Select(b => new ActivityFlatHierarchy(
4849
o.Activity.RowItem,
4950
o.Iteration.RowItem,
50-
b.RowItem)));
51+
o.TempTableItem,
52+
b.RowItem,
53+
b.UrlMap.Values.Select(u => u.RowItem))));
5154
}
5255

5356
public IEnumerable<RowItemBase> GetItems()

code/KustoCopyConsole/Runner/AwaitExportedRunner.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ private IEnumerable<ClusterBlocks> GetClusterBlocks()
6060
a => a.RowItem.State != ActivityState.Completed,
6161
i => i.RowItem.State != IterationState.Completed);
6262
var exportingBlocks = hierarchy
63-
.Where(h => h.BlockItem.State == BlockState.Exporting);
63+
.Where(h => h.Block.State == BlockState.Exporting);
6464
var clusterBlocks = exportingBlocks
6565
.GroupBy(h => h.Activity.SourceTable.ClusterUri)
6666
.Select(g => new ClusterBlocks(
6767
g.Key,
68-
g.Select(h => h.BlockItem).OrderBy(b => b.Updated).Take(MAX_OPERATIONS)));
68+
g.Select(h => h.Block).OrderBy(b => b.Updated).Take(MAX_OPERATIONS)));
6969

7070
return clusterBlocks;
7171
}

code/KustoCopyConsole/Runner/AwaitIngestRunner.cs

+8-11
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public async Task RunAsync(CancellationToken ct)
2626
a => a.RowItem.State != ActivityState.Completed,
2727
i => i.RowItem.State != IterationState.Completed);
2828
var queuedBlocks = allBlocks
29-
.Where(h => h.BlockItem.State == BlockState.Queued);
29+
.Where(h => h.Block.State == BlockState.Queued);
3030
var ingestionTasks = queuedBlocks
3131
.Select(h => UpdateQueuedBlockAsync(h, ct))
3232
.ToImmutableArray();
@@ -42,19 +42,16 @@ private async Task UpdateQueuedBlockAsync(
4242
ActivityFlatHierarchy item,
4343
CancellationToken ct)
4444
{
45-
var iterationCache = RowItemGateway.InMemoryCache
46-
.ActivityMap[item.Activity.ActivityName]
47-
.IterationMap[item.Iteration.IterationId];
48-
var tempTableName = iterationCache.TempTable!.TempTableName;
49-
var targetRowCount = iterationCache.BlockMap[item.BlockItem.BlockId].UrlMap.Values
50-
.Sum(u => u.RowItem.RowCount);
45+
var targetRowCount = item
46+
.Urls
47+
.Sum(u => u.RowCount);
5148
var dbClient = DbClientFactory.GetDbCommandClient(
5249
item.Activity.DestinationTable.ClusterUri,
5350
item.Activity.DestinationTable.DatabaseName);
5451
var rowCount = await dbClient.GetExtentRowCountAsync(
55-
new KustoPriority(item.BlockItem.GetIterationKey()),
56-
tempTableName,
57-
item.BlockItem.BlockTag,
52+
new KustoPriority(item.Block.GetIterationKey()),
53+
item.TempTable!.TempTableName,
54+
item.Block.BlockTag,
5855
ct);
5956

6057
if (rowCount > targetRowCount)
@@ -65,7 +62,7 @@ private async Task UpdateQueuedBlockAsync(
6562
}
6663
if (rowCount == targetRowCount)
6764
{
68-
var newBlockItem = item.BlockItem.ChangeState(BlockState.Ingested);
65+
var newBlockItem = item.Block.ChangeState(BlockState.Ingested);
6966

7067
RowItemGateway.Append(newBlockItem);
7168
}

code/KustoCopyConsole/Runner/ExportingRunner.cs

+8-8
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,21 @@ async Task ProcessBlockAsync(
5454
item.Activity.SourceTable.DatabaseName);
5555
var folderPath = $"activities/{item.Activity.ActivityName}" +
5656
$"iterations/{item.Iteration.IterationId:D20}" +
57-
$"/blocks/{item.BlockItem.BlockId:D20}";
57+
$"/blocks/{item.Block.BlockId:D20}";
5858
var writableUris = await StagingBlobUriProvider.GetWritableFolderUrisAsync(
5959
folderPath,
6060
ct);
6161
var query = Parameterization.Activities[item.Activity.ActivityName].KqlQuery;
6262
var operationId = await dbClient.ExportBlockAsync(
63-
new KustoPriority(item.BlockItem.GetIterationKey()),
63+
new KustoPriority(item.Block.GetIterationKey()),
6464
writableUris,
6565
query,
6666
item.Iteration.CursorStart,
6767
item.Iteration.CursorEnd,
68-
item.BlockItem.IngestionTimeStart,
69-
item.BlockItem.IngestionTimeEnd,
68+
item.Block.IngestionTimeStart,
69+
item.Block.IngestionTimeEnd,
7070
ct);
71-
var blockItem = item.BlockItem.ChangeState(BlockState.Exporting);
71+
var blockItem = item.Block.ChangeState(BlockState.Exporting);
7272

7373
blockItem.OperationId = operationId;
7474
RowItemGateway.Append(blockItem);
@@ -96,8 +96,8 @@ private async Task<IEnumerable<ActivityFlatHierarchy>> GetExportLineUpAsync(
9696
.Select(g => new
9797
{
9898
ClusterUri = g.Key,
99-
ExportingCount = g.Count(h => h.BlockItem.State == BlockState.Exporting),
100-
Candidates = g.Where(h => h.BlockItem.State == BlockState.Planned)
99+
ExportingCount = g.Count(h => h.Block.State == BlockState.Exporting),
100+
Candidates = g.Where(h => h.Block.State == BlockState.Planned)
101101
})
102102
// Keep only clusters with candidates
103103
.Where(o => o.Candidates.Any())
@@ -119,7 +119,7 @@ private async Task<IEnumerable<ActivityFlatHierarchy>> GetExportLineUpAsync(
119119
.Where(o => o.ExportingAvailability > 0)
120120
// Select candidates by priority
121121
.SelectMany(o => o.Candidates
122-
.OrderBy(c => new KustoPriority(c.BlockItem.GetIterationKey()))
122+
.OrderBy(c => new KustoPriority(c.Block.GetIterationKey()))
123123
.Take(o.ExportingAvailability))
124124
.ToImmutableArray();
125125

0 commit comments

Comments
 (0)