Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 12e9eff

Browse files
committedJan 15, 2025·
.
1 parent 3debfe9 commit 12e9eff

File tree

5 files changed

+43
-37
lines changed

5 files changed

+43
-37
lines changed
 

‎code/KustoCopyConsole/Entity/RowItems/ActivityRowItem.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public override void Validate()
2323
{
2424
throw new InvalidDataException($"{nameof(ActivityName)} must have a value");
2525
}
26-
SourceTable.Validate();
26+
SourceDatabase.Validate();
2727
DestinationTable.Validate();
2828
}
2929

‎code/KustoCopyConsole/Runner/ExportingRunner.cs

+5-7
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,10 @@ public async Task<BlockRowItem> RunAsync(
2626
var activityItem = RowItemGateway.InMemoryCache
2727
.ActivityMap[blockItem.ActivityName]
2828
.RowItem;
29-
var activityParam = Parameterization.Activities[blockItem.ActivityName];
3029
var exportClient = DbClientFactory.GetExportClient(
31-
activityItem.SourceTable.ClusterUri,
32-
activityItem.SourceTable.DatabaseName,
33-
activityItem.SourceTable.TableName,
34-
activityParam.KqlQuery);
30+
activityItem.SourceDatabase.ClusterUri,
31+
blockItem.SourceTable.DatabaseName,
32+
blockItem.SourceTable.TableName);
3533

