A way to push buffered events at even intervals

What I'm trying to achieve is to buffer incoming events from some IObservable (they come in bursts) and release them further one by one, at even intervals. Like this:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

Since I'm quite new to Rx, I'm not sure whether there is already a Subject or an operator that does just this. Maybe it can be done by composition?
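The timing rule behind the diagram can be stated without Rx at all: an event is released as soon as it arrives if the gap since the previous release has already elapsed, otherwise it waits until the gap has elapsed. Below is a minimal sketch of that rule only, not an Rx solution. The names `EvenPacing` and `ReleaseTimes` are hypothetical, and integer positions on the diagram stand in for real time.

```csharp
using System;
using System.Collections.Generic;

public static class EvenPacing
{
    // Hypothetical helper: given arrival times in ascending order and a
    // minimum gap, returns the time at which each event would be released.
    public static List<long> ReleaseTimes(IEnumerable<long> arrivals, long gap)
    {
        var result = new List<long>();
        long? last = null; // time of the previous release, if any
        foreach (var t in arrivals)
        {
            // Release immediately if the gap has already elapsed,
            // otherwise hold the event until previous release + gap.
            long release = last.HasValue ? Math.Max(t, last.Value + gap) : t;
            result.Add(release);
            last = release;
        }
        return result;
    }
}
```

For the first burst in the diagram, arrivals at positions 1, 2, 4, 5, 6, 8, 9 with a gap of 3 give releases at 1, 4, 7, 10, 13, 16, 19, which is the second line of the diagram.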

update:

Thanks to Richard Szalay for pointing out the Drain operator. I found another example of Drain usage by James Miles. Here's how I managed to get it to work in a WPF app:

    .Drain(x => {
        Process(x);
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
    }).Subscribe();

I had some fun, because omitting the scheduler parameter causes the app to crash in debug mode without any exception showing up (I need to learn how to deal with exceptions in Rx). The Process method modifies the UI state directly, but I guess it's quite simple to make an IObservable out of it (using an ISubject?).

update:

In the meantime I've been experimenting with ISubject. The class below does what I wanted, letting out buffered Ts one per interval:

public class StepSubject<T> : ISubject<T>
{
    IObserver<T> subscriber;
    Queue<T> queue = new Queue<T>();
    MutableDisposable cancel = new MutableDisposable();
    TimeSpan interval;
    IScheduler scheduler;
    bool idle = true;

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    {
        this.interval = interval;
        this.scheduler = scheduler;
    }

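    void Step()
    {
        T next = default(T); // initialized so the compiler sees it as definitely assigned
        bool hasItem;
        lock (queue)
        {
            hasItem = queue.Count > 0;
            idle = !hasItem;
            if (hasItem)
                next = queue.Dequeue();
        }

        if (hasItem)
        {
            // schedule the following step first, then emit the current item
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        }
    }

    public void OnNext(T value)
    {
        bool start;
        lock (queue)
        {
            queue.Enqueue(value);
            // only the call that finds the subject idle starts the stepping,
            // so a burst of OnNext calls cannot schedule Step twice
            start = idle;
            idle = false;
        }

        if (start)
            cancel.Disposable = scheduler.Schedule(Step);
    }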

    public IDisposable Subscribe(IObserver<T> observer)
    {
        subscriber = observer;
        return cancel;
    }
}

This naive implementation omits OnCompleted and OnError for clarity (both must be added before it compiles as an ISubject<T>), and it allows only a single subscription.


Solution 1:

It's actually trickier than it sounds.

Using Delay doesn't work because the values will still arrive in bursts, only shifted in time.

Using Interval with either CombineLatest or Zip doesn't work, since the former will cause source values to be skipped and the latter will buffer interval values.

I think the new Drain operator (added in 1.0.2787.0), combined with Delay, should do the trick:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

The Drain operator works like SelectMany, but waits until the previous output completes before calling the selector with the next value. It's still not exactly what you are after (the first value in a block will also be delayed), but it's close: The usage above matches your marble diagram now.

Edit: Apparently the Drain in the framework doesn't work like SelectMany. I'll ask for some advice in the official forums. In the meantime, here's an implementation of Drain that does what you're after:

Edit 09/11: Fixed errors in implementation and updated usage to match your requested marble diagram.

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

Solution 2:

Just for completeness, here is an alternative (more compact) version of the Drain() method suggested by Richard:

public static IObservable<T2> SelectManySequential<T1, T2>(
    this IObservable<T1> source, 
    Func<T1, IObservable<T2>> selector
)
{
    return source
        .Select(x => Observable.Defer<T2>(() => selector(x)))
        .Concat();
}

See the thread Drain + SelectMany = ? in the Rx forum.

Update: I realized that the Concat() overload I used is one of my personal Rx extensions and is not (yet) part of the framework. I am sorry for this mistake. Of course, this makes my solution less elegant than I thought.

Nevertheless, for completeness, here is my Concat() extension method overload:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        var lockCookie = new Object();
        bool completed = false;
        bool subscribed = false;
        var waiting = new Queue<IObservable<T>>();
        var pendingSubscription = new MutableDisposable();

        Action<Exception> errorHandler = e =>
        {
            o.OnError(e);
            pendingSubscription.Dispose();
        };

        Func<IObservable<T>, IDisposable> subscribe = null;
        subscribe = (ob) =>
        {
            subscribed = true;
            return ob.Subscribe(
                o.OnNext,
                errorHandler,
                () =>
                {
                    lock (lockCookie)
                    {
                        if (waiting.Count > 0)
                            pendingSubscription.Disposable = subscribe(waiting.Dequeue());
                        else if (completed)
                            o.OnCompleted();
                        else
                            subscribed = false;
                    }
                }
            );
        };

        return new CompositeDisposable(pendingSubscription,
            source.Subscribe(
                n =>
                {
                    lock (lockCookie)
                    {
                        if (!subscribed)
                            pendingSubscription.Disposable = subscribe(n);
                        else
                            waiting.Enqueue(n);
                    }

                },
                errorHandler,
                () =>
                {
                    lock (lockCookie)
                    {
                        completed = true;
                        if (!subscribed)
                            o.OnCompleted();
                    }
                }
            )
        );
    });
}

And now, beating myself with my own weapons: the same Concat() method could be written much more elegantly in Richard Szalay's brilliant way:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Defer(() =>
    {
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
        return source
            .Zip(queue, (v, q) => v)
            .SelectMany(v => 
                v.Do(_ => { }, () => queue.OnNext(new Unit()))
            );
    });
}

So credit belongs to Richard. :-)
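For readers on a current System.Reactive release, a Concat() overload for IObservable<IObservable<T>> ships with the library, so the hand-written versions above are not needed there. Under that assumption, the pacing from the question could be sketched roughly as follows. The names `PacingExtensions` and `Pace` are hypothetical and not part of Rx.

```csharp
using System;
using System.Reactive.Linq;

public static class PacingExtensions
{
    // Hypothetical helper, assuming the built-in Concat() overload for
    // IObservable<IObservable<T>> from a current System.Reactive release.
    public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
    {
        return source
            // Each value becomes a tiny sequence that emits the value at once
            // and then completes only after 'interval' has passed.
            .Select(x => Observable.Empty<T>().Delay(interval).StartWith(x))
            // Concat subscribes to the next inner sequence only when the
            // previous one has completed, which spaces the values out.
            .Concat();
    }
}
```

Usage would look like `source.Pace(TimeSpan.FromSeconds(1)).Subscribe(Process);`. The first value of a burst passes through immediately, and later values follow at least one interval apart. Note that the pending inner sequences are queued without any bound, so a source that is faster than the interval for a long time will keep growing that queue.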

Solution 3:

Here's how I did this, just using an explicit queue (ReactiveCollection is just a fancy version of WPF's ObservableCollection; ReactiveCollection.ItemsAdded calls OnNext for each item added, as you can imagine):

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)
{
    var ret = new ReactiveCollection<T>();
    if (WithDelay == null) {
        FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
        return ret;
    }

    // On a timer, dequeue items from queue if they are available
    var queue = new Queue<T>();
    var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
        .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => {
            if (queue.Count > 0) { 
                ret.Add(queue.Dequeue());
            }
        });

    // When new items come in from the observable, stuff them in the queue.
    // Using the DeferredScheduler guarantees we'll always access the queue
    // from the same thread.
    FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);

    // This is a bit clever - keep a running count of the items actually 
    // added and compare them to the final count of items provided by the
    // Observable. Combine the two values, and when they're equal, 
    // disconnect the timer
    ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), 
        (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose());

    return ret;
}