Skip to content

Commit 9b0e1da

Browse files
committed
.
1 parent fd311c7 commit 9b0e1da

File tree

2 files changed

+93
-9
lines changed

2 files changed

+93
-9
lines changed

code/KustoCopyConsole/Entity/RowItem.cs

+3
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ private class RowTypeConverter : DefaultTypeConverter
121121
/// <summary>Number of rows in a block.</summary>
122122
[Index(17)]
123123
public long Cardinality { get; set; }
124+
125+
[Index(18)]
126+
public string OperationId { get; set; } = string.Empty;
124127
#endregion
125128

126129
public T ParseState<T>() where T : struct

code/KustoCopyConsole/Orchestration/SourceTableExportingOrchestration.cs

+90-9
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
using System.Collections;
1212
using System.Collections.Generic;
1313
using System.Collections.Immutable;
14+
using System.Collections.ObjectModel;
15+
using System.Diagnostics.CodeAnalysis;
1416
using System.Linq;
1517
using System.Text;
1618
using System.Threading.Tasks;
@@ -33,6 +35,8 @@ private record ClusterCache(
3335
private class ClusterQueue
3436
{
3537
private readonly object _lock = new object();
38+
private readonly PriorityQueue<RowItem, KustoPriority> _exportQueue;
39+
private readonly IDictionary<string, RowItem> _operationIdToBlockMap;
3640
private readonly Func<CancellationToken, Task<ClusterCache>> _clusterCacheFetch;
3741
private Task<ClusterCache>? _cacheTask;
3842

@@ -41,15 +45,11 @@ public ClusterQueue(
4145
IDictionary<string, RowItem> operationIdToBlockMap,
4246
Func<CancellationToken, Task<ClusterCache>> clusterCacheFetch)
4347
{
44-
ExportQueue = exportQueue;
45-
OperationIdToBlockMap = operationIdToBlockMap;
48+
_exportQueue = exportQueue;
49+
_operationIdToBlockMap = operationIdToBlockMap;
4650
_clusterCacheFetch = clusterCacheFetch;
4751
}
4852

49-
public PriorityQueue<RowItem, KustoPriority> ExportQueue { get; }
50-
51-
public IDictionary<string, RowItem> OperationIdToBlockMap { get; }
52-
5353
public async Task<ClusterCache> GetClusterCacheAsync(CancellationToken ct)
5454
{
5555
var cacheTask = _cacheTask;
@@ -68,13 +68,43 @@ public async Task<ClusterCache> GetClusterCacheAsync(CancellationToken ct)
6868

6969
return await _cacheTask!;
7070
}
71+
72+
#region Export queue
73+
public void EnqueueExport(RowItem blockItem, KustoPriority kustoPriority)
74+
{
75+
lock (_exportQueue)
76+
{
77+
_exportQueue.Enqueue(blockItem, kustoPriority);
78+
}
79+
}
80+
81+
public bool TryDequeueExport([MaybeNullWhen(false)] out RowItem blockItem)
82+
{
83+
lock (_exportQueue)
84+
{
85+
return _exportQueue.TryDequeue(out blockItem, out _);
86+
}
87+
}
88+
#endregion
89+
90+
#region Operation ID map
91+
public int GetOperationIDCount()
92+
{
93+
lock (_operationIdToBlockMap)
94+
{
95+
return _operationIdToBlockMap.Count;
96+
}
97+
}
98+
#endregion
7199
}
72100
#endregion
73101

74102
private static readonly TimeSpan CACHE_REFRESH_RATE = TimeSpan.FromMinutes(10);
75103

76104
private readonly IDictionary<Uri, ClusterQueue> _clusterToExportQueue =
77105
new Dictionary<Uri, ClusterQueue>();
106+
private volatile int _isExportingTaskRunning = Convert.ToInt32(false);
107+
private volatile int _isExportedTaskRunning = Convert.ToInt32(false);
78108

79109
public SourceTableExportingOrchestration(
80110
RowItemGateway rowItemGateway,
@@ -126,9 +156,9 @@ protected override void OnProcessRowItemAppended(RowItemAppend e, CancellationTo
126156
private async Task OnQueueExportingItemAsync(RowItem blockItem, CancellationToken ct)
127157
{
128158
var tableIdentity = blockItem.GetSourceTableIdentity();
129-
var clusterQueue = GetClusterQueue(blockItem, ct);
159+
var clusterQueue = EnsureClusterQueue(blockItem, ct);
130160

131-
clusterQueue.ExportQueue.Enqueue(
161+
clusterQueue.EnqueueExport(
132162
blockItem,
133163
new KustoPriority(
134164
tableIdentity.DatabaseName,
@@ -137,6 +167,7 @@ private async Task OnQueueExportingItemAsync(RowItem blockItem, CancellationToke
137167
tableIdentity.TableName,
138168
blockItem.BlockId)));
139169

170+
// We test since it might be coming from a persisted state which is "exporting" already
140171
if (blockItem.ParseState<SourceBlockState>() == SourceBlockState.Planned)
141172
{
142173
var newBlockItem = blockItem.Clone();
@@ -145,10 +176,60 @@ private async Task OnQueueExportingItemAsync(RowItem blockItem, CancellationToke
145176

146177
await RowItemGateway.AppendAsync(newBlockItem, ct);
147178
}
179+
if (Interlocked.CompareExchange(
180+
ref _isExportingTaskRunning,
181+
Convert.ToInt32(true),
182+
Convert.ToInt32(false)) == Convert.ToInt32(false))
183+
{
184+
BackgroundTaskContainer.AddTask(OnExportingAsync(ct));
185+
}
186+
}
187+
188+
private async Task OnExportingAsync(CancellationToken ct)
189+
{
190+
while (true)
191+
{
192+
foreach (var clusterQueue in GetClusterQueues())
193+
{
194+
var clusterCache = await clusterQueue.GetClusterCacheAsync(ct);
195+
196+
if (clusterQueue.GetOperationIDCount() > clusterCache.ExportCapacity
197+
&& clusterQueue.TryDequeueExport(out var blockItem))
198+
{
199+
var tableIdentity = blockItem.GetSourceTableIdentity();
200+
var commandClient = DbClientFactory.GetDbCommandClient(
201+
tableIdentity.ClusterUri,
202+
tableIdentity.DatabaseName);
203+
var operationId = await commandClient.ExportBlockAsync(
204+
clusterCache.ExportRootUris,
205+
tableIdentity.TableName,
206+
blockItem.CursorStart,
207+
blockItem.CursorEnd,
208+
blockItem.IngestionTimeStart,
209+
blockItem.IngestionTimeEnd,
210+
ct);
211+
var newBlockItem = blockItem.Clone();
212+
213+
newBlockItem.State = SourceBlockState.Exporting.ToString();
214+
newBlockItem.OperationId = operationId;
215+
await RowItemGateway.AppendAsync(newBlockItem, ct);
216+
}
217+
}
218+
}
148219
}
149220

150221
#region Cluster queue
151-
private ClusterQueue GetClusterQueue(RowItem blockItem, CancellationToken ct)
222+
private IImmutableList<ClusterQueue> GetClusterQueues()
223+
{
224+
lock (_clusterToExportQueue)
225+
{
226+
return _clusterToExportQueue
227+
.Values
228+
.ToImmutableArray();
229+
}
230+
}
231+
232+
private ClusterQueue EnsureClusterQueue(RowItem blockItem, CancellationToken ct)
152233
{
153234
var tableIdentity = blockItem.GetSourceTableIdentity();
154235

0 commit comments

Comments
 (0)