3634
if (blockItem.State == BlockState.CompletingExport)
3735
{
@@ -55,8 +53,8 @@ private async Task<BlockRowItem> AwaitExportBlockAsync(
5553
CancellationToken ct)
5654
{
5755
var exportDetails = await exportClient.AwaitExportAsync(
58-
new KustoPriority(
59-
blockItem.ActivityName, blockItem.IterationId, blockItem.BlockId),
56+
blockItem.IterationId,
57+
blockItem.SourceTable.TableName,
6058
blockItem.OperationId,
6159
ct);
6260
var urlItems = exportDetails

‎code/KustoCopyConsole/Runner/IterationRunner.cs

+23-10
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,8 @@ private async Task<IterationRowItem> StartIterationAsync(
130130
.ActivityMap[iterationItem.ActivityName]
131131
.RowItem;
132132
var queryClient = DbClientFactory.GetDbQueryClient(
133-
activity.SourceTable.ClusterUri,
134-
activity.SourceTable.DatabaseName);
133+
activity.SourceDatabase.ClusterUri,
134+
activity.SourceDatabase.DatabaseName);
135135
var cursorEnd = await queryClient.GetCurrentCursorAsync(
136136
new KustoPriority(iterationItem.ActivityName, iterationItem.IterationId),
137137
ct);
@@ -174,7 +174,7 @@ private async Task<IterationRowItem> CreateTempTableAsync(
174174
#region Block Level
175175
private async Task ProcessAllBlocksAsync(IterationRowItem iterationItem, CancellationToken ct)
176176
{
177-
var blobPathProvider = GetBlobPathFactory();
177+
var blobPathProvider = GetBlobPathFactory(iterationItem.SourceTable);
178178
var blockItems = RowItemGateway.InMemoryCache
179179
.ActivityMap[iterationItem.ActivityName]
180180
.IterationMap[iterationItem.IterationId]
@@ -192,15 +192,28 @@ private async Task ProcessAllBlocksAsync(IterationRowItem iterationItem, Cancell
192192
await Task.WhenAll(processBlockTasks);
193193
}
194194

195-
private IStagingBlobUriProvider GetBlobPathFactory()
195+
private IStagingBlobUriProvider GetBlobPathFactory(TableIdentity sourceTable)
196196
{
197-
var tempUriProvider = new AzureBlobUriProvider(
198-
Parameterization.StagingStorageContainers
199-
.Select(s => new Uri(s))
200-
.ToImmutableArray(),
201-
Parameterization.GetCredentials());
197+
var activity = Parameterization.Activities
198+
.Values
199+
.Where(a => a.Source.GetTableIdentity() == sourceTable)
200+
.FirstOrDefault();
202201

203-
return tempUriProvider;
202+
if (activity == null)
203+
{
204+
throw new InvalidDataException($"Can't find table in parameters: {sourceTable}");
205+
}
206+
else
207+
{
208+
var destinationTable = activity.Destination.GetTableIdentity();
209+
var tempUriProvider = new AzureBlobUriProvider(
210+
Parameterization.StagingStorageContainers
211+
.Select(s => new Uri(s))
212+
.ToImmutableArray(),
213+
Parameterization.GetCredentials());
214+
215+
return tempUriProvider;
216+
}
204217
}
205218

206219
private async Task ProcessSingleBlockAsync(

‎code/KustoCopyConsole/Runner/MainRunner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ private async Task EnsureActivityAsync(
150150
{
151151
State = ActivityState.Active,
152152
ActivityName = activityParam.ActivityName,
153-
SourceTable = activityParam.Source.GetTableIdentity(),
153+
SourceDatabase = activityParam.Source.GetTableIdentity(),
154154
DestinationTable = activityParam.GetEffectiveDestinationTableIdentity()
155155
};
156156

‎code/KustoCopyConsole/Runner/PlanningRunner.cs

+13-18
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ private async Task<IterationRowItem> PlanBlocksAsync(
5656
.ActivityMap[iterationItem.ActivityName]
5757
.RowItem;
5858
var queryClient = DbClientFactory.GetDbQueryClient(
59-
activity.SourceTable.ClusterUri,
60-
activity.SourceTable.DatabaseName);
59+
activity.SourceDatabase.ClusterUri,
60+
activity.SourceDatabase.DatabaseName);
6161
var dbCommandClient = DbClientFactory.GetDbCommandClient(
62-
activity.SourceTable.ClusterUri,
63-
activity.SourceTable.DatabaseName);
62+
activity.SourceDatabase.ClusterUri,
63+
activity.SourceDatabase.DatabaseName);
6464

6565
// Loop on block batches
6666
while (iterationItem.State == TableState.Planning)
@@ -147,23 +147,18 @@ private async Task<IterationRowItem> PlanBlocksAsync(
147147
}
148148

149149
// Merge results from query + show extents command
150-
private async Task<IImmutableList<RecordDistributionInExtent>> GetRecordDistributionInExtents(
151-
IterationRowItem iterationItem,
150+
private static async Task<IImmutableList<RecordDistributionInExtent>> GetRecordDistributionInExtents(
151+
IterationRowItem sourceTableItem,
152152
DateTime? ingestionTimeStart,
153153
DbQueryClient queryClient,
154154
DbCommandClient dbCommandClient,
155155
CancellationToken ct)
156156
{
157-
var activityItem = RowItemGateway.InMemoryCache
158-
.ActivityMap[iterationItem.ActivityName]
159-
.RowItem;
160-
var activityParam = Parameterization.Activities[iterationItem.ActivityName];
161157
var distributions = await queryClient.GetRecordDistributionAsync(
162-
new KustoPriority(iterationItem.ActivityName, iterationItem.IterationId),
163-
activityItem.SourceTable.TableName,
164-
activityParam.KqlQuery,
165-
iterationItem.CursorStart,
166-
iterationItem.CursorEnd,
158+
sourceTableItem.IterationId,
159+
sourceTableItem.SourceTable.TableName,
160+
sourceTableItem.CursorStart,
161+
sourceTableItem.CursorEnd,
167162
ingestionTimeStart,
168163
MAX_STATS_COUNT,
169164
ct);
@@ -175,8 +170,8 @@ private async Task<IImmutableList<RecordDistributionInExtent>> GetRecordDistribu
175170
.Where(id => !string.IsNullOrWhiteSpace(id))
176171
.Distinct();
177172
var extentDates = await dbCommandClient.GetExtentDatesAsync(
178-
new KustoPriority(iterationItem.ActivityName, iterationItem.IterationId),
179-
activityItem.SourceTable.TableName,
173+
sourceTableItem.IterationId,
174+
sourceTableItem.SourceTable.TableName,
180175
extentIds,
181176
ct);
182177

@@ -200,7 +195,7 @@ private async Task<IImmutableList<RecordDistributionInExtent>> GetRecordDistribu
200195
else
201196
{
202197
return await GetRecordDistributionInExtents(
203-
iterationItem,
198+
sourceTableItem,
204199
ingestionTimeStart,
205200
queryClient,
206201
dbCommandClient,

0 commit comments

Comments
 (0)
Please sign in to comment.