Skip to content

Commit fcc8e56

Browse files
authored
Merge pull request #29 from Azure/vpl/perf
Vpl/perf
2 parents 4eba6f5 + 9e49093 commit fcc8e56

12 files changed

+187
-64
lines changed

code/KustoCopyConsole/Concurrency/PriorityExecutionQueue.cs

+97-27
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
using Polly;
2-
using Polly.Bulkhead;
3-
using System;
1+
using System;
42
using System.Collections.Concurrent;
53
using System.Collections.Generic;
64
using System.Linq;
@@ -37,44 +35,50 @@ public override async Task ExecuteAsync()
3735
}
3836
#endregion
3937

40-
private readonly AsyncBulkheadPolicy _bulkheadPolicy;
41-
private readonly PriorityQueue<Request, TPriority> _requestQueue;
38+
private readonly PriorityQueue<Request, TPriority> _requestQueue = new();
39+
private readonly ConcurrentQueue<Task> _runnerTasks = new();
40+
private volatile int _parallelRunCount = 0;
4241

43-
public PriorityExecutionQueue(int parallelRunCount)
42+
public PriorityExecutionQueue(int maxParallelRunCount)
4443
{
45-
_bulkheadPolicy = Policy.BulkheadAsync(parallelRunCount, int.MaxValue);
46-
_requestQueue = new PriorityQueue<Request, TPriority>();
44+
if (maxParallelRunCount < 1)
45+
{
46+
throw new ArgumentOutOfRangeException(nameof(maxParallelRunCount));
47+
}
48+
MaxParallelRunCount = maxParallelRunCount;
4749
}
4850

49-
public int ParallelRunCount => _bulkheadPolicy.BulkheadAvailableCount;
51+
public int MaxParallelRunCount { get; }
5052

5153
public async Task<T> RequestRunAsync<T>(TPriority priority, Func<Task<T>> actionAsync)
5254
{
53-
var request = new Request<T>(actionAsync);
55+
// Optimistic path: if there is capacity
56+
if (TryOptimistic())
57+
{ // Optimistic try out succeeded!
58+
var result = await actionAsync();
59+
60+
Interlocked.Decrement(ref _parallelRunCount);
61+
TryDequeueRequest();
5462

55-
lock (_requestQueue)
56-
{ // Add our item in the queue
57-
_requestQueue.Enqueue(request, priority);
63+
return result;
5864
}
59-
// Remove/execute one item from the queue (when parallelism allows)
60-
// Either the item is "us" or someone before us
61-
await _bulkheadPolicy.ExecuteAsync(async () =>
62-
{
63-
Request? request;
65+
else
66+
{ // Optimistic try out failed: get in queue
67+
var request = new Request<T>(actionAsync);
6468

6569
lock (_requestQueue)
66-
{
67-
if (!_requestQueue.TryDequeue(out request, out _))
68-
{
69-
throw new InvalidOperationException("Request queue is corrupted");
70-
}
70+
{ // Add our item in the queue
71+
_requestQueue.Enqueue(request, priority);
7172
}
73+
TryDequeueRequest();
7274

73-
await request.ExecuteAsync();
74-
});
75+
// Wait for our own turn
76+
var result = await request.Source.Task;
77+
78+
await ObserveRunnerTasksAsync();
7579

76-
// Wait for our own turn
77-
return await request.Source.Task;
80+
return result;
81+
}
7882
}
7983

8084
public async Task RequestRunAsync(TPriority priority, Func<Task> actionAsync)
@@ -86,5 +90,71 @@ await RequestRunAsync(priority, async () =>
8690
return 0;
8791
});
8892
}
93+
94+
private bool TryOptimistic()
95+
{
96+
var currentSnapshot = _parallelRunCount;
97+
98+
if (currentSnapshot >= MaxParallelRunCount)
99+
{ // We've reached capacity
100+
return false;
101+
}
102+
else
103+
{
104+
if (Interlocked.CompareExchange(
105+
ref _parallelRunCount,
106+
currentSnapshot + 1,
107+
currentSnapshot) == currentSnapshot)
108+
{
109+
return true;
110+
}
111+
else
112+
{ // Somebody else modified in the meantime, we retry
113+
return TryOptimistic();
114+
}
115+
}
116+
}
117+
118+
private void TryDequeueRequest()
119+
{
120+
if (TryOptimistic())
121+
{
122+
lock (_requestQueue)
123+
{
124+
if (_requestQueue.TryDequeue(out var request, out _))
125+
{
126+
var runningTask = Task.Run(async () =>
127+
{
128+
await request.ExecuteAsync();
129+
Interlocked.Decrement(ref _parallelRunCount);
130+
TryDequeueRequest();
131+
});
132+
133+
_runnerTasks.Enqueue(runningTask);
134+
}
135+
else
136+
{ // Revert increment since there won't be any run
137+
Interlocked.Decrement(ref _parallelRunCount);
138+
}
139+
}
140+
}
141+
}
142+
143+
private async Task ObserveRunnerTasksAsync()
144+
{
145+
while(_runnerTasks.TryDequeue(out var task))
146+
{
147+
if(task.IsCompleted)
148+
{
149+
await task;
150+
}
151+
else
152+
{
153+
_runnerTasks.Enqueue(task);
154+
155+
return;
156+
}
157+
}
158+
}
89159
}
90160
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace KustoCopyConsole.Kusto.Data
8+
{
9+
public record ExtentRowCount(string Tags, long RecordCount);
10+
}

