How to add "reload" and IsLoading status to 2nd level Observable
I feel like I'm trying to reinvent the wheel, so I'd better ask.
GIVEN
- that I have an IObservable<T> source
- and a Task LoadAsync<T>(T value, CancellationToken cancellationToken) method
WHEN
- I use the Select/Switch pattern to call LoadAsync when source emits:
    source
        .Select(value => Observable.FromAsync(cancellationToken => LoadAsync(value, cancellationToken)))
        .Switch()
        .Subscribe();
THEN
- How do I add reload functionality?
- How do I report IsLoading status: whether the LoadAsync is running?
- How do I cancel LoadAsync when source completes?
I want to create some reusable method, or class, that would implement answers to #1 and #2.
I have this so far: https://dotnetfiddle.net/0zPhBE
public class ReactiveLoader<T> : IDisposable
{
    private readonly BehaviorSubject<bool> _isLoading = new(false);
    private readonly Subject<Unit> _completes = new();
    private readonly Subject<T> _reloads = new Subject<T>();
    private readonly IDisposable _subscription;

    public bool IsLoading => _isLoading.Value;
    public IObservable<bool> IsLoadingObservable => _isLoading.Skip(1).DistinctUntilChanged(); //Not nice

    public ReactiveLoader(IObservable<T> observable, Func<T, CancellationToken, Task> handler)
    {
        _subscription = observable
            .Finally(() => //Not nice
            {
                _completes.OnNext(Unit.Default);
            })
            .Merge(_reloads)
            .Do(_ => _isLoading.OnNext(true))
            .Select(value => Observable.FromAsync(cancellationToken => handler(value, cancellationToken)))
            .Switch()
            .Do(_ => _isLoading.OnNext(false))
            .TakeUntil(_completes) //cancels loading when observable completes
            .Subscribe();
    }

    public void Reload()
    {
        _reloads.OnNext(??); //needs last value of source
    }

    public void Dispose()
    {
        _completes.OnNext(Unit.Default);
        _subscription.Dispose();
    }
}
Here is one approach:
IObservable<bool> sequence = source.Publish(published => published
    .CombineLatest(_reloads, (x, _) => x)
    .Select(x => Observable.FromAsync(ct => LoadAsync(x, ct)).Select(_ => false).Prepend(true))
    .Switch()
    .Do(_isLoading)
    .TakeUntil(published.LastOrDefaultAsync()));
The CombineLatest operator re-emits the latest source value every time _reloads emits a signal.
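To see the re-emission in isolation, here is a minimal sketch with a Subject<int> standing in for the source and a Subject<Unit> as a hypothetical reload trigger. One caveat worth keeping in mind: CombineLatest emits nothing until both inputs have produced a value, so this sketch seeds the reload stream with StartWith(Unit.Default). That seeding is my assumption and is not part of the snippet above; a BehaviorSubject<Unit> would serve the same purpose.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = new Subject<int>();     // stands in for the real source
var reloads = new Subject<Unit>();   // hypothetical reload trigger
var emitted = new List<int>();

using var subscription = source
    // Seed the trigger so the first source value passes through without a reload.
    .CombineLatest(reloads.StartWith(Unit.Default), (x, _) => x)
    .Subscribe(emitted.Add);

source.OnNext(1);               // new value
reloads.OnNext(Unit.Default);   // reload: repeats 1
source.OnNext(2);               // new value
reloads.OnNext(Unit.Default);   // reload: repeats 2

Console.WriteLine(string.Join(", ", emitted)); // 1, 1, 2, 2
```

Without the StartWith, the first source value would be held back until the first reload signal arrives.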
The .Select(_ => false).Prepend(true) converts the inner observable from an IObservable<Unit> to an IObservable<bool> that emits loading-status signals.
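The status conversion can be tried on a single inner observable. In this sketch the LoadAsync below is a hypothetical stand-in that just delays for 50 ms; the real method would do the actual work.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's LoadAsync.
static Task LoadAsync(int value, CancellationToken ct) => Task.Delay(50, ct);

IObservable<bool> status = Observable
    .FromAsync(ct => LoadAsync(42, ct)) // IObservable<Unit>: one Unit when the task completes
    .Select(_ => false)                 // completion means "no longer loading"
    .Prepend(true);                     // subscription means "loading started"

// Awaiting an observable yields its last element; ToList collects all signals first.
IList<bool> signals = await status.ToList();
Console.WriteLine(string.Join(", ", signals)); // True, False
```

Note that if LoadAsync fails, the inner observable ends with an error instead of emitting false, so error handling needs to be decided separately.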
The TakeUntil(published.LastOrDefaultAsync()) will terminate the sequence immediately when the source terminates (without waiting for any pending LoadAsync operation).
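Putting the pieces together, the ReactiveLoader<T> from the question could be reshaped roughly as follows. This is a sketch under a few assumptions of mine rather than a definitive implementation: _reloads becomes a Subject<Unit> seeded with StartWith so the first source value loads without a reload, the status is forwarded through Subscribe instead of Do(_isLoading) so that the pipeline does not complete the BehaviorSubject, and IsLoading is reset to false when the pipeline ends, because a load that TakeUntil cuts short never emits its own false.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Usage: a stand-in source and a handler that only records what it was asked to load.
var source = new Subject<string>();
var loads = new List<string>();
using var loader = new ReactiveLoader<string>(source, (value, ct) =>
{
    loads.Add(value);
    return Task.CompletedTask;
});

source.OnNext("a");   // triggers a load of "a"
loader.Reload();      // triggers another load of the latest value
Console.WriteLine(string.Join(", ", loads));

public sealed class ReactiveLoader<T> : IDisposable
{
    private readonly BehaviorSubject<bool> _isLoading = new(false);
    private readonly Subject<Unit> _reloads = new();
    private readonly IDisposable _subscription;

    public bool IsLoading => _isLoading.Value;
    public IObservable<bool> IsLoadingObservable => _isLoading.DistinctUntilChanged();

    public ReactiveLoader(IObservable<T> source, Func<T, CancellationToken, Task> handler)
    {
        _subscription = source.Publish(published => published
                // Assumption: seed the reload stream so CombineLatest can emit right away.
                .CombineLatest(_reloads.StartWith(Unit.Default), (x, _) => x)
                .Select(x => Observable.FromAsync(ct => handler(x, ct))
                    .Select(_ => false)
                    .Prepend(true))
                .Switch()   // unsubscribing the previous inner observable cancels its token
                .TakeUntil(published.LastOrDefaultAsync()))
            .Subscribe(
                _isLoading.OnNext,
                _ => _isLoading.OnNext(false),    // assumption: an error just ends loading
                () => _isLoading.OnNext(false));  // an interrupted load never emits false itself
    }

    // Re-runs the handler for the latest source value, if there is one.
    public void Reload() => _reloads.OnNext(Unit.Default);

    public void Dispose()
    {
        _subscription.Dispose();   // cancels a load that is still in flight
        _isLoading.OnNext(false);
    }
}
```

Calling Reload() before the source has emitted anything does nothing, since CombineLatest has no source value to repeat yet. Swallowing errors in the onError callback is a placeholder choice; a real loader would probably surface them.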