Skip to content

Commit ed83ccf

Browse files
committed
Simplify gateway
1 parent 3debfe9 commit ed83ccf

10 files changed

+110
-177
lines changed

code/KustoCopyConsole/Runner/AwaitIngestRunner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private async Task<BlockRowItem> AwaitIngestionAsync(
6161
if (rowCount == targetRowCount)
6262
{
6363
blockItem = blockItem.ChangeState(BlockState.Ingested);
64-
await RowItemGateway.AppendAsync(blockItem, ct);
64+
RowItemGateway.Append(blockItem);
6565

6666
return blockItem;
6767
}

code/KustoCopyConsole/Runner/ExportingRunner.cs

+8-12
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public async Task<BlockRowItem> RunAsync(
3535

3636
if (blockItem.State == BlockState.CompletingExport)
3737
{
38-
blockItem = await CleanUrlsAsync(blockItem, ct);
38+
blockItem = CleanUrls(blockItem);
3939
}
4040
if (blockItem.State == BlockState.Planned)
4141
{
@@ -75,13 +75,13 @@ private async Task<BlockRowItem> AwaitExportBlockAsync(
7575
throw new InvalidDataException("No URL exported");
7676
}
7777
blockItem = blockItem.ChangeState(BlockState.CompletingExport);
78-
await RowItemGateway.AppendAsync(blockItem, ct);
78+
RowItemGateway.Append(blockItem);
7979
foreach (var urlItem in urlItems)
8080
{
81-
await RowItemGateway.AppendAsync(urlItem, ct);
81+
RowItemGateway.Append(urlItem);
8282
}
8383
blockItem = blockItem.ChangeState(BlockState.Exported);
84-
await RowItemGateway.AppendAsync(blockItem, ct);
84+
RowItemGateway.Append(blockItem);
8585

8686
return blockItem;
8787
}
@@ -112,14 +112,12 @@ private async Task<BlockRowItem> ExportBlockAsync(
112112

113113
blockItem = blockItem.ChangeState(BlockState.Exporting);
114114
blockItem.OperationId = operationId;
115-
await RowItemGateway.AppendAsync(blockItem, ct);
115+
RowItemGateway.Append(blockItem);
116116

117117
return blockItem;
118118
}
119119

120-
private async Task<BlockRowItem> CleanUrlsAsync(
121-
BlockRowItem blockItem,
122-
CancellationToken ct)
120+
private BlockRowItem CleanUrls(BlockRowItem blockItem)
123121
{
124122
var existingUrls = RowItemGateway.InMemoryCache
125123
.ActivityMap[blockItem.ActivityName]
@@ -130,12 +128,10 @@ private async Task<BlockRowItem> CleanUrlsAsync(
130128

131129
foreach (var url in existingUrls)
132130
{
133-
await RowItemGateway.AppendAsync(
134-
url.RowItem.ChangeState(UrlState.Deleted),
135-
ct);
131+
RowItemGateway.Append(url.RowItem.ChangeState(UrlState.Deleted));
136132
}
137133
blockItem = blockItem.ChangeState(BlockState.Exporting);
138-
await RowItemGateway.AppendAsync(blockItem, ct);
134+
RowItemGateway.Append(blockItem);
139135

140136
return blockItem;
141137
}

code/KustoCopyConsole/Runner/IterationRunner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private async Task<IterationRowItem> StartIterationAsync(
138138

139139
iterationItem = iterationItem.ChangeState(TableState.Planning);
140140
iterationItem.CursorEnd = cursorEnd;
141-
await RowItemGateway.AppendAsync(iterationItem, ct);
141+
RowItemGateway.Append(iterationItem);
142142
}
143143

144144
return iterationItem;

code/KustoCopyConsole/Runner/MainRunner.cs

+4-6
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private async Task RunActivityAsync(
9191
$"'{activityParam.TableOption.ExportMode}' isn't supported yet");
9292
}
9393

94-
await EnsureActivityAsync(activityParam, ct);
94+
EnsureActivity(activityParam);
9595

9696
var cache = RowItemGateway.InMemoryCache;
9797
var activity = cache.ActivityMap[activityParam.ActivityName].RowItem;
@@ -130,7 +130,7 @@ private async Task RunActivityAsync(
130130
CursorEnd = string.Empty
131131
};
132132

133-
await RowItemGateway.AppendAsync(newIterationItem, ct);
133+
RowItemGateway.Append(newIterationItem);
134134
await iterationRunner.RunAsync(newIterationItem, ct);
135135
}
136136
else
@@ -140,9 +140,7 @@ await Task.WhenAll(activeIterations
140140
}
141141
}
142142

