Skip to content

Commit d746e57

Browse files
committed
.
1 parent 85f8986 commit d746e57

File tree

4 files changed

+80
-77
lines changed

4 files changed

+80
-77
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
using KustoCopyConsole.Entity.InMemory;
2+
using KustoCopyConsole.Entity.RowItems;
3+
using KustoCopyConsole.Entity.State;
4+
using KustoCopyConsole.JobParameter;
5+
using KustoCopyConsole.Kusto;
6+
using KustoCopyConsole.Storage;
7+
using System.Collections.Immutable;
8+
9+
namespace KustoCopyConsole.Runner
10+
{
11+
internal class AwaitIngestRunner : RunnerBase
12+
{
13+
public AwaitIngestRunner(
14+
MainJobParameterization parameterization,
15+
RowItemGateway rowItemGateway,
16+
DbClientFactory dbClientFactory)
17+
: base(parameterization, rowItemGateway, dbClientFactory, TimeSpan.FromSeconds(10))
18+
{
19+
}
20+
21+
public async Task RunAsync(CancellationToken ct)
22+
{
23+
while (!AllActivitiesCompleted())
24+
{
25+
var allBlocks = RowItemGateway.InMemoryCache.GetActivityFlatHierarchy(
26+
a => a.RowItem.State != ActivityState.Completed,
27+
i => i.RowItem.State != IterationState.Completed);
28+
var queuedBlocks = allBlocks
29+
.Where(h => h.BlockItem.State == BlockState.Queued);
30+
var ingestionTasks = queuedBlocks
31+
.Select(h => UpdateQueuedBlockAsync(h, ct))
32+
.ToImmutableArray();
33+
34+
await Task.WhenAll(ingestionTasks);
35+
36+
// Sleep
37+
await SleepAsync(ct);
38+
}
39+
}
40+
41+
private async Task UpdateQueuedBlockAsync(
42+
ActivityFlatHierarchy item,
43+
CancellationToken ct)
44+
{
45+
var iterationCache = RowItemGateway.InMemoryCache
46+
.ActivityMap[item.Activity.ActivityName]
47+
.IterationMap[item.Iteration.IterationId];
48+
var tempTableName = iterationCache.TempTable!.TempTableName;
49+
var targetRowCount = iterationCache.BlockMap[item.BlockItem.BlockId].UrlMap.Values
50+
.Sum(u => u.RowItem.RowCount);
51+
var dbClient = DbClientFactory.GetDbCommandClient(
52+
item.Activity.DestinationTable.ClusterUri,
53+
item.Activity.DestinationTable.DatabaseName);
54+
var rowCount = await dbClient.GetExtentRowCountAsync(
55+
new KustoPriority(item.BlockItem.GetIterationKey()),
56+
tempTableName,
57+
item.BlockItem.BlockTag,
58+
ct);
59+
60+
if (rowCount > targetRowCount)
61+
{
62+
throw new CopyException(
63+
$"Target row count is {targetRowCount} while we observe {rowCount}",
64+
false);
65+
}
66+
if (rowCount == targetRowCount)
67+
{
68+
var newBlockItem = item.BlockItem.ChangeState(BlockState.Ingested);
69+
70+
RowItemGateway.Append(newBlockItem);
71+
}
72+
}
73+
}
74+
}

code/KustoCopyConsole/Runner/Legacy/AwaitIngestRunner.cs

-75
This file was deleted.

code/KustoCopyConsole/Runner/MainRunner.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,16 @@ public async Task RunAsync(CancellationToken ct)
9090
new AwaitExportedRunner(Parameterization, RowItemGateway, DbClientFactory);
9191
var queueIngestRunner =
9292
new QueueIngestRunner(Parameterization, RowItemGateway, DbClientFactory);
93+
var awaitIngestRunner =
94+
new AwaitIngestRunner(Parameterization, RowItemGateway, DbClientFactory);
9395

9496
await Task.WhenAll(
9597
iterationRunner.RunAsync(ct),
9698
tempTableRunner.RunAsync(ct),
9799
exportingRunner.RunAsync(ct),
98100
awaitExportedRunner.RunAsync(ct),
99-
queueIngestRunner.RunAsync(ct));
101+
queueIngestRunner.RunAsync(ct),
102+
awaitIngestRunner.RunAsync(ct));
100103
}
101104
}
102105

code/KustoCopyConsole/Runner/QueueIngestRunner.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public async Task RunAsync(CancellationToken ct)
3030
var exportedBlocks = allBlocks
3131
.Where(h => h.BlockItem.State == BlockState.Exported);
3232
var ingestionTasks = exportedBlocks
33-
.Select(h => QueueIngestBlockAsync(h, ct));
33+
.Select(h => QueueIngestBlockAsync(h, ct))
34+
.ToImmutableArray();
3435

3536
await Task.WhenAll(ingestionTasks);
3637

0 commit comments

Comments
 (0)