How to handle the exception thrown by the async method with observable?
What's happening in your code is a direct consequence of using Observable.Create and filling the observable with this code:
    Enumerable.Range(1, 10).ToList().ForEach(x =>
    {
        observer.OnNext(x);
    });
Observable.Create runs the delegate you pass it on the thread that calls Subscribe, so the Enumerable.Range(1, 10).ToList().ForEach executes immediately on that thread, and each call to OnNext executes handler(x).Wait() immediately.
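To make the synchronous behaviour concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The class name CreateIsSynchronous and its Run method are made up for illustration and are not part of your code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class CreateIsSynchronous
{
    // Returns the values observed before Subscribe returned, plus whether every
    // OnNext callback ran on the thread that called Subscribe.
    public static (List<int> Seen, bool SameThread) Run()
    {
        var callerThread = Environment.CurrentManagedThreadId;
        var seen = new List<int>();
        var sameThread = true;

        var source = Observable.Create<int>(observer =>
        {
            // This delegate runs inside the Subscribe call below.
            for (var x = 1; x <= 3; x++)
            {
                observer.OnNext(x);
            }
            observer.OnCompleted();
            return Disposable.Empty;
        });

        source.Subscribe(x =>
        {
            sameThread &= Environment.CurrentManagedThreadId == callerThread;
            seen.Add(x);
        });

        // No waiting is needed: Subscribe has already pushed every value.
        return (seen, sameThread);
    }
}
```

Calling CreateIsSynchronous.Run() should give back all three values with SameThread still true, because nothing in the pipeline introduces a scheduler or another thread.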
You'll note, though, that the exception occurs in the delegate passed to Subscribe. Internally there is code like this:
    catch (Exception exception)
    {
        if (!autoDetachObserver.Fail(exception))
        {
            throw;
        }
        return autoDetachObserver;
    }
That catches the exception thrown during Subscribe, cancels the subscription (hence the "Observable Dispose" message), and then rethrows the exception, which is where your code catches it.
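The rethrow can be observed from the outside with a small sketch like the one below, again assuming System.Reactive is referenced. SubscribeRethrows and Run are hypothetical names, and the handler fails on the value 2 purely for demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class SubscribeRethrows
{
    // Returns the values handled before the failure and the message of the
    // exception that escaped from Subscribe (empty if nothing was thrown).
    public static (List<int> Handled, string Error) Run()
    {
        var handled = new List<int>();
        var error = "";

        var source = Observable.Create<int>(observer =>
        {
            // Same shape as the code in the question: push everything synchronously.
            Enumerable.Range(1, 10).ToList().ForEach(observer.OnNext);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        try
        {
            source.Subscribe(x =>
            {
                // The OnNext handler fails part-way through the sequence.
                if (x == 2) throw new InvalidOperationException($"failed on {x}");
                handled.Add(x);
            });
        }
        catch (InvalidOperationException ex)
        {
            // The exception surfaces from the Subscribe call itself.
            error = ex.Message;
        }

        return (handled, error);
    }
}
```

In this sketch the ForEach loop is aborted by the exception, so only the value 1 is handled, and the caller of Subscribe is the one that sees the failure.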
Now, if you wanted to do this properly in Rx, you'd avoid Observable.Create. It's a tempting way to create observables, but it leads to trouble.
Instead do this:
    public async Task Test()
    {
        Func<int, Task> handler = async (i) =>
        {
            // simulate the handler logic
            await Task.Delay(TimeSpan.FromSeconds(1));
            // throw the exception to test
            throw new Exception($"{i}");
        };

        await
            Observable
                .Range(1, 10)
                .SelectMany(i => Observable.FromAsync(() => handler(i)))
                .LastOrDefaultAsync();
    }
But, of course, we want to handle the exception. The simple way is like this:
    public async Task Test()
    {
        Func<int, Task> handler = async (i) =>
        {
            // simulate the handler logic
            await Task.Delay(TimeSpan.FromSeconds(1));
            // throw the exception to test
            throw new Exception($"{i}");
        };

        await
            Observable
                .Range(1, 10)
                .SelectMany(i =>
                    Observable
                        .FromAsync(() => handler(i))
                        .Catch<Unit, Exception>(ex =>
                        {
                            Console.WriteLine($"The exception is catch:{ex.ToString()}");
                            return Observable.Empty<Unit>();
                        }))
                .LastOrDefaultAsync();
    }
That now writes all 10 exceptions to the console and completes normally.
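If you want to check that claim without reading console output, a variant along these lines counts the caught exceptions instead. It is only a sketch: CatchPerItem and RunAsync are hypothetical names, the delay is shortened to 10 ms, and the counter is an addition of mine. It assumes System.Reactive is referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class CatchPerItem
{
    // Runs the pipeline and returns how many handler failures were swallowed.
    public static async Task<int> RunAsync()
    {
        var caught = 0;

        Func<int, Task> handler = async i =>
        {
            // Shortened delay (assumption) so the check finishes quickly.
            await Task.Delay(TimeSpan.FromMilliseconds(10));
            throw new Exception($"{i}");
        };

        await Observable
            .Range(1, 10)
            .SelectMany(i =>
                Observable
                    .FromAsync(() => handler(i))
                    .Catch<Unit, Exception>(ex =>
                    {
                        // SelectMany starts the handlers without waiting for
                        // earlier ones, so they overlap; count atomically.
                        Interlocked.Increment(ref caught);
                        return Observable.Empty<Unit>();
                    }))
            .LastOrDefaultAsync();

        return caught;
    }
}
```

Awaiting CatchPerItem.RunAsync() should return 10 without throwing, since each inner sequence swallows its own failure and completes empty. Note that the Catch sits inside the SelectMany: placed after it, the first failure would end the whole sequence.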