143-
private async Task EnsureActivityAsync(
144-
ActivityParameterization activityParam,
145-
CancellationToken ct)
143+
private void EnsureActivity(ActivityParameterization activityParam)
146144
{
147145
if (!RowItemGateway.InMemoryCache.ActivityMap.ContainsKey(activityParam.ActivityName))
148146
{
@@ -154,7 +152,7 @@ private async Task EnsureActivityAsync(
154152
DestinationTable = activityParam.GetEffectiveDestinationTableIdentity()
155153
};
156154

157-
await RowItemGateway.AppendAsync(activity, ct);
155+
RowItemGateway.Append(activity);
158156
}
159157
}
160158
}

code/KustoCopyConsole/Runner/MoveExtentsRunner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private async Task<BlockRowItem> MoveExtentsAsync(
5555
ct);
5656

5757
blockItem = blockItem.ChangeState(BlockState.ExtentMoved);
58-
await RowItemGateway.AppendAsync(blockItem, ct);
58+
RowItemGateway.Append(blockItem);
5959
}
6060

6161
return blockItem;

code/KustoCopyConsole/Runner/PlanningRunner.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,14 @@ private async Task<IterationRowItem> PlanBlocksAsync(
9595

9696
orderedDistributionInExtents = remainingDistributionInExtents
9797
.ToImmutableArray();
98-
await RowItemGateway.AppendAsync(newBlockItem, ct);
98+
RowItemGateway.Append(newBlockItem);
9999
lastBlock = newBlockItem;
100100
}
101101
}
102102
else
103103
{
104104
iterationItem = iterationItem.ChangeState(TableState.Planned);
105-
await RowItemGateway.AppendAsync(iterationItem, ct);
105+
RowItemGateway.Append(iterationItem);
106106
}
107107
}
108108

code/KustoCopyConsole/Runner/QueueIngestRunner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ private async Task<BlockRowItem> QueueIngestBlockAsync(
6363

6464
blockItem = blockItem.ChangeState(BlockState.Queued);
6565
blockItem.BlockTag = blockTag;
66-
await RowItemGateway.AppendAsync(blockItem, ct);
66+
RowItemGateway.Append(blockItem);
6767

6868
return blockItem;
6969
}

code/KustoCopyConsole/Runner/TempTableCreatingRunner.cs

+2-8
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,7 @@ await dbCommandClient.CreateTempTableAsync(
6262
ct);
6363

6464
iterationItem = iterationItem.ChangeState(TableState.TempTableCreated);
65-
66-
var rowItemAppend = await RowItemGateway.AppendAsync(iterationItem, ct);
67-
68-
// We want to make sure this is recorded before we start ingesting data into it
69-
await rowItemAppend.ItemAppendTask;
65+
RowItemGateway.Append(iterationItem);
7066

7167
return iterationItem;
7268
}
@@ -84,11 +80,9 @@ private async Task<IterationRowItem> PrepareTempTableAsync(
8480
iterationItem = iterationItem.ChangeState(TableState.TempTableCreating);
8581
iterationItem.TempTableName = tempTableName;
8682

87-
var rowItemAppend = await RowItemGateway.AppendAsync(iterationItem, ct);
88-
8983
// We want to ensure the item is appended before creating a temp table so
9084
// we don't lose track of the table
91-
await rowItemAppend.ItemAppendTask;
85+
await RowItemGateway.AppendAndPersistAsync(iterationItem, ct);
9286

9387
return iterationItem;
9488
}

code/KustoCopyConsole/Storage/RowItemAppend.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77

88
namespace KustoCopyConsole.Storage
99
{
10-
internal record RowItemAppend(RowItemBase Item, Task ItemAppendTask);
10+
internal record RowItemAppend(RowItemBase Item);
1111
}

0 commit comments

Comments
 (0)