Skip to content

Commit 4eba6f5

Browse files
authored
Merge pull request #28 from Azure/vpl/proc-runners
Vpl/proc runners
2 parents 7db759e + 6750f64 commit 4eba6f5

File tree

89 files changed

+1146
-6895
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+1146
-6895
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using KustoCopyConsole.Entity.RowItems;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
8+
namespace KustoCopyConsole.Entity.InMemory
9+
{
10+
internal record ActivityFlatHierarchy(
11+
ActivityRowItem Activity,
12+
IterationRowItem Iteration,
13+
TempTableRowItem? TempTable,
14+
BlockRowItem Block,
15+
IEnumerable<UrlRowItem> Urls);
16+
}

code/KustoCopyConsole/Entity/InMemory/IterationCache.cs

+6-2
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,28 @@ internal class IterationCache : CacheBase<IterationRowItem>
77
{
88
public IterationCache(
99
IterationRowItem item,
10+
TempTableRowItem? tempTable,
1011
IImmutableDictionary<long, BlockCache> blockMap)
1112
: base(item)
1213
{
14+
TempTable = tempTable;
1315
BlockMap = blockMap;
1416
}
1517

1618
public IterationCache(IterationRowItem item)
17-
: this(item, ImmutableDictionary<long, BlockCache>.Empty)
19+
: this(item, null, ImmutableDictionary<long, BlockCache>.Empty)
1820
{
1921
}
2022

23+
public TempTableRowItem? TempTable { get; }
24+
2125
public IImmutableDictionary<long, BlockCache> BlockMap { get; }
2226

2327
public IterationCache AppendBlock(BlockCache block)
2428
{
2529
var newBlockMap = BlockMap.SetItem(block.RowItem.BlockId, block);
2630

27-
return new IterationCache(RowItem, newBlockMap);
31+
return new IterationCache(RowItem, TempTable, newBlockMap);
2832
}
2933
}
3034
}

code/KustoCopyConsole/Entity/InMemory/RowItemInMemoryCache.cs

+84-27
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
using KustoCopyConsole.Entity.RowItems;
2+
using KustoCopyConsole.Storage;
23
using System;
34
using System.Collections.Generic;
45
using System.Collections.Immutable;
56
using System.Linq;
67
using System.Text;
78
using System.Threading.Tasks;
9+
using System.Threading.Tasks.Dataflow;
810

