Skip to content

Commit 3debfe9

Browse files
authored
Merge pull request #24 from Azure/vpl/table-row-item
Vpl/table row item
2 parents d8f508a + 969ba6a commit 3debfe9

36 files changed

+529
-448
lines changed

code/KustoCopyConsole/CommandLineOptions.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class CommandLineOptions
2121
'd',
2222
"destination",
2323
Required = false,
24-
HelpText = "Set the destination in the form cluster uri/database/table (table is optional), e.g. https://mycluster.eastus.kusto.windows.net/mydb")]
24+
HelpText = "Set the destination table in the form cluster uri/database/table (table is optional), e.g. https://mycluster.eastus.kusto.windows.net/mydb")]
2525
public string Destination { get; set; } = string.Empty;
2626

2727
[Option(
@@ -34,15 +34,15 @@ public class CommandLineOptions
3434
[Option('a', "auth", Required = false, HelpText = "Set authentication method.")]
3535
public string Authentication { get; set; } = string.Empty;
3636

37+
[Option('q', "query", Required = true, HelpText = "Set query, e.g. nyc_taxi.")]
38+
public string Query { get; set; } = string.Empty;
39+
3740
[Option(
3841
"continuous",
3942
Required = false,
4043
HelpText = "Continuous run: if set, runs continuously, otherwise, stop after one iteration")]
4144
public bool IsContinuousRun { get; set; } = false;
4245

43-
[Option('q', "query", Required = false, HelpText = "Set query.")]
44-
public string Query { get; set; } = string.Empty;
45-
4646
[Option("log-path", Required = false, HelpText = "Set log file path.")]
4747
public string LogFilePath { get; set; } = string.Empty;
4848
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using KustoCopyConsole.Entity.RowItems;
2+
using System.Collections.Immutable;
3+
4+
namespace KustoCopyConsole.Entity.InMemory
5+
{
6+
internal class ActivityCache : CacheBase<ActivityRowItem>
7+
{
8+
public ActivityCache(
9+
ActivityRowItem item,
10+
IImmutableDictionary<long, IterationCache> iterationMap)
11+
: base(item)
12+
{
13+
IterationMap = iterationMap;
14+
}
15+
16+
public ActivityCache(ActivityRowItem item)
17+
: this(item, ImmutableDictionary<long, IterationCache>.Empty)
18+
{
19+
}
20+
21+
public IImmutableDictionary<long, IterationCache> IterationMap { get; }
22+
23+
public ActivityCache AppendIteration(IterationCache iteration)
24+
{
25+
var iterationId = iteration.RowItem.IterationId;
26+
27+
return new ActivityCache(RowItem, IterationMap.SetItem(iterationId, iteration));
28+
}
29+
}
30+
}

code/KustoCopyConsole/Entity/InMemory/IterationCache.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33

44
namespace KustoCopyConsole.Entity.InMemory
55
{
6-
internal class IterationCache : CacheBase<TableRowItem>
6+
internal class IterationCache : CacheBase<IterationRowItem>
77
{
88
public IterationCache(
9-
TableRowItem item,
9+
IterationRowItem item,
1010
IImmutableDictionary<long, BlockCache> blockMap)
1111
: base(item)
1212
{
1313
BlockMap = blockMap;
1414
}
1515

16-
public IterationCache(TableRowItem item)
16+
public IterationCache(IterationRowItem item)
1717
: this(item, ImmutableDictionary<long, BlockCache>.Empty)
1818
{
1919
}

code/KustoCopyConsole/Entity/InMemory/RowItemInMemoryCache.cs

+56-36
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ namespace KustoCopyConsole.Entity.InMemory
1111
internal class RowItemInMemoryCache
1212
{
1313
private readonly object _lock = new object();
14-
private volatile IImmutableDictionary<TableIdentity, TableCache> _sourceTableMap =
15-
ImmutableDictionary<TableIdentity, TableCache>.Empty;
14+
private volatile IImmutableDictionary<string, ActivityCache> _activityMap =
15+
ImmutableDictionary<string, ActivityCache>.Empty;
1616

1717
public RowItemInMemoryCache(IEnumerable<RowItemBase> items)
1818
{
@@ -25,12 +25,12 @@ public RowItemInMemoryCache(IEnumerable<RowItemBase> items)
2525
}
2626
}
2727

28-
public IImmutableDictionary<TableIdentity, TableCache> SourceTableMap
29-
=> _sourceTableMap;
28+
public IImmutableDictionary<string, ActivityCache> ActivityMap
29+
=> _activityMap;
3030

3131
public IEnumerable<RowItemBase> GetItems()
3232
{
33-
foreach (var sourceTable in SourceTableMap.Values)
33+
foreach (var sourceTable in ActivityMap.Values)
3434
{
3535
foreach (var sourceTableIteration in sourceTable.IterationMap.Values)
3636
{
@@ -51,17 +51,19 @@ public void AppendItem(RowItemBase item)
5151
{
5252
lock (_lock)
5353
{
54-
Interlocked.Exchange(ref _sourceTableMap, AppendItemToCache(item));
54+
Interlocked.Exchange(ref _activityMap, AppendItemToCache(item));
5555
}
5656
}
5757

58-
private IImmutableDictionary<TableIdentity, TableCache> AppendItemToCache(
58+
private IImmutableDictionary<string, ActivityCache> AppendItemToCache(
5959
RowItemBase item)
6060
{
6161
switch (item)
6262
{
63-
case TableRowItem st:
64-
return AppendSourceTable(st);
63+
case ActivityRowItem a:
64+
return AppendActivity(a);
65+
case IterationRowItem i:
66+
return AppendIteration(i);
6567
case BlockRowItem sb:
6668
return AppendSourceBlock(sb);
6769
case UrlRowItem url:
@@ -72,45 +74,64 @@ private IImmutableDictionary<TableIdentity, TableCache> AppendItemToCache(
7274
}
7375
}
7476

75-
private IImmutableDictionary<TableIdentity, TableCache> AppendSourceTable(
76-
TableRowItem item)
77+
private IImmutableDictionary<string, ActivityCache> AppendActivity(
78+
ActivityRowItem item)
7779
{
78-
var tableId = item.SourceTable;
80+
var activityName = item.ActivityName;
7981

80-
if (_sourceTableMap.ContainsKey(tableId))
82+
if (_activityMap.ContainsKey(activityName))
8183
{
82-
var table = _sourceTableMap[tableId];
84+
var activity = _activityMap[activityName];
85+
86+
return _activityMap.SetItem(
87+
activityName,
88+
new ActivityCache(item, activity.IterationMap));
89+
}
90+
else
91+
{
92+
return _activityMap.Add(activityName, new ActivityCache(item));
93+
}
94+
}
95+
96+
private IImmutableDictionary<string, ActivityCache> AppendIteration(
97+
IterationRowItem item)
98+
{
99+
var activityName = item.ActivityName;
100+
101+
if (_activityMap.ContainsKey(activityName))
102+
{
103+
var table = _activityMap[activityName];
83104

84105
if (table.IterationMap.ContainsKey(item.IterationId))
85106
{
86107
var iteration = table.IterationMap[item.IterationId];
87108

88-
return _sourceTableMap.SetItem(
89-
tableId,
109+
return _activityMap.SetItem(
110+
activityName,
90111
table.AppendIteration(
91112
new IterationCache(item, iteration.BlockMap)));
92113
}
93114
else
94115
{
95-
return _sourceTableMap.SetItem(
96-
tableId,
116+
return _activityMap.SetItem(
117+
activityName,
97118
table.AppendIteration(new IterationCache(item)));
98119
}
99120
}
100121
else
101122
{
102-
return _sourceTableMap.Add(tableId, new TableCache(item));
123+
throw new NotSupportedException("Activity should come before block in logs");
103124
}
104125
}
105126

106-
private IImmutableDictionary<TableIdentity, TableCache> AppendSourceBlock(
127+
private IImmutableDictionary<string, ActivityCache> AppendSourceBlock(
107128
BlockRowItem item)
108129
{
109-
var tableId = item.SourceTable;
130+
var activityName = item.ActivityName;
110131

111-
if (_sourceTableMap.ContainsKey(tableId))
132+
if (_activityMap.ContainsKey(activityName))
112133
{
113-
var sourceTable = _sourceTableMap[tableId];
134+
var sourceTable = _activityMap[activityName];
114135

115136
if (sourceTable.IterationMap.ContainsKey(item.IterationId))
116137
{
@@ -120,16 +141,16 @@ private IImmutableDictionary<TableIdentity, TableCache> AppendSourceBlock(
120141
{
121142
var sourceBlock = sourceIteration.BlockMap[item.BlockId];
122143

123-
return _sourceTableMap.SetItem(
124-
tableId,
144+
return _activityMap.SetItem(
145+
activityName,
125146
sourceTable.AppendIteration(
126147
sourceIteration.AppendBlock(
127148
new BlockCache(item, sourceBlock.UrlMap))));
128149
}
129150
else
130151
{
131-
return _sourceTableMap.SetItem(
132-
tableId,
152+
return _activityMap.SetItem(
153+
activityName,
133154
sourceTable.AppendIteration(
134155
sourceIteration.AppendBlock(new BlockCache(item))));
135156
}
@@ -141,18 +162,17 @@ private IImmutableDictionary<TableIdentity, TableCache> AppendSourceBlock(
141162
}
142163
else
143164
{
144-
throw new NotSupportedException("Table should come before block in logs");
165+
throw new NotSupportedException("Activity should come before block in logs");
145166
}
146167
}
147168

148-
private IImmutableDictionary<TableIdentity, TableCache> AppendSourceUrl(
149-
UrlRowItem item)
169+
private IImmutableDictionary<string, ActivityCache> AppendSourceUrl(UrlRowItem item)
150170
{
151-
var tableId = item.SourceTable;
171+
var activityName = item.ActivityName;
152172

153-
if (_sourceTableMap.ContainsKey(tableId))
173+
if (_activityMap.ContainsKey(activityName))
154174
{
155-
var sourceTable = _sourceTableMap[tableId];
175+
var sourceTable = _activityMap[activityName];
156176

157177
if (sourceTable.IterationMap.ContainsKey(item.IterationId))
158178
{
@@ -162,8 +182,8 @@ private IImmutableDictionary<TableIdentity, TableCache> AppendSourceUrl(
162182
{
163183
var block = sourceIteration.BlockMap[item.BlockId];
164184

165-
return _sourceTableMap.SetItem(
166-
tableId,
185+
return _activityMap.SetItem(
186+
activityName,
167187
sourceTable.AppendIteration(
168188
sourceIteration.AppendBlock(
169189
block.AppendUrl(new UrlCache(item)))));
@@ -180,7 +200,7 @@ private IImmutableDictionary<TableIdentity, TableCache> AppendSourceUrl(
180200
}
181201
else
182202
{
183-
throw new NotSupportedException("Table should come before block in logs");
203+
throw new NotSupportedException("Activity should come before block in logs");
184204
}
185205
}
186206
}

code/KustoCopyConsole/Entity/InMemory/TableCache.cs

-32
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using KustoCopyConsole.Entity.State;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
8+
namespace KustoCopyConsole.Entity.RowItems
9+
{
10+
internal class ActivityRowItem : RowItemBase
11+
{
12+
public ActivityState State { get; set; }
13+
14+
public string ActivityName { get; set; } = string.Empty;
15+
16+
public TableIdentity SourceTable { get; set; } = TableIdentity.Empty;
17+
18+
public TableIdentity DestinationTable { get; set; } = TableIdentity.Empty;
19+
20+
public override void Validate()
21+
{
22+
if (string.IsNullOrWhiteSpace(ActivityName))
23+
{
24+
throw new InvalidDataException($"{nameof(ActivityName)} must have a value");
25+
}
26+
SourceTable.Validate();
27+
DestinationTable.Validate();
28+
}
29+
30+
public ActivityRowItem ChangeState(ActivityState newState)
31+
{
32+
var clone = (ActivityRowItem)Clone();
33+
34+
clone.State = newState;
35+
36+
return clone;
37+
}
38+
}
39+
}

code/KustoCopyConsole/Entity/RowItems/BlockRowItem.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ internal class BlockRowItem : RowItemBase
1111
{
1212
public BlockState State { get; set; }
1313

14-
public TableIdentity SourceTable { get; set; } = TableIdentity.Empty;
15-
16-
public TableIdentity DestinationTable { get; set; } = TableIdentity.Empty;
14+
public string ActivityName { get; set; } = string.Empty;
1715

1816
public long IterationId { get; set; }
1917

@@ -29,8 +27,10 @@ internal class BlockRowItem : RowItemBase
2927

3028
public override void Validate()
3129
{
32-
SourceTable.Validate();
33-
DestinationTable.Validate();
30+
if (string.IsNullOrWhiteSpace(ActivityName))
31+
{
32+
throw new InvalidDataException($"{nameof(ActivityName)} must have a value");
33+
}
3434
if (IterationId < 1)
3535
{
3636
throw new InvalidDataException(

0 commit comments

Comments
 (0)