With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running?
Using Reactive Extensions, I want to ignore messages coming from my event stream that occur while my Subscribe method is running. That is, it sometimes takes me longer to process a message than the time between messages, so I want to drop the messages I don't have time to process.
However, when my Subscribe method completes, if any messages did come through I want to process the last one. So I always process the most recent message.
So, if I have some code which does:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
and we assume the '100' takes a long time to process, then I want the '2' to be processed when the '100' completes. The '1' should be ignored because it was superseded by the '2' while the '100' was still being processed.
Here's an example of the result I want, using a background task and Latest():
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
    foreach (var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});
However, Latest() is a blocking call and I'd prefer not to have a thread sitting waiting for the next value like this (there will sometimes be very long gaps between messages).
I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this:
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });
but this feels like it should be possible directly in Rx. What's the best way to go about doing it?
Here is a method that is similar to Dave's but uses Sample instead (which is more appropriate than Buffer). I've included a similar extension method to the one I added to Dave's answer.
The extension:
public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    // fires each time the action finishes, asking Sample for the latest value
    var sampler = new Subject<Unit>();
    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });
    // start sampling when we have a first value
    // (note: this subscribes to source a second time)
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));
    return sub;
}
Note that it's simpler, and there is no 'empty' buffer that's fired. The first element that is sent to the action actually comes from the stream itself.
Usage is straightforward:
messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});
messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing
And results:
source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
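Two caveats with the extension above are worth keeping in mind. Sample only emits when a new value has arrived since the previous sample, so if the source is quiet while the action runs, the sampler tick at the end produces nothing and no further tick is ever sent. Also, the Take(1) line subscribes to the source a second time, which matters for cold observables. Below is a minimal sketch of one way around both, assuming the System.Reactive package and C# 7 or later. SubscribeLatestOnly and its scheduler parameter are hypothetical names for illustration, not part of Rx.

```csharp
using System;
using System.Reactive.Concurrency;

public static class LatestOnlyExtensions
{
    // Hypothetical helper, not part of Rx: runs `action` for one value at a time
    // and keeps only the most recent value that arrives while it is busy.
    public static IDisposable SubscribeLatestOnly<T>(
        this IObservable<T> source, Action<T> action, IScheduler scheduler)
    {
        var gate = new object();
        var busy = false;        // true while a worker is running `action`
        var hasPending = false;  // true if a value arrived while busy
        var pending = default(T);

        // Worker loop: process `first`, then keep draining the pending slot.
        void Run(T first)
        {
            var current = first;
            while (true)
            {
                action(current);
                lock (gate)
                {
                    if (!hasPending) { busy = false; return; }
                    current = pending;
                    hasPending = false;
                }
            }
        }

        return source.Subscribe(value =>
        {
            lock (gate)
            {
                if (busy)
                {
                    // Overwrite whatever was waiting: older values are dropped.
                    pending = value;
                    hasPending = true;
                    return;
                }
                busy = true;
            }
            // Hand the value to the scheduler so the source is not blocked.
            scheduler.Schedule(() => Run(value));
        });
    }
}
```

It would be called like the extension above, with a scheduler added, for example messages.SubscribeLatestOnly(n => Console.WriteLine(n), TaskPoolScheduler.Default). The sketch leaves some things out: an exception thrown by the action leaves the busy flag set, and disposing the subscription does not interrupt an action that is already running.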
Here is an attempt using "just" Rx. The timer and the subscriber are kept independent by observing on the thread pool, and I've used a subject to provide feedback when the task completes.
I don't think this is a simple solution, but I hope it might give you ideas for improvement.
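// signals that the subscriber has finished and the current buffer can be closed
var feedback = new Subject<Unit>();
messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });
// kick things off by closing the first buffer
feedback.OnNext(Unit.Default);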
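There is one slight problem: the first buffer is closed while it's still empty, so LastOrDefault() produces the default value and the subscriber sees that before any real message. The same happens whenever no message arrives while the subscriber is running. You could probably solve the first case by doing the feedback after the first message.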
Here it is as an extension function:
public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();
    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });
    feedback.OnNext(Unit.Default);
    return sub;
}
And usage:
messages.SubscribeWithoutOverlap(n =>
{
    Thread.Sleep(1000);
    Console.WriteLine(n);
});