911
namespace KustoCopyConsole.Entity.InMemory
1012
{
@@ -30,14 +32,40 @@ public RowItemInMemoryCache(IEnumerable<RowItemBase> items)
3032
public IImmutableDictionary<string, ActivityCache> ActivityMap
3133
=> _activityMap;
3234

35+
public IEnumerable<ActivityFlatHierarchy> GetActivityFlatHierarchy(
36+
Func<ActivityCache, bool> activityPredicate,
37+
Func<IterationCache, bool> iterationPredicate)
38+
{
39+
return ActivityMap
40+
.Values
41+
.Where(a => activityPredicate(a))
42+
.SelectMany(a => a.IterationMap.Values.Where(i => iterationPredicate(i)).Select(i => new
43+
{
44+
Activity = a,
45+
Iteration = i,
46+
TempTableItem = i.TempTable
47+
}))
48+
.SelectMany(o => o.Iteration.BlockMap.Values.Select(b => new ActivityFlatHierarchy(
49+
o.Activity.RowItem,
50+
o.Iteration.RowItem,
51+
o.TempTableItem,
52+
b.RowItem,
53+
b.UrlMap.Values.Select(u => u.RowItem))));
54+
}
55+
3356
public IEnumerable<RowItemBase> GetItems()
3457
{
35-
foreach (var sourceTable in ActivityMap.Values)
58+
foreach (var activity in ActivityMap.Values)
3659
{
37-
foreach (var sourceTableIteration in sourceTable.IterationMap.Values)
60+
yield return activity.RowItem;
61+
foreach (var iteration in activity.IterationMap.Values)
3862
{
39-
yield return sourceTableIteration.RowItem;
40-
foreach (var block in sourceTableIteration.BlockMap.Values)
63+
yield return iteration.RowItem;
64+
if (iteration.TempTable != null)
65+
{
66+
yield return iteration.TempTable;
67+
}
68+
foreach (var block in iteration.BlockMap.Values)
4169
{
4270
yield return block.RowItem;
4371
foreach (var url in block.UrlMap.Values)
@@ -54,8 +82,8 @@ public void AppendItem(RowItemBase item)
5482
lock (_lock)
5583
{
5684
Interlocked.Exchange(ref _activityMap, AppendItemToCache(item));
57-
OnRowItemAppended(item);
5885
}
86+
OnRowItemAppended(item);
5987
}
6088

6189
private void OnRowItemAppended(RowItemBase item)
@@ -75,10 +103,12 @@ private IImmutableDictionary<string, ActivityCache> AppendItemToCache(
75103
return AppendActivity(a);
76104
case IterationRowItem i:
77105
return AppendIteration(i);
106+
case TempTableRowItem t:
107+
return AppendTempTable(t);
78108
case BlockRowItem sb:
79-
return AppendSourceBlock(sb);
109+
return AppendBlock(sb);
80110
case UrlRowItem url:
81-
return AppendSourceUrl(url);
111+
return AppendUrl(url);
82112
default:
83113
throw new NotSupportedException(
84114
$"Not supported row item type: {item.GetType().Name}");
@@ -120,7 +150,7 @@ private IImmutableDictionary<string, ActivityCache> AppendIteration(
120150
return _activityMap.SetItem(
121151
activityName,
122152
table.AppendIteration(
123-
new IterationCache(item, iteration.BlockMap)));
153+
new IterationCache(item, iteration.TempTable, iteration.BlockMap)));
124154
}
125155
else
126156
{
@@ -135,35 +165,62 @@ private IImmutableDictionary<string, ActivityCache> AppendIteration(
135165
}
136166
}
137167

138-
private IImmutableDictionary<string, ActivityCache> AppendSourceBlock(
139-
BlockRowItem item)
168+
private IImmutableDictionary<string, ActivityCache> AppendTempTable(TempTableRowItem item)
140169
{
141170
var activityName = item.ActivityName;
142171

143172
if (_activityMap.ContainsKey(activityName))
144173
{
145-
var sourceTable = _activityMap[activityName];
174+
var activity = _activityMap[activityName];
146175

147-
if (sourceTable.IterationMap.ContainsKey(item.IterationId))
176+
if (activity.IterationMap.ContainsKey(item.IterationId))
148177
{
149-
var sourceIteration = sourceTable.IterationMap[item.IterationId];
178+
var iteration = activity.IterationMap[item.IterationId];
150179

151-
if (sourceIteration.BlockMap.ContainsKey(item.BlockId))
180+
return _activityMap.SetItem(
181+
activityName,
182+
activity.AppendIteration(
183+
new IterationCache(iteration.RowItem, item, iteration.BlockMap)));
184+
}
185+
else
186+
{
187+
throw new NotSupportedException("Iteration should come before block in logs");
188+
}
189+
}
190+
else
191+
{
192+
throw new NotSupportedException("Activity should come before block in logs");
193+
}
194+
}
195+
196+
private IImmutableDictionary<string, ActivityCache> AppendBlock(BlockRowItem item)
197+
{
198+
var activityName = item.ActivityName;
199+
200+
if (_activityMap.ContainsKey(activityName))
201+
{
202+
var activity = _activityMap[activityName];
203+
204+
if (activity.IterationMap.ContainsKey(item.IterationId))
205+
{
206+
var iteration = activity.IterationMap[item.IterationId];
207+
208+
if (iteration.BlockMap.ContainsKey(item.BlockId))
152209
{
153-
var sourceBlock = sourceIteration.BlockMap[item.BlockId];
210+
var sourceBlock = iteration.BlockMap[item.BlockId];
154211

155212
return _activityMap.SetItem(
156213
activityName,
157-
sourceTable.AppendIteration(
158-
sourceIteration.AppendBlock(
214+
activity.AppendIteration(
215+
iteration.AppendBlock(
159216
new BlockCache(item, sourceBlock.UrlMap))));
160217
}
161218
else
162219
{
163220
return _activityMap.SetItem(
164221
activityName,
165-
sourceTable.AppendIteration(
166-
sourceIteration.AppendBlock(new BlockCache(item))));
222+
activity.AppendIteration(
223+
iteration.AppendBlock(new BlockCache(item))));
167224
}
168225
}
169226
else
@@ -177,26 +234,26 @@ private IImmutableDictionary<string, ActivityCache> AppendSourceBlock(
177234
}
178235
}
179236

180-
private IImmutableDictionary<string, ActivityCache> AppendSourceUrl(UrlRowItem item)
237+
private IImmutableDictionary<string, ActivityCache> AppendUrl(UrlRowItem item)
181238
{
182239
var activityName = item.ActivityName;
183240

184241
if (_activityMap.ContainsKey(activityName))
185242
{
186-
var sourceTable = _activityMap[activityName];
243+
var activity = _activityMap[activityName];
187244

188-
if (sourceTable.IterationMap.ContainsKey(item.IterationId))
245+
if (activity.IterationMap.ContainsKey(item.IterationId))
189246
{
190-
var sourceIteration = sourceTable.IterationMap[item.IterationId];
247+
var iteration = activity.IterationMap[item.IterationId];
191248

192-
if (sourceIteration.BlockMap.ContainsKey(item.BlockId))
249+
if (iteration.BlockMap.ContainsKey(item.BlockId))
193250
{
194-
var block = sourceIteration.BlockMap[item.BlockId];
251+
var block = iteration.BlockMap[item.BlockId];
195252

196253
return _activityMap.SetItem(
197254
activityName,
198-
sourceTable.AppendIteration(
199-
sourceIteration.AppendBlock(
255+
activity.AppendIteration(
256+
iteration.AppendBlock(
200257
block.AppendUrl(new UrlCache(item)))));
201258
}
202259
else

code/KustoCopyConsole/Entity/RowItems/BlockRowItem.cs

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using KustoCopyConsole.Entity.State;
1+
using KustoCopyConsole.Entity.RowItems.Keys;
2+
using KustoCopyConsole.Entity.State;
23
using System;
34
using System.Collections.Generic;
45
using System.Linq;
@@ -20,6 +21,8 @@ internal class BlockRowItem : RowItemBase
2021
public DateTime IngestionTimeStart { get; set; } = DateTime.MinValue;
2122

2223
public DateTime IngestionTimeEnd { get; set; } = DateTime.MinValue;
24+
25+
public DateTime ExtentCreationTime { get; set; } = DateTime.MinValue;
2326

2427
public string OperationId { get; set; } = string.Empty;
2528

@@ -51,6 +54,11 @@ public override void Validate()
5154
throw new InvalidDataException(
5255
$"{nameof(IngestionTimeEnd)} hasn't been populated");
5356
}
57+
if (ExtentCreationTime == DateTime.MinValue)
58+
{
59+
throw new InvalidDataException(
60+
$"{nameof(ExtentCreationTime)} hasn't been populated");
61+
}
5462
if (State != BlockState.Planned && string.IsNullOrWhiteSpace(OperationId))
5563
{
5664
throw new InvalidDataException($"{nameof(OperationId)} hasn't been populated");
@@ -70,6 +78,11 @@ public override void Validate()
7078
}
7179
}
7280

81+
public BlockKey GetIterationKey()
82+
{
83+
return new BlockKey(ActivityName, IterationId, BlockId);
84+
}
85+
7386
public BlockRowItem ChangeState(BlockState newState)
7487
{
7588
var clone = (BlockRowItem)Clone();

code/KustoCopyConsole/Entity/RowItems/IterationRowItem.cs

+7-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using KustoCopyConsole.Entity.State;
1+
using KustoCopyConsole.Entity.RowItems.Keys;
2+
using KustoCopyConsole.Entity.State;
23
using System;
34
using System.Collections.Generic;
45
using System.Linq;
@@ -19,8 +20,6 @@ internal class IterationRowItem : RowItemBase
1920

2021
public string CursorEnd { get; set; } = string.Empty;
2122

22-
public string TempTableName { get; set; } = string.Empty;
23-
2423
public override void Validate()
2524
{
2625
if (string.IsNullOrWhiteSpace(ActivityName))
@@ -36,12 +35,11 @@ public override void Validate()
3635
{
3736
throw new InvalidDataException($"{nameof(CursorEnd)} should have a value");
3837
}
39-
if (State >= IterationState.TempTableCreating && string.IsNullOrWhiteSpace(TempTableName))
40-
{
41-
throw new InvalidDataException(
42-
$"{nameof(TempTableName)} should have a value for" +
43-
$"state {State}");
44-
}
38+
}
39+
40+
public IterationKey GetIterationKey()
41+
{
42+
return new IterationKey(ActivityName, IterationId);
4543
}
4644

4745
public IterationRowItem ChangeState(IterationState newState)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
namespace KustoCopyConsole.Entity.RowItems.Keys
2+
{
3+
internal record BlockKey(string ActivityName, long IterationId, long blockId);
4+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace KustoCopyConsole.Entity.RowItems.Keys
8+
{
9+
internal record IterationKey(string ActivityName, long IterationId);
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using KustoCopyConsole.Entity.RowItems.Keys;
2+
using KustoCopyConsole.Entity.State;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Text;
7+
using System.Threading.Tasks;
8+
9+
namespace KustoCopyConsole.Entity.RowItems
10+
{
11+
internal class TempTableRowItem : RowItemBase
12+
{
13+
public TempTableState State { get; set; }
14+
15+
public string ActivityName { get; set; } = string.Empty;
16+
17+
public long IterationId { get; set; }
18+
19+
public string TempTableName { get; set; } = string.Empty;
20+
21+
public override void Validate()
22+
{
23+
if (string.IsNullOrWhiteSpace(ActivityName))
24+
{
25+
throw new InvalidDataException($"{nameof(ActivityName)} must have a value");
26+
}
27+
if (IterationId < 1)
28+
{
29+
throw new InvalidDataException(
30+
$"{nameof(IterationId)} should be positive but is {IterationId}");
31+
}
32+
if (string.IsNullOrWhiteSpace(TempTableName))
33+
{
34+
throw new InvalidDataException($"{nameof(TempTableName)} should have a value");
35+
}
36+
}
37+
38+
public IterationKey GetIterationKey()
39+
{
40+
return new IterationKey(ActivityName, IterationId);
41+
}
42+
43+
public TempTableRowItem ChangeState(TempTableState newState)
44+
{
45+
var clone = (TempTableRowItem)Clone();
46+
47+
clone.State = newState;
48+
49+
return clone;
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)