Write an Rx "RetryAfter" extension method
Solution 1:
The key to this implementation of a back off retry is deferred observables. A deferred observable won't execute its factory until someone subscribes to it. And it will invoke the factory for each subscription, making it ideal for our retry scenario.
Assume we have a method which triggers a network request.
public IObservable<WebResponse> SomeApiMethod() { ... }
For the purposes of this little snippet, let's define the deferred as source
var source = Observable.Defer(() => SomeApiMethod());
Whenever someone subscribes to source it will invoke SomeApiMethod and launch a new web request. The naive way to retry it whenever it fails would be using the built in Retry operator.
source.Retry(4)
That wouldn't be very nice to the API though and it's not what you're asking for. We need to delay the launching of requests in between each attempt. One way of doing that is with a delayed subscription.
Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)
That's not ideal since it'll add the delay even on the first request, let's fix that.
int attempt = 0;
Observable.Defer(() => {
return ((++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)
Just pausing for a second isn't a very good retry method though so let's change that constant to be a function which receives the retry count and returns an appropriate delay. Exponential back off is easy enough to implement.
Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));
((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)))
We're almost done now, we just need to add a way of specifying for which exceptions we should retry. Let's add a function that given an exception returns whether or not it makes sense to retry, we'll call it retryOnError.
Now we need to write some scary looking code but bear with me.
Observable.Defer(() => {
return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)))
.Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
.Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
: Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
? Observable.Return(t.Item2)
: Observable.Throw<T>(t.Item3))
All of those angle brackets are there to marshal an exception for which we shouldn't retry past the .Retry()
. We've made the inner observable be an IObservable<Tuple<bool, WebResponse, Exception>>
where the first bool indicates if we have a response or an exception. If retryOnError indicates that we should retry for a particular exception the inner observable will throw and that will be picked up by the retry. The SelectMany just unwraps our Tuple and makes the resulting observable be IObservable<WebRequest>
again.
See my gist with full source and tests for the final version. Having this operator allows us to write our retry code quite succinctly
Observable.Defer(() => SomApiMethod())
.RetryWithBackoffStrategy(
retryCount: 4,
retryOnError: e => e is ApiRetryWebException
)
Solution 2:
Maybe I'm over simplifying the situation, but if we look at the implementation of Retry, it is simply an Observable.Catch over an infinite enumerable of observables:
private static IEnumerable<T> RepeatInfinite<T>(T value)
{
while (true)
yield return value;
}
public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
{
return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source));
}
So if we take this approach, we can just add a delay after the first yield.
private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime)
{
// Don't delay the first time
yield return source;
while (true)
yield return source.DelaySubscription(dueTime);
}
public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
return RepeateInfinite(source, dueTime).Catch();
}
An overload that catches a specific exception with a retry count can be even more concise:
public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception
{
return source.Catch<TSource, TException>(exception =>
{
if (count <= 0)
{
return Observable.Throw<TSource>(exception);
}
return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count);
});
}
Note that the overload here is using recursion. On first appearances, it would seem that a StackOverflowException is possible if count was something like Int32.MaxValue. However, DelaySubscription uses a scheduler to run the subscription action, so stack overflow would not be possible (i.e. using "trampolining"). I guess this isn't really obvious by looking at the code though. We could force a stack overflow by explicitly setting the scheduler in the DelaySubscription overload to Scheduler.Immediate, and passing in TimeSpan.Zero and Int32.MaxValue. We could pass in a non-immediate scheduler to express our intent a little more explicitly, e.g.:
return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count);
UPDATE: Added overload to take in a specific scheduler.
public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
this IObservable<TSource> source,
TimeSpan retryDelay,
int retryCount,
IScheduler scheduler) where TException : Exception
{
return source.Catch<TSource, TException>(
ex =>
{
if (retryCount <= 0)
{
return Observable.Throw<TSource>(ex);
}
return
source.DelaySubscription(retryDelay, scheduler)
.RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler);
});
}
Solution 3:
Here's the one I'm using:
public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay)
{
Contract.Requires(src != null);
Contract.Ensures(Contract.Result<IObservable<T>>() != null);
if (delay == TimeSpan.Zero) return src.Retry();
return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry());
}
Solution 4:
Based on Markus' answer I wrote the following:
public static class ObservableExtensions
{
private static IObservable<T> BackOffAndRetry<T>(
this IObservable<T> source,
Func<int, TimeSpan> strategy,
Func<int, Exception, bool> retryOnError,
int attempt)
{
return Observable
.Defer(() =>
{
var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt);
var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay);
return s
.Catch<T, Exception>(e =>
{
if (retryOnError(attempt, e))
{
return source.BackOffAndRetry(strategy, retryOnError, attempt + 1);
}
return Observable.Throw<T>(e);
});
});
}
public static IObservable<T> BackOffAndRetry<T>(
this IObservable<T> source,
Func<int, TimeSpan> strategy,
Func<int, Exception, bool> retryOnError)
{
return source.BackOffAndRetry(strategy, retryOnError, 0);
}
}
I like it more because
- it doesn't modify
attempts
but uses recursion. - It doesn't use
retries
but passes the number of attempts toretryOnError