@@ -18,9 +18,10 @@ namespace Microsoft.VisualStudio.Services.Agent
18
18
[ System . Diagnostics . CodeAnalysis . SuppressMessage ( "Microsoft.Naming" , "CA1711: Identifiers should not have incorrect suffix" ) ]
19
19
public interface IJobServerQueue : IAgentService , IThrottlingReporter
20
20
{
21
+ bool ForceDrainWebConsoleQueue { get ; set ; }
22
+ bool ForceDrainTimelineQueue { get ; set ; }
21
23
event EventHandler < ThrottlingEventArgs > JobServerQueueThrottling ;
22
24
Task ShutdownAsync ( ) ;
23
- Task DrainQueues ( ) ;
24
25
void Start ( Pipelines . AgentJobRequestMessage jobRequest ) ;
25
26
void QueueWebConsoleLine ( Guid stepRecordId , string line , long lineNumber ) ;
26
27
void QueueFileUpload ( Guid timelineId , Guid timelineRecordId , string type , string name , string path , bool deleteSource ) ;
@@ -86,34 +87,15 @@ public sealed class JobServerQueue : AgentService, IJobServerQueue
86
87
private bool _writeToBlobStoreAttachments = false ;
87
88
private bool _debugMode = false ;
88
89
90
+ public bool ForceDrainWebConsoleQueue { get ; set ; }
91
+ public bool ForceDrainTimelineQueue { get ; set ; }
92
+
89
93
public override void Initialize ( IHostContext hostContext )
90
94
{
91
95
base . Initialize ( hostContext ) ;
92
96
_jobServer = hostContext . GetService < IJobServer > ( ) ;
93
97
}
94
98
95
- public async Task DrainQueues ( )
96
- {
97
- // Drain the queue
98
- // ProcessWebConsoleLinesQueueAsync() will never throw exception, live console update is always best effort.
99
- Trace . Verbose ( "Draining web console line queue." ) ;
100
- await ProcessWebConsoleLinesQueueAsync ( runOnce : true ) ;
101
- Trace . Info ( "Web console line queue drained." ) ;
102
-
103
- // ProcessFilesUploadQueueAsync() will never throw exception, log file upload is always best effort.
104
- Trace . Verbose ( "Draining file upload queue." ) ;
105
- await ProcessFilesUploadQueueAsync ( runOnce : true ) ;
106
- Trace . Info ( "File upload queue drained." ) ;
107
-
108
- // ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown
109
- // if there is any timeline records that failed to update contains output variabls.
110
- Trace . Verbose ( "Draining timeline update queue." ) ;
111
- await ProcessTimelinesUpdateQueueAsync ( runOnce : true ) ;
112
- Trace . Info ( "Timeline update queue drained." ) ;
113
-
114
- Trace . Info ( "All queues are drained." ) ;
115
- }
116
-
117
99
public void Start ( Pipelines . AgentJobRequestMessage jobRequest )
118
100
{
119
101
Trace . Entering ( ) ;
@@ -192,7 +174,24 @@ public async Task ShutdownAsync()
192
174
_queueInProcess = false ;
193
175
Trace . Info ( "All queue process task stopped." ) ;
194
176
195
- await DrainQueues ( ) ;
177
+ // Drain the queue
178
+ // ProcessWebConsoleLinesQueueAsync() will never throw exception, live console update is always best effort.
179
+ Trace . Verbose ( "Draining web console line queue." ) ;
180
+ await ProcessWebConsoleLinesQueueAsync ( runOnce : true ) ;
181
+ Trace . Info ( "Web console line queue drained." ) ;
182
+
183
+ // ProcessFilesUploadQueueAsync() will never throw exception, log file upload is always best effort.
184
+ Trace . Verbose ( "Draining file upload queue." ) ;
185
+ await ProcessFilesUploadQueueAsync ( runOnce : true ) ;
186
+ Trace . Info ( "File upload queue drained." ) ;
187
+
188
+ // ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown
189
+ // if there is any timeline records that failed to update contains output variables.
190
+ Trace . Verbose ( "Draining timeline update queue." ) ;
191
+ await ProcessTimelinesUpdateQueueAsync ( runOnce : true ) ;
192
+ Trace . Info ( "Timeline update queue drained." ) ;
193
+
194
+ Trace . Info ( "All queue process tasks have been stopped, and all queues are drained." ) ;
196
195
}
197
196
198
197
public void QueueWebConsoleLine ( Guid stepRecordId , string line , long lineNumber )
@@ -248,6 +247,12 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
248
247
{
249
248
while ( ! _jobCompletionSource . Task . IsCompleted || runOnce )
250
249
{
250
+ bool shouldDrain = ForceDrainWebConsoleQueue ;
251
+ if ( ForceDrainWebConsoleQueue )
252
+ {
253
+ ForceDrainWebConsoleQueue = false ;
254
+ }
255
+
251
256
if ( _webConsoleLineAggressiveDequeue && ++ _webConsoleLineAggressiveDequeueCount > _webConsoleLineAggressiveDequeueLimit )
252
257
{
253
258
Trace . Info ( "Stop aggressive process web console line queue." ) ;
@@ -279,7 +284,7 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
279
284
// process at most about 500 lines of web console line during regular timer dequeue task.
280
285
// Send the first line of output to the customer right away
281
286
// It might take a while to reach 500 line outputs, which would cause delays before customers see the first line
282
- if ( ( ! runOnce && linesCounter > 500 ) || _firstConsoleOutputs )
287
+ if ( ( ! runOnce && ! shouldDrain && linesCounter > 500 ) || _firstConsoleOutputs )
283
288
{
284
289
break ;
285
290
}
@@ -314,7 +319,7 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
314
319
// We batch and produce 500 lines of web console output every 500ms
315
320
// If customer's task produce massive of outputs, then the last queue drain run might take forever.
316
321
// So we will only upload the last 200 lines of each step from all buffered web console lines.
317
- if ( runOnce && batchedLines . Count > 2 )
322
+ if ( ( runOnce || shouldDrain ) && batchedLines . Count > 2 )
318
323
{
319
324
Trace . Info ( $ "Skip { batchedLines . Count - 2 } batches web console lines for last run") ;
320
325
batchedLines = batchedLines . TakeLast ( 2 ) . ToList ( ) ;
@@ -430,6 +435,8 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
430
435
{
431
436
while ( ! _jobCompletionSource . Task . IsCompleted || runOnce )
432
437
{
438
+ bool shouldDrain = ForceDrainTimelineQueue ;
439
+
433
440
List < PendingTimelineRecord > pendingUpdates = new List < PendingTimelineRecord > ( ) ;
434
441
foreach ( var timeline in _allTimelines )
435
442
{
@@ -442,7 +449,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
442
449
{
443
450
records . Add ( record ) ;
444
451
// process at most 25 timeline records update for each timeline.
445
- if ( ! runOnce && records . Count > 25 )
452
+ if ( ! runOnce && ! shouldDrain && records . Count > 25 )
446
453
{
447
454
break ;
448
455
}
@@ -514,7 +521,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
514
521
}
515
522
}
516
523
517
- if ( runOnce )
524
+ if ( runOnce || shouldDrain )
518
525
{
519
526
// continue process timeline records update,
520
527
// we might have more records need update,
@@ -535,14 +542,19 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
535
542
}
536
543
else
537
544
{
538
- break ;
545
+ if ( ForceDrainTimelineQueue )
546
+ {
547
+ ForceDrainTimelineQueue = false ;
548
+ }
549
+ if ( runOnce )
550
+ {
551
+ break ;
552
+ }
539
553
}
540
554
}
541
555
}
542
- else
543
- {
544
- await Task . Delay ( _delayForTimelineUpdateDequeue ) ;
545
- }
556
+
557
+ await Task . Delay ( _delayForTimelineUpdateDequeue ) ;
546
558
}
547
559
}
548
560
0 commit comments