How to add "reload" and IsLoading status to 2nd level Observable

I feel like I'm trying to reinvent a wheel, so I better ask.

GIVEN

  • that I have an IObservable<T> source
  • and a Task LoadAsync<T>(T value, CancellationToken cancellationToken) method

WHEN

  • I use the Select/Switch pattern to call LoadAsync whenever the source emits

    observable
       .Select(value => Observable.FromAsync(cancellationToken => LoadAsync(value, cancellationToken)))
       .Switch()
       .Subscribe(); 
    

THEN

  1. How do I add reload functionality?
  2. How do I report an IsLoading status, i.e. whether LoadAsync is currently running?
  3. How do I cancel LoadAsync when the source completes?

I want to create a reusable method or class that implements the answers to #1 and #2.

I have this so far: https://dotnetfiddle.net/0zPhBE

public class ReactiveLoader<T> : IDisposable
{
    private readonly BehaviorSubject<bool> _isLoading = new(false);
    private readonly Subject<Unit> _completes = new();
    private readonly Subject<T> _reloads = new Subject<T>();
    private readonly IDisposable _subscription;

    public bool IsLoading => _isLoading.Value;
    public IObservable<bool> IsLoadingObservable => _isLoading.Skip(1).DistinctUntilChanged(); //Not nice

    public ReactiveLoader(IObservable<T> observable, Func<T, CancellationToken, Task> handler)
    {           
        _subscription = observable
            .Finally(() => //Not nice
            {
                 _completes.OnNext(Unit.Default);
            })
            .Merge(_reloads)
            .Do(_ => _isLoading.OnNext(true))
            .Select(value => Observable.FromAsync(cancellationToken => handler(value, cancellationToken)))
            .Switch()
            .Do(_ => _isLoading.OnNext(false))
            .TakeUntil(_completes) //cancels loading when observable completes
            .Subscribe();
    }

    public void Reload()
    {
         _reloads.OnNext(??); //needs last value of source
    }

    public void Dispose()
    {
        _completes.OnNext(Unit.Default);
        _subscription.Dispose();
    }
}

Here is one approach:

IObservable<bool> sequence = source.Publish(published => published
    .CombineLatest(_reloads, (x, _) => x)
    .Select(x => Observable.FromAsync(ct => LoadAsync(x, ct)).Select(_ => false).Prepend(true))
    .Switch()
    .Do(_isLoading)
    .TakeUntil(published.LastOrDefaultAsync()));

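The CombineLatest operator re-emits the latest source value every time _reloads emits a signal. Note that CombineLatest emits nothing until both inputs have produced at least one value, so if _reloads is a plain Subject<Unit>, seed it (for example with _reloads.StartWith(Unit.Default)) so that the first source value triggers a load without waiting for a reload.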
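To see the re-emission in isolation, here is a minimal sketch with no loading involved. It assumes the System.Reactive package and a Subject<Unit> reload trigger seeded with StartWith; the variable names are made up for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = new Subject<int>();
var reloads = new Subject<Unit>();   // assumption: reload trigger is a Subject<Unit>
var seen = new List<int>();

// StartWith seeds the reload stream so the first source value is not held back.
using var subscription = source
    .CombineLatest(reloads.StartWith(Unit.Default), (x, _) => x)
    .Subscribe(seen.Add);

source.OnNext(1);              // emits 1 (initial load)
reloads.OnNext(Unit.Default);  // re-emits 1 (reload)
source.OnNext(2);              // emits 2
reloads.OnNext(Unit.Default);  // re-emits 2 (reload)

Console.WriteLine(string.Join(", ", seen)); // 1, 1, 2, 2
```

Without the StartWith, the first value would only be emitted after the first Reload call.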

The .Select(_ => false).Prepend(true) converts the inner observable from an IObservable<Unit> to an IObservable<bool> that emits loading-status signals: true when the load starts and false when it completes.

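The TakeUntil(published.LastOrDefaultAsync()) terminates the sequence immediately when the source terminates, without waiting for any pending LoadAsync operation. Terminating the sequence unsubscribes from the inner observable, which cancels the CancellationToken passed to LoadAsync.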
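Putting the pieces together, the approach could be wrapped in a reusable helper roughly as follows. This is a sketch under assumptions, not a definitive implementation: ToLoadingStates is a hypothetical name, the reload trigger is assumed to be a Subject<Unit> seeded with StartWith, and the stand-in LoadAsync returns a task that never completes, so that only cancellation is exercised.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: turns a source and a reload trigger into loading-status signals.
IObservable<bool> ToLoadingStates<T>(
    IObservable<T> source,
    IObservable<Unit> reloads,
    Func<T, CancellationToken, Task> handler) =>
    source.Publish(published => published
        // StartWith lets the first source value start a load before any reload is requested.
        .CombineLatest(reloads.StartWith(Unit.Default), (x, _) => x)
        .Select(x => Observable.FromAsync(ct => handler(x, ct))
            .Select(_ => false)   // load finished
            .Prepend(true))       // load started
        .Switch()                 // a newer value or a reload cancels the pending load
        .TakeUntil(published.LastOrDefaultAsync()));

var source = new Subject<string>();
var reloads = new Subject<Unit>();
var isLoading = new BehaviorSubject<bool>(false);
var started = new List<string>();
var cancelled = new List<string>();
var never = new TaskCompletionSource(); // never completed in this demo

// Stand-in for LoadAsync: records starts and cancellations, and stays pending.
Task LoadAsync(string value, CancellationToken ct)
{
    started.Add(value);
    ct.Register(() => cancelled.Add(value));
    return never.Task;
}

// When TakeUntil ends the sequence mid-load, no inner "false" arrives,
// so the completion callback resets the status.
using var subscription = ToLoadingStates<string>(source, reloads, LoadAsync)
    .Subscribe(isLoading.OnNext, _ => { }, () => isLoading.OnNext(false));

source.OnNext("a");                   // starts loading "a"
Console.WriteLine(isLoading.Value);   // True
reloads.OnNext(Unit.Default);         // cancels the pending load and loads "a" again
source.OnCompleted();                 // cancels the pending load and ends the sequence
Console.WriteLine($"{started.Count} started, {cancelled.Count} cancelled, loading: {isLoading.Value}");
// 2 started, 2 cancelled, loading: False
```

A Reload() method on a wrapping class would then just call reloads.OnNext(Unit.Default), so the last source value does not have to be stored separately. How errors thrown by LoadAsync should affect IsLoading is left open here; the sketch simply swallows them in the subscriber.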