1
+ using KustoCopyConsole . Entity ;
2
+ using KustoCopyConsole . Entity . InMemory ;
3
+ using KustoCopyConsole . Entity . State ;
4
+ using KustoCopyConsole . JobParameter ;
5
+ using KustoCopyConsole . Kusto ;
6
+ using KustoCopyConsole . Storage ;
7
+ using System ;
8
+ using System . Collections . Generic ;
9
+ using System . Collections . Immutable ;
10
+ using System . Linq ;
11
+ using System . Text ;
12
+ using System . Threading . Tasks ;
13
+
14
+ namespace KustoCopyConsole . Orchestration
15
+ {
16
+ /// <summary>
17
+ /// This orchestration is responsible to manage the life cycle of a destination
18
+ /// table, i.e. create a staging table and drop the staging table.
19
+ /// </summary>
20
+ internal class DestinationTableLifeCycleOrchestration : SubOrchestrationBase
21
+ {
22
+ public DestinationTableLifeCycleOrchestration (
23
+ RowItemGateway rowItemGateway ,
24
+ DbClientFactory dbClientFactory ,
25
+ MainJobParameterization parameterization )
26
+ : base ( rowItemGateway , dbClientFactory , parameterization )
27
+ {
28
+ }
29
+
30
+ protected override async Task OnStartProcessAsync ( CancellationToken ct )
31
+ {
32
+ var cache = RowItemGateway . InMemoryCache ;
33
+
34
+ foreach ( var sourceTable in cache . SourceTableMap . Values )
35
+ {
36
+ foreach ( var iterationTable in sourceTable . IterationMap . Values )
37
+ {
38
+ if ( iterationTable . RowItem . ParseState < SourceTableState > ( ) !=
39
+ SourceTableState . Completed )
40
+ {
41
+ BackgroundTaskContainer . AddTask (
42
+ EnsureStagingTableAsync ( iterationTable . RowItem , ct ) ) ;
43
+ }
44
+ }
45
+ }
46
+ await Task . CompletedTask ;
47
+ }
48
+
49
+ private async Task EnsureStagingTableAsync (
50
+ RowItem sourceTableIterationItem ,
51
+ CancellationToken ct )
52
+ {
53
+ var sourceTableIdentity = sourceTableIterationItem . GetSourceTableIdentity ( ) ;
54
+ var destinationTableIdentities = MapSourceToDestinations ( sourceTableIdentity ) ;
55
+
56
+ await Task . CompletedTask ;
57
+ throw new NotImplementedException ( ) ;
58
+ }
59
+
60
+ private IEnumerable < TableIdentity > MapSourceToDestinations (
61
+ TableIdentity sourceTableIdentity )
62
+ {
63
+ foreach ( var a in Parameterization . Activities )
64
+ {
65
+ if ( a . Source . GetTableIdentity ( ) == sourceTableIdentity )
66
+ {
67
+ foreach ( var d in a . Destinations )
68
+ {
69
+ yield return d . GetTableIdentity ( ) ;
70
+ }
71
+ }
72
+ }
73
+ }
74
+ }
75
+ }
0 commit comments