@@ -11,28 +11,39 @@ public static class ObservableExtensions
11
11
/// <param name="source">The source observable.</param>
12
12
/// <param name="dueTime">How long to wait between attempts.</param>
13
13
/// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
14
+ /// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
14
15
/// <returns>
15
16
/// A cold observable which retries (re-subscribes to) the source observable on error up to the
16
17
/// specified number of times or until it successfully terminates.
17
18
/// </returns>
18
19
public static IObservable < T > RetryAfterDelay < T , TException > (
19
20
this IObservable < T > source ,
20
21
TimeSpan dueTime ,
21
- Func < TException , bool > retryOnError )
22
+ Func < TException , bool > retryOnError ,
23
+ int ? retryCount = null )
22
24
where TException : Exception
23
25
{
24
26
int attempt = 0 ;
25
27
26
- return Observable . Defer ( ( ) =>
28
+ var pipeline = Observable . Defer ( ( ) =>
27
29
{
28
30
return ( ( ++ attempt == 1 ) ? source : source . DelaySubscription ( dueTime ) )
29
31
. Select ( item => new Tuple < bool , T , Exception > ( true , item , null ) )
30
32
. Catch < Tuple < bool , T , Exception > , TException > ( e => retryOnError ( e )
31
33
? Observable . Throw < Tuple < bool , T , Exception > > ( e )
32
34
: Observable . Return ( new Tuple < bool , T , Exception > ( false , default ( T ) , e ) ) ) ;
33
- } )
34
- . Retry ( )
35
- . SelectMany ( t => t . Item1
35
+ } ) ;
36
+
37
+ if ( retryCount . HasValue )
38
+ {
39
+ pipeline = pipeline . Retry ( retryCount . Value ) ;
40
+ }
41
+ else
42
+ {
43
+ pipeline = pipeline . Retry ( ) ;
44
+ }
45
+
46
+ return pipeline . SelectMany ( t => t . Item1
36
47
? Observable . Return ( t . Item2 )
37
48
: Observable . Throw < T > ( t . Item3 ) ) ;
38
49
}
0 commit comments