Observable with backpressure in C#

Is there a way in C# rx to handle backpressure? I'm trying to call a web api from the results of a paged query. This web api is very fragile and I need to not have more than say 3 concurrent calls, so, the program should be something like:

  1. Fetch a page from the db
  2. Call the web api for each record on the page, with a maximum of three concurrent calls
  3. Save the results back to db
  4. Fetch another page and repeat until there are no more results.

I'm not really getting the sequence I'm after: the db fetches all the records regardless of whether they can be processed yet or not.

I've tried a variety of things, including tweaking the ObserveOn operator, implementing a semaphore, and a few other things. Could I get a little guidance on how to implement something like this?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Castle.Core.Internal;
using Xunit;
using Xunit.Abstractions;

namespace ProductValidation.CLI.Tests.Services
{
    public class Example
    {
        private readonly ITestOutputHelper output;

        public Example(ITestOutputHelper output)
        {
            this.output = output;
        }

        [Fact]
        public async Task RunsObservableToCompletion()
        {
            var repo = new Repository(output);
            var client = new ServiceClient(output);

            var results = repo.FetchRecords()
                .Select(x => client.FetchMoreInformation(x).ToObservable())
                .Merge(1)
                .Do(async x => await repo.Save(x));

            await results.LastOrDefaultAsync();
        } 
    }

    public class Repository
    {
        private readonly ITestOutputHelper output;

        public Repository(ITestOutputHelper output)
        {
            this.output = output;
        }

        public IObservable<int> FetchRecords()
        {
            return Observable.Create<int>(async (observer) =>
            {
                var page = 1;
                var products = await FetchPage(page);
                while (!products.IsNullOrEmpty())
                {
                    foreach (var product in products)
                    {
                        observer.OnNext(product);
                    }

                    page += 1;
                    products = await FetchPage(page);
                }
                observer.OnCompleted();
            })
            .ObserveOn(SynchronizationContext.Current);
        }

        private async Task<IEnumerable<int>> FetchPage(int page)
        {
            // Simulate fetching a paged query.
            await Task.Delay(500).ToObservable().ObserveOn(new TaskPoolScheduler(new TaskFactory()));
            output.WriteLine("Fetching page {0}", page);
            if (page >= 4) return Enumerable.Empty<int>();
            return Enumerable.Range(1, 3).Select(_ => page);
        }

        public async Task Save(string id)
        {
            await Task.Delay(50); //Simulates latency
        }
    }

    public class ServiceClient
    {
        private readonly ITestOutputHelper output;
        private readonly SemaphoreSlim semaphore;

        public ServiceClient(ITestOutputHelper output)
        {
            this.output = output;
            this.semaphore = new SemaphoreSlim(2);
        }

        public async Task<string> FetchMoreInformation(int id)
        {
            output.WriteLine("Calling the web client for {0}", id);
            // Protection for the webapi not sending too many calls.
            // Acquired before the try block, so that the finally
            // doesn't Release a slot that was never taken.
            await semaphore.WaitAsync();
            try
            {
                await Task.Delay(1000); //Simulates latency
                return id.ToString();
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
}

Rx does not support backpressure, so there is no easy way to fetch the records from the DB at the same tempo that the records are processed. You could use a Subject<Unit> as a signaling mechanism, push a value every time a record is processed, and devise a way to use these signals at the producing site, fetching a new record from the DB each time a signal is received. But it would be a messy and unidiomatic solution. TPL Dataflow is a more suitable tool than Rx for this kind of work: it natively supports the BoundedCapacity configuration option.
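For reference, here is a minimal sketch of the same pipeline built with TPL Dataflow. The block names, delays, and capacities are illustrative assumptions, not part of the original code:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureDemo
{
    // Pushes `total` simulated records through a bounded pipeline and
    // returns how many results were "saved". The delays stand in for
    // the web api and db latencies in the question.
    public static async Task<int> RunAsync(int total)
    {
        var saved = 0;

        // At most 3 concurrent web api calls, and at most 10 records
        // buffered ahead of the consumer.
        var callApi = new TransformBlock<int, string>(async id =>
        {
            await Task.Delay(100); // simulates the web api latency
            return id.ToString();
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3,
            BoundedCapacity = 10,
        });

        var saveToDb = new ActionBlock<string>(async result =>
        {
            await Task.Delay(10); // simulates the db latency
            Interlocked.Increment(ref saved);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        callApi.LinkTo(saveToDb,
            new DataflowLinkOptions { PropagateCompletion = true });

        // SendAsync awaits while the pipeline is at capacity; this is
        // where the backpressure is applied to the producer.
        foreach (var record in Enumerable.Range(1, total))
        {
            await callApi.SendAsync(record);
        }

        callApi.Complete();
        await saveToDb.Completion;
        return saved;
    }
}
```

Because `SendAsync` only completes when the bounded block has room, the producer (the paged DB query) is naturally slowed down to the tempo of the consumer, with no manual signaling required.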

Some comments regarding the code you've posted, that are not directly related to the backpressure issue:

The Merge operator with a maxConcurrent parameter imposes a limit on the concurrent subscriptions to the inner sequences, but this has no effect if the inner sequences are already up and running (hot). So you have to ensure that the inner sequences are cold, and a handy way to do this is the Defer operator:

.Select(x => Observable.Defer(() =>
    client.FetchMoreInformation(x).ToObservable()))

A more common way to convert asynchronous methods to deferred observable sequences is the FromAsync operator:

.Select(x => Observable.FromAsync(() => client.FetchMoreInformation(x)))

Btw the Do operator does not understand async delegates, so instead of:

.Do(async x => await repo.Save(x));

...which creates async void lambdas, it's better to do this:

.Select(x => Observable.FromAsync(() => repo.Save(x)))
.Merge(1);

Update: Here is an example of how you could use a SemaphoreSlim in order to implement backpressure in Rx:

const int boundedCapacity = 10;
using var semaphore = new SemaphoreSlim(boundedCapacity, boundedCapacity);

IObservable<int> results = repo
    .FetchRecords(semaphore)
    .Select(x => Observable.FromAsync(() => client.FetchMoreInformation(x)))
    .Merge(1)
    .Select(x => Observable.FromAsync(() => repo.Save(x)))
    .Merge(1)
    .Do(_ => semaphore.Release());

await results.DefaultIfEmpty();

And inside the FetchRecords method:

//...
await semaphore.WaitAsync();
observer.OnNext(product);
//...
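To make the snippet above concrete, here is a minimal self-contained sketch of what the complete FetchRecords could look like, reusing the simulated paging from the question (the class shape and delays are assumptions for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Repository
{
    // Sketch: FetchRecords now takes the shared semaphore, and awaits
    // a free slot before emitting each record.
    public IObservable<int> FetchRecords(SemaphoreSlim semaphore)
    {
        return Observable.Create<int>(async observer =>
        {
            var page = 1;
            var products = await FetchPage(page);
            while (products.Any())
            {
                foreach (var product in products)
                {
                    // Backpressure: waits (asynchronously) while the
                    // pipeline holds boundedCapacity unprocessed records.
                    // Matched by the semaphore.Release() at the end of
                    // the consuming pipeline.
                    await semaphore.WaitAsync();
                    observer.OnNext(product);
                }
                page += 1;
                products = await FetchPage(page);
            }
            observer.OnCompleted();
        });
    }

    // Same simulated paged query as in the question.
    private async Task<IEnumerable<int>> FetchPage(int page)
    {
        await Task.Delay(50);
        if (page >= 4) return Enumerable.Empty<int>();
        return Enumerable.Range(1, 3).Select(_ => page);
    }
}
```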

This is a fragile solution, because it depends on propagating all elements through the pipeline. If in the future you decide to include filtering or throttling inside the pipeline, then the one-to-one relationship between WaitAsync and Release will be violated, with the most probable outcome being a deadlocked pipeline.