1
+ using Kusto . Ingest ;
2
+ using KustoCopyConsole . Entity . InMemory ;
3
+ using KustoCopyConsole . Entity . RowItems ;
4
+ using KustoCopyConsole . Entity . State ;
5
+ using KustoCopyConsole . JobParameter ;
6
+ using KustoCopyConsole . Kusto ;
7
+ using KustoCopyConsole . Storage ;
8
+ using System . Collections . Immutable ;
9
+
10
+ namespace KustoCopyConsole . Runner
11
+ {
12
+ internal class QueueIngestRunner : RunnerBase
13
+ {
14
+
15
+ public QueueIngestRunner (
16
+ MainJobParameterization parameterization ,
17
+ RowItemGateway rowItemGateway ,
18
+ DbClientFactory dbClientFactory )
19
+ : base ( parameterization , rowItemGateway , dbClientFactory , TimeSpan . FromSeconds ( 5 ) )
20
+ {
21
+ }
22
+
23
+ public async Task RunAsync ( CancellationToken ct )
24
+ {
25
+ while ( ! AllActivitiesCompleted ( ) )
26
+ {
27
+ var allBlocks = RowItemGateway . InMemoryCache . GetActivityFlatHierarchy (
28
+ a => a . RowItem . State != ActivityState . Completed ,
29
+ i => i . RowItem . State != IterationState . Completed ) ;
30
+ var exportedBlocks = allBlocks
31
+ . Where ( h => h . BlockItem . State == BlockState . Exported ) ;
32
+ var ingestionTasks = exportedBlocks
33
+ . Select ( h => QueueIngestBlockAsync ( h , ct ) ) ;
34
+
35
+ await Task . WhenAll ( ingestionTasks ) ;
36
+
37
+ // Sleep
38
+ await SleepAsync ( ct ) ;
39
+ }
40
+ }
41
+
42
+ private async Task QueueIngestBlockAsync ( ActivityFlatHierarchy item , CancellationToken ct )
43
+ {
44
+ var iterationCache = RowItemGateway . InMemoryCache
45
+ . ActivityMap [ item . Activity . ActivityName ]
46
+ . IterationMap [ item . Iteration . IterationId ] ;
47
+
48
+ // It's possible, although unlikely, the temp table hasn't been created yet
49
+ // If so, we'll process this block later
50
+ if ( iterationCache . TempTable != null )
51
+ {
52
+ var urlItems = iterationCache
53
+ . BlockMap [ item . BlockItem . BlockId ]
54
+ . UrlMap
55
+ . Values
56
+ . Select ( u => u . RowItem ) ;
57
+ var ingestClient = DbClientFactory . GetIngestClient (
58
+ item . Activity . DestinationTable . ClusterUri ,
59
+ item . Activity . DestinationTable . DatabaseName ,
60
+ iterationCache . TempTable . TempTableName ) ;
61
+ var blockTag = $ "kusto-copy:{ Guid . NewGuid ( ) } ";
62
+ var uriTasks = urlItems
63
+ . Select ( u => StagingBlobUriProvider . AuthorizeUriAsync ( new Uri ( u . Url ) , ct ) )
64
+ . ToImmutableArray ( ) ;
65
+
66
+ await Task . WhenAll ( uriTasks ) ;
67
+
68
+ var queueTasks = uriTasks
69
+ . Select ( t => ingestClient . QueueBlobAsync ( t . Result , blockTag , ct ) )
70
+ . ToImmutableArray ( ) ;
71
+
72
+ await Task . WhenAll ( queueTasks ) ;
73
+
74
+ var newBlockItem = item . BlockItem . ChangeState ( BlockState . Queued ) ;
75
+
76
+ newBlockItem . BlockTag = blockTag ;
77
+ RowItemGateway . Append ( newBlockItem ) ;
78
+ }
79
+ }
80
+ }
81
+ }
0 commit comments