code/KustoCopyConsole/Kusto/DbCommandClient.cs

+8-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using KustoCopyConsole.Kusto.Data;
66
using System.Collections.Immutable;
77
using System.Data;
8+
using System.Diagnostics;
89

910
namespace KustoCopyConsole.Kusto
1011
{
@@ -292,19 +293,18 @@ .alter table ['{tempTableName}'] policy restricted_view_access true
292293
}
293294
#endregion
294295

295-
public async Task<long> GetExtentRowCountAsync(
296+
public async Task<IImmutableList<ExtentRowCount>> GetExtentRowCountsAsync(
296297
KustoPriority priority,
297298
string tempTableName,
298-
string blockTag,
299299
CancellationToken ct)
300300
{
301301
return await _commandQueue.RequestRunAsync(
302302
priority,
303303
async () =>
304304
{
305305
var commandText = @$"
306-
.show table ['{tempTableName}'] extents where tags contains '{blockTag}'
307-
| summarize sum(RowCount)
306+
.show table ['{tempTableName}'] extents
307+
| summarize RowCount=sum(RowCount) by Tags
308308
";
309309
var properties = new ClientRequestProperties();
310310
var reader = await _provider.ExecuteControlCommandAsync(
@@ -313,8 +313,10 @@ .show table ['{tempTableName}'] extents where tags contains '{blockTag}'
313313
properties);
314314
var result = reader.ToDataSet().Tables[0].Rows
315315
.Cast<DataRow>()
316-
.Select(r => (long)r[0])
317-
.First();
316+
.Select(r => new ExtentRowCount(
317+
(string)r["Tags"],
318+
(long)r["RowCount"]))
319+
.ToImmutableArray();
318320

319321
return result;
320322
});

code/KustoCopyConsole/Kusto/IngestClient.cs

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Kusto.Data.Common;
22
using Kusto.Ingest;
3+
using System.Diagnostics;
34

45
namespace KustoCopyConsole.Kusto
56
{

code/KustoCopyConsole/Runner/AwaitExportedRunner.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal class AwaitExportedRunner : RunnerBase
1717
private record ClusterBlocks(Uri ClusterUri, IEnumerable<BlockRowItem> BlockItems);
1818
#endregion
1919

20-
private const int MAX_OPERATIONS = 50;
20+
private const int MAX_OPERATIONS = 200;
2121
private static readonly IImmutableSet<string> FAILED_STATUS =
2222
ImmutableHashSet.Create(
2323
[
@@ -33,7 +33,7 @@ public AwaitExportedRunner(
3333
MainJobParameterization parameterization,
3434
RowItemGateway rowItemGateway,
3535
DbClientFactory dbClientFactory)
36-
: base(parameterization, rowItemGateway, dbClientFactory, TimeSpan.FromSeconds(5))
36+
: base(parameterization, rowItemGateway, dbClientFactory, TimeSpan.FromSeconds(3))
3737
{
3838
}
3939

code/KustoCopyConsole/Runner/AwaitIngestRunner.cs

+36-21
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ public async Task RunAsync(CancellationToken ct)
2828
var queuedBlocks = allBlocks
2929
.Where(h => h.Block.State == BlockState.Queued);
3030
var ingestionTasks = queuedBlocks
31-
.Select(h => UpdateQueuedBlockAsync(h, ct))
31+
.Where(h => h.TempTable != null)
32+
.GroupBy(h => h.TempTable)
33+
.Select(g => UpdateQueuedBlocksAsync(g, ct))
3234
.ToImmutableArray();
3335

3436
await Task.WhenAll(ingestionTasks);
@@ -38,33 +40,46 @@ public async Task RunAsync(CancellationToken ct)
3840
}
3941
}
4042

41-
private async Task UpdateQueuedBlockAsync(
42-
ActivityFlatHierarchy item,
43+
private async Task UpdateQueuedBlocksAsync(
44+
IEnumerable<ActivityFlatHierarchy> items,
4345
CancellationToken ct)
4446
{
45-
var targetRowCount = item
46-
.Urls
47-
.Sum(u => u.RowCount);
47+
var activity = items.First().Activity;
48+
var iteration = items.First().Iteration;
49+
var tempTableName = items.First().TempTable!.TempTableName;
4850
var dbClient = DbClientFactory.GetDbCommandClient(
49-
item.Activity.DestinationTable.ClusterUri,
50-
item.Activity.DestinationTable.DatabaseName);
51-
var rowCount = await dbClient.GetExtentRowCountAsync(
52-
new KustoPriority(item.Block.GetIterationKey()),
53-
item.TempTable!.TempTableName,
54-
item.Block.BlockTag,
51+
activity.DestinationTable.ClusterUri,
52+
activity.DestinationTable.DatabaseName);
53+
var extentRowCounts = await dbClient.GetExtentRowCountsAsync(
54+
new KustoPriority(iteration.GetIterationKey()),
55+
tempTableName,
5556
ct);
5657

57-
if (rowCount > targetRowCount)
58+
foreach (var item in items)
5859
{
59-
throw new CopyException(
60-
$"Target row count is {targetRowCount} while we observe {rowCount}",
61-
false);
62-
}
63-
if (rowCount == targetRowCount)
64-
{
65-
var newBlockItem = item.Block.ChangeState(BlockState.Ingested);
60+
var targetRowCount = item
61+
.Urls
62+
.Sum(h => h.RowCount);
63+
var blockExtentRowCount = extentRowCounts
64+
.Where(e => e.Tags.Contains(item.Block.BlockTag))
65+
.FirstOrDefault();
66+
67+
if (blockExtentRowCount != null)
68+
{
69+
if (blockExtentRowCount.RecordCount > targetRowCount)
70+
{
71+
throw new CopyException(
72+
$"Target row count is {targetRowCount} while " +
73+
$"we observe {blockExtentRowCount.RecordCount}",
74+
false);
75+
}
76+
if (blockExtentRowCount.RecordCount == targetRowCount)
77+
{
78+
var newBlockItem = item.Block.ChangeState(BlockState.Ingested);
6679

67-
RowItemGateway.Append(newBlockItem);
80+
RowItemGateway.Append(newBlockItem);
81+
}
82+
}
6883
}
6984
}
7085
}

code/KustoCopyConsole/Runner/ExportingRunner.cs

+12-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Microsoft.Extensions.Azure;
88
using System;
99
using System.Collections.Immutable;
10+
using System.Diagnostics;
1011
using System.Linq;
1112

1213
namespace KustoCopyConsole.Runner
@@ -36,11 +37,20 @@ public async Task RunAsync(CancellationToken ct)
3637
var exportLineUp = await GetExportLineUpAsync(capacityMap, ct);
3738
var exportCount = await StartExportAsync(exportLineUp, ct);
3839

39-
// Sleep
40-
await SleepAsync(ct);
40+
if (exportCount == 0)
41+
{
42+
// Sleep
43+
await SleepAsync(ct);
44+
}
4145
}
4246
}
4347

48+
protected override bool IsWakeUpRelevant(RowItemBase item)
49+
{
50+
return item is BlockRowItem b
51+
&& (b.State == BlockState.Planned || b.State == BlockState.Exported);
52+
}
53+
4454
private async Task<int> StartExportAsync(
4555
IEnumerable<ActivityFlatHierarchy> exportLineUp,
4656
CancellationToken ct)

code/KustoCopyConsole/Runner/IterationCompletingRunner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private async Task CompleteIterationsAsync(CancellationToken ct)
6161

6262
await dbClient.DropTableIfExistsAsync(
6363
new KustoPriority(iteration.RowItem.GetIterationKey()),
64-
tableId.TableName,
64+
iteration.TempTable.TempTableName,
6565
ct);
6666
}
6767
var newIteration = iteration.RowItem.ChangeState(IterationState.Completed);

code/KustoCopyConsole/Runner/MoveExtentsRunner.cs

+5-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@ public async Task RunAsync(CancellationToken ct)
3333

3434
await Task.WhenAll(moveTasks);
3535

36-
// Sleep
37-
await SleepAsync(ct);
36+
if (!moveTasks.Any())
37+
{
38+
// Sleep
39+
await SleepAsync(ct);
40+
}
3841
}
3942
}
4043

code/KustoCopyConsole/Runner/PlanningRunner.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public PlanningRunner(
3131
MainJobParameterization parameterization,
3232
RowItemGateway rowItemGateway,
3333
DbClientFactory dbClientFactory)
34-
: base(parameterization, rowItemGateway, dbClientFactory, TimeSpan.FromMinutes(1))
34+
: base(parameterization, rowItemGateway, dbClientFactory, TimeSpan.FromSeconds(10))
3535
{
3636
}
3737

0 commit comments

Comments
 (0)