@@ -57,6 +57,7 @@ public class FetchItemQueues {
   long timelimit = -1;
   int maxExceptionsPerQueue = -1;
   long exceptionsPerQueueDelay = -1;
+  long exceptionsPerQueueClearAfter = 1800 * 1000L;
   boolean feederAlive = true;
   Configuration conf;
 
@@ -88,6 +89,8 @@ public FetchItemQueues(Configuration conf) {
         "fetcher.max.exceptions.per.queue", -1);
     this.exceptionsPerQueueDelay = (long) (conf
         .getFloat("fetcher.exceptions.per.queue.delay", .0f) * 1000);
+    this.exceptionsPerQueueClearAfter = (long) (conf
+        .getFloat("fetcher.exceptions.per.queue.clear.after", 1800.0f) * 1000);
 
     int dedupRedirMaxTime = conf.getInt("fetcher.redirect.dedupcache.seconds",
         -1);
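
The constructor change above reads the new fetcher.exceptions.per.queue.clear.after property like the existing delay property: a float value in seconds, converted to milliseconds, defaulting to 1800 seconds (30 minutes). A minimal standalone sketch of that conversion using a plain Hadoop Configuration; the demo class and output lines are illustrative only, not part of the patch:

import org.apache.hadoop.conf.Configuration;

// Hypothetical demo class, not part of the patch: mirrors how FetchItemQueues
// converts the per-queue exception settings from seconds (float) to milliseconds.
public class ExceptionSettingsDemo {

  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // no extra delay per exception unless configured
    long exceptionsPerQueueDelay = (long) (conf
        .getFloat("fetcher.exceptions.per.queue.delay", .0f) * 1000);

    // default of 1800 seconds = 30 minutes before idle exception state may expire
    long exceptionsPerQueueClearAfter = (long) (conf
        .getFloat("fetcher.exceptions.per.queue.clear.after", 1800.0f) * 1000);

    System.out.println("exceptions.per.queue.delay (ms): " + exceptionsPerQueueDelay);
    System.out.println("exceptions.per.queue.clear.after (ms): " + exceptionsPerQueueClearAfter);
  }
}
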
@@ -179,25 +182,41 @@ public synchronized FetchItem getFetchItem() {
       it = queues.entrySet().iterator();
     }
 
+    boolean keepExceptionState = (maxExceptionsPerQueue > -1
+        || exceptionsPerQueueDelay > 0);
+
     while (it.hasNext()) {
       FetchItemQueue fiq = it.next().getValue();
 
       // reap empty queues which do not hold state required to ensure politeness
       if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
         if (!feederAlive) {
-          // no more fetch items added
+          // no more fetch items added: queue can be safely removed
           it.remove();
-        } else if ((maxExceptionsPerQueue > -1 || exceptionsPerQueueDelay > 0)
-            && fiq.exceptionCounter.get() > 0) {
-          // keep queue because the exceptions counter is bound to it
-          // and is required to skip or delay items on this queue
-        } else if (fiq.nextFetchTime.get() > System.currentTimeMillis()) {
+          continue;
+        }
+
+        if (fiq.nextFetchTime.get() > System.currentTimeMillis()) {
           // keep queue to have it blocked in case new fetch items of this queue
           // are added by the QueueFeeder
-        } else {
-          // empty queue without state
-          it.remove();
+          continue;
         }
+
+        if (keepExceptionState && fiq.exceptionCounter.get() > 0) {
+          if ((fiq.nextFetchTime.get() + exceptionsPerQueueClearAfter) < System
+              .currentTimeMillis()) {
+            /*
+             * the time configured by fetcher.exceptions.per.queue.clear.after
+             * has passed in addition to the delay defined by the exponential
+             * backoff
+             */
+            it.remove();
+          }
+          continue;
+        }
+
+        // queue is empty and does not hold state required to ensure politeness
+        it.remove();
         continue;
       }
 
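
The rewritten loop above replaces the old if/else-if chain with early continue statements: an empty queue is dropped as soon as the feeder is done, kept while nextFetchTime still blocks it for politeness, kept while it carries exception state until fetcher.exceptions.per.queue.clear.after has additionally elapsed, and dropped otherwise. A sketch of that decision order as a pure helper; the method and its parameters are hypothetical stand-ins for the FetchItemQueue fields, not part of the patch:

// Hypothetical helper, not part of FetchItemQueues: restates the reaping rules
// for an empty queue as a pure function over plain values.
final class EmptyQueueReapRule {

  static boolean mayDrop(boolean feederAlive, boolean keepExceptionState,
      long nextFetchTime, int exceptionCount, long clearAfterMs, long now) {
    if (!feederAlive) {
      // no more fetch items will be added: always safe to drop the queue
      return true;
    }
    if (nextFetchTime > now) {
      // queue is still blocked for politeness: keep it
      return false;
    }
    if (keepExceptionState && exceptionCount > 0) {
      // keep the exception counter until the clear-after period has passed
      // on top of the exponential backoff delay
      return (nextFetchTime + clearAfterMs) < now;
    }
    // empty queue without state to preserve
    return true;
  }
}
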
@@ -239,9 +258,9 @@ public synchronized int checkTimelimit() {
     return count;
   }
 
-  // empties the queues (used by timebomb and throughput threshold)
+  // empties the queues (used by fetcher timelimit and throughput threshold)
   public synchronized int emptyQueues() {
-    int count = 0;
+    int count = 0, queuesDropped = 0;
 
     for (String id : queues.keySet()) {
       FetchItemQueue fiq = queues.get(id);
@@ -251,8 +270,12 @@ public synchronized int emptyQueues() {
       int deleted = fiq.emptyQueue();
       totalSize.addAndGet(-deleted);
       count += deleted;
+      queuesDropped++;
     }
 
+    LOG.info("Emptied all queues: {} queues with {} items",
+        queuesDropped, count);
+
     return count;
   }
 
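
The summary line added above relies on SLF4J placeholder substitution; with, say, 12 dropped queues holding 3456 items in total (assumed example values), it logs "Emptied all queues: 12 queues with 3456 items". A self-contained sketch, not part of the patch:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical demo class with assumed counter values, not part of the patch.
public class EmptyQueuesLogDemo {

  private static final Logger LOG = LoggerFactory.getLogger(EmptyQueuesLogDemo.class);

  public static void main(String[] args) {
    int queuesDropped = 12;  // assumed example values
    int count = 3456;
    // logs: "Emptied all queues: 12 queues with 3456 items"
    LOG.info("Emptied all queues: {} queues with {} items", queuesDropped, count);
  }
}
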
@@ -282,10 +305,18 @@ public synchronized int checkExceptionThreshold(String queueid,
     if (fiq == null) {
       return 0;
     }
+
     int excCount = fiq.incrementExceptionCounter();
+    if (maxExceptions != -1 && excCount >= maxExceptions) {
+      // too many exceptions for items in this queue - purge it
+      return purgeAndBlockQueue(queueid, fiq, excCount);
+    }
+
+    long nextFetchTime = 0;
     if (delay > 0) {
-      fiq.nextFetchTime.getAndAdd(delay);
+      nextFetchTime = fiq.nextFetchTime.addAndGet(delay);
       LOG.info("* queue: {} >> delayed next fetch by {} ms", queueid, delay);
+
     } else if (exceptionsPerQueueDelay > 0) {
       /*
        * Delay the next fetch by a time span growing exponentially with the
@@ -298,32 +329,41 @@ public synchronized int checkExceptionThreshold(String queueid,
         // double the initial delay with every observed exception
         exceptionDelay *= 2L << Math.min((excCount - 2), 31);
       }
-      fiq.nextFetchTime.getAndAdd(exceptionDelay);
+      nextFetchTime = fiq.nextFetchTime.addAndGet(exceptionDelay);
       LOG.info(
           "* queue: {} >> delayed next fetch by {} ms after {} exceptions in queue",
           queueid, exceptionDelay, excCount);
     }
-    if (maxExceptions != -1 && excCount >= maxExceptions) {
-      // too many exceptions for items in this queue - purge it
-      int deleted = fiq.emptyQueue();
-      if (deleted > 0) {
-        LOG.info(
-            "* queue: {} >> removed {} URLs from queue because {} exceptions occurred",
-            queueid, deleted, excCount);
-        totalSize.getAndAdd(-deleted);
-      }
-      if (feederAlive) {
-        LOG.info("* queue: {} >> blocked after {} exceptions", queueid,
-            excCount);
-        // keep queue IDs to ensure that these queues aren't created and filled
-        // again, see addFetchItem(FetchItem)
-        queuesMaxExceptions.add(queueid);
-      }
-      return deleted;
+
+    if (timelimit > 0 && nextFetchTime > timelimit) {
+      // the next fetch would happen after the fetcher timelimit
+      LOG.info(
+          "* queue: {} >> purging queue because next fetch scheduled after fetcher timelimit",
+          queueid);
+      return purgeAndBlockQueue(queueid, fiq, excCount);
     }
+
     return 0;
   }
 
+  private int purgeAndBlockQueue(String queueid, FetchItemQueue fiq,
+      int excCount) {
+    int deleted = fiq.emptyQueue();
+    if (deleted > 0) {
+      LOG.info(
+          "* queue: {} >> removed {} URLs from queue after {} exceptions occurred",
+          queueid, deleted, excCount);
+      totalSize.getAndAdd(-deleted);
+    }
+    if (feederAlive) {
+      LOG.info("* queue: {} >> blocked after {} exceptions", queueid, excCount);
+      // keep queue IDs to ensure that these queues aren't created and filled
+      // again, see addFetchItem(FetchItem)
+      queuesMaxExceptions.add(queueid);
+    }
+    return deleted;
+  }
+
   /**
    * Increment the exception counter of a queue in case of an exception e.g.
    * timeout; when higher than a given threshold simply empty the queue.
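
The retained backoff logic doubles the configured per-queue delay with each further exception before the new timelimit cut-off is checked. A standalone sketch of that arithmetic, assuming an initial fetcher.exceptions.per.queue.delay of 1 second; the demo class and the example value are hypothetical, not part of the patch:

// Hypothetical demo class, not part of the patch: reproduces the backoff
// expression from checkExceptionThreshold for the first few exception counts.
public class ExceptionBackoffDemo {

  public static void main(String[] args) {
    // assumed: fetcher.exceptions.per.queue.delay = 1.0 (seconds) -> 1000 ms
    long exceptionsPerQueueDelay = 1000L;

    for (int excCount = 1; excCount <= 5; excCount++) {
      long exceptionDelay = exceptionsPerQueueDelay;
      if (excCount > 1) {
        // double the initial delay with every observed exception
        exceptionDelay *= 2L << Math.min((excCount - 2), 31);
      }
      // prints 1000, 2000, 4000, 8000, 16000 ms for excCount 1..5
      System.out.println(excCount + " exception(s) -> next fetch delayed by "
          + exceptionDelay + " ms");
    }
  }
}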