C# Corner
Reactive Extensions: Just What the Doctor Ordered (Part 1)
The Reactive Extensions (Rx) Library is a set of extensions for the IObservable<T> and IObserver<T> interfaces that greatly simplifies the orchestration and composition of asynchronous functions and events.
Read the entire series: Part 1 | Part 2 | Part 3
The Reactive Extensions (Rx) Library is a set of extensions for the IObservable<T> and IObserver<T> interfaces that were added in .NET 4.0. The extensions allow for the use of LINQ to query IObservable<T> collections as if they were IEnumerable collections. In addition Rx also greatly simplifies the orchestration and composition of asynchronous functions and events. It is this last bit that is most interesting. Imagine being able to easily observe multiple data services and perform operations on that data in real time. Another likely scenario may be composing a set of events to capture gestures in real-time.
Before we dive too deep into Rx, let's go over the basics of the IObservable<T> and IObserver<T> push interfaces that Rx extends.
The IObservable<T> interface requires the implementation of one method:
IDisposable Subscribe(IObserver<T> observer).
The IObserver<T> interface requires the implementation of the following three methods:
void OnCompleted()
void OnError(Exception error)
void OnNext(T value).
An easy way to think of IObservable<T> and IObserver<T> is that they are the dual of IEnumerable and IEnumerator. Whereas IEnumerable and IEnumerator are used for pulling a data collection, IObservable and IObserver are used for pushing a data collection.
A class that implements IObservable acts a collection of data that can be observed. A class that implements IObserver can subscribe to an observable collection and responds to the OnNext, OnError and OnCompleted events. The OnNext event is fired when more data is available, OnError if an exception occurs, and OnCompleted when there is no additional data.
Setting up Rx
The first thing you'll need to do is download the Reactive Extensions from Microsoft. You can also obtain Rx through NuGet. Once you have Rx downloaded and installed, add a reference to System.Reactive and WindowsBase to your project and import the System, System.Linq and System.Reactive.Linq namespaces in the class where you'll be using Rx.
Observing a Data Collection
Rx includes a handy ToObservable<T> extension method onto IEnumerable<T> that converts a typed collection to a typed Observable collection. This function is particularly useful for observing a fixed chunk of data.
First we create a standard collection to store our set of data to publish, in this case the first 1,000 digits in the Fibonacci sequence. The Fibonacci sequence is defined as 0, 1, and f(n-2) + f(n-1) for each subsequent digit.
List<ulong> fibonacciSeq = GenerateFibonnaci(1000);
Next convert the collection to an observable collection using ToObservable().
IObservable<int> fibonacciSource = fibonacciSeq.ToObservable();
Once the observable collection has been created, we can subscribe to the collection and perform a meaningful task. Rx adds several Subscribe extension methods onto IObservable<T> that allow you to specify Action<T> methods for each of the IObserver events.
Console.WriteLine("Press enter to subscribe");
Console.ReadLine();
using (var fibonnacciSub = fibonacciSource.SubscribeOn(Scheduler.TaskPool)
.Subscribe(
(x) => Console.WriteLine(x),
(Exception ex) => Console.WriteLine("Error received from source: {0}.", ex.Message),
() => Console.WriteLine("End of sequence.")
)
)
{
Console.WriteLine("Press enter again to unsubscribe from the service.");
Console.ReadLine();
}
Lastly add the GenerateFibonacci function that creates a finite Fibonacci sequence.
static List<ulong> GenerateFibonnaci(int sequenceLength)
{
List<ulong> sequence = new List<ulong>();
for (int i = 0; i < sequenceLength; i++)
{
if (i <= 1)
sequence.Add((ulong)i);
else
sequence.Add(sequence[i - 2] + sequence[i - 1]);
}
return sequence;
}
[Click on image for larger view.] |
Figure 1. |
The example above subscribes to our finite Fibonacci observable collection and defines an observer that outputs each digit on a new line. The observer also outputs any exception message to the console as well as an "End of sequence" notification when the collection has been fully observed. Note that when you run the example that the observable source is evaluated at the point that Subscribe is executed. Currently the subscription will execute on the same thread as it was called.
This behavior can be easily changed though by specifying that the subscription should execute using a specified IScheduler. For example to subscribe on the TaskPool, we can use the SubscribeOn extension method and pass it a value of Scheduler.TaskPool. Make sure that you have added a reference to System.Reactive.Concurrency. The updated code should look like this:
using (var fibonnacciSub = fibonacciSource.SubscribeOn(Scheduler.TaskPool)
.Subscribe(
(x) => Console.WriteLine(x),
(Exception ex) => Console.WriteLine("Error received from source: {0}.", ex.Message),
() => Console.WriteLine("End of sequence.")
)
)
{
Console.WriteLine("Press enter to unsubscribe from the service.");
Console.ReadLine();
}
Now when the user presses the enter key, the subscription will be unsubscribed and disposed.
So far we have gone over how to observe a cold observable or rather an observable collection that is not always running. Rx can also be used to observe hot observable or events that are always being published regardless if anyone is observing them. A classic example of a hot observable is an input event such as a mouse movement or key press.
Observing an Event
Rx adds a FromEvent<TEventArgs> extension method to IObservable<T>, which as the name implies creates an implementation of IObservable for a given event. For example to observe mouse movements, you would use the FromEvent extension method with a TEventArgs type of MouseEventArgs and define how to subscribe and unsubscribe from the event.
var mouseMovements =
Observable.FromEvent<MouseEventArgs>(
ev =>
{
this.MouseMove += (s, e) => ev(e);
},
ev =>
{
this.MouseMove -= (s, e) => ev(e);
});
Now that we have an IObservable<MouseEventArgs> we can subscribe and react to the observable data source. For example if we wanted to display the user's current position using a label on a form, we could use LINQ to project the mouse position from the observable collection, like so:
var mousePoints = from m in mouseMovements select m.GetPosition(this);
which will create an IObservable<Point> that observes the user's mouse position.
Finally we can subscribe to observable mousePoints and display the mouse position on the form.
mousePoints.ObserveOnDispatcher().Subscribe((pos) => mousePos.Content = pos.ToString());
As we are modifying the UI thread in our observer, we have to either observe or subscribe on the dispatcher thread to ensure that the label is only modified from the UI thread. The really nice thing about Rx is that this can be done with minimal code.
Conclusion
As you can see, Reactive Extensions makes it much easier to work with complex events and other push-based data sources. It really shines its ability to filter and project pushed data using LINQ.
Please stay tuned for the rest of the Rx series, where I will cover how to observe asynchronous methods and events, as well as how to compose and filter multiple observable collections. Both code examples are included in the code drop on the top of the page.
About the Author
Eric Vogel is a Senior Software Developer for Red Cedar Solutions Group in Okemos, Michigan. He is the president of the Greater Lansing User Group for .NET. Eric enjoys learning about software architecture and craftsmanship, and is always looking for ways to create more robust and testable applications. Contact him at [email protected].