C# Corner

Reactive Extensions: Just What the Doctor Ordered (Part 1)

The Reactive Extensions (Rx) Library is a set of extensions for the IObservable<T> and IObserver<T> interfaces that greatly simplifies the orchestration and composition of asynchronous functions and events.

Read the entire series: Part 1 | Part 2 | Part 3

The Reactive Extensions (Rx) Library is a set of extensions for the IObservable<T> and IObserver<T> interfaces that were added in .NET 4.0. The extensions allow you to use LINQ to query IObservable<T> collections as if they were IEnumerable<T> collections. In addition, Rx greatly simplifies the orchestration and composition of asynchronous functions and events. It is this last bit that is most interesting. Imagine being able to easily observe multiple data services and perform operations on that data in real time. Another likely scenario is composing a set of events to capture gestures in real time.

Before we dive too deep into Rx, let's go over the basics of the IObservable<T> and IObserver<T> push interfaces that Rx extends.

The IObservable<T> interface requires the implementation of one method:

IDisposable Subscribe(IObserver<T> observer).

The IObserver<T> interface requires the implementation of the following three methods:

void OnCompleted()
void OnError(Exception error)
void OnNext(T value).

An easy way to think of IObservable<T> and IObserver<T> is that they are the dual of IEnumerable and IEnumerator. Whereas IEnumerable and IEnumerator are used for pulling a data collection, IObservable and IObserver are used for pushing a data collection.

A class that implements IObservable acts as a collection of data that can be observed. A class that implements IObserver can subscribe to an observable collection and respond to its OnNext, OnError and OnCompleted notifications. OnNext is called when more data is available, OnError if an exception occurs, and OnCompleted when there is no additional data.
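To make the push model concrete, here is a minimal sketch that implements both interfaces by hand, using only the base class library and no Rx. The names NumberSource, RecordingObserver and Unsubscriber are hypothetical and exist only for illustration; Rx supplies far more robust implementations.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that pushes integers to its subscribers.
public sealed class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        // Disposing the returned object removes the observer again.
        return new Unsubscriber(observers, observer);
    }

    // Push every value to the current observers, then signal completion.
    public void Publish(IEnumerable<int> values)
    {
        foreach (int value in values)
            foreach (IObserver<int> observer in observers.ToArray())
                observer.OnNext(value);

        foreach (IObserver<int> observer in observers.ToArray())
            observer.OnCompleted();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical observer that records each notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext: " + value); }
    public void OnError(Exception error) { Log.Add("OnError: " + error.Message); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

public static class Program
{
    public static void Main()
    {
        var source = new NumberSource();
        var observer = new RecordingObserver();

        using (source.Subscribe(observer))
        {
            source.Publish(new[] { 1, 2, 3 });
        }

        foreach (string line in observer.Log)
            Console.WriteLine(line);
    }
}
```

Running this should print three OnNext lines followed by OnCompleted. Anything published after the using block ends no longer reaches the observer, because disposing the subscription removed it from the list.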

Setting up Rx
The first thing you'll need to do is download the Reactive Extensions from Microsoft. You can also obtain Rx through NuGet. Once you have Rx downloaded and installed, add a reference to System.Reactive and WindowsBase to your project and import the System, System.Linq and System.Reactive.Linq namespaces in the class where you'll be using Rx.

Observing a Data Collection
Rx includes a handy ToObservable<T> extension method on IEnumerable<T> that converts a typed collection to a typed observable collection. This method is particularly useful for observing a fixed chunk of data.

First we create a standard collection to store the data we want to publish, in this case the first 1,000 numbers in the Fibonacci sequence. The Fibonacci sequence starts with 0 and 1, and each subsequent number is the sum of the two before it: f(n) = f(n-2) + f(n-1).

List<ulong> fibonacciSeq = GenerateFibonnaci(1000);

Next convert the collection to an observable collection using ToObservable().

IObservable<ulong> fibonacciSource = fibonacciSeq.ToObservable();

Once the observable collection has been created, we can subscribe to the collection and perform a meaningful task. Rx adds several Subscribe extension methods onto IObservable<T> that allow you to specify Action<T> methods for each of the IObserver events.

Console.WriteLine("Press enter to subscribe");
Console.ReadLine();

// Subscribe with one callback per IObserver notification.
using (var fibonnacciSub = fibonacciSource.Subscribe(
    (x) => Console.WriteLine(x),
    (Exception ex) => Console.WriteLine("Error received from source: {0}.", ex.Message),
    () => Console.WriteLine("End of sequence.")))
{
    Console.WriteLine("Press enter again to unsubscribe from the service.");
    Console.ReadLine();
}

Lastly, add the GenerateFibonnaci function that creates a finite Fibonacci sequence.

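static List<ulong> GenerateFibonnaci(int sequenceLength)
{
    // Note: ulong arithmetic is unchecked by default, so terms beyond
    // index 93 (the largest Fibonacci number that fits in 64 bits)
    // wrap around silently.
    List<ulong> sequence = new List<ulong>();

    for (int i = 0; i < sequenceLength; i++)
    {
        // The first two terms are 0 and 1; each later term is the
        // sum of the previous two.
        if (i <= 1)
            sequence.Add((ulong)i);
        else
            sequence.Add(sequence[i - 2] + sequence[i - 1]);
    }

    return sequence;
}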

Figure 1.

The example above subscribes to our finite Fibonacci observable collection and defines an observer that outputs each number on a new line. The observer also outputs any exception message to the console, as well as an "End of sequence" notification when the collection has been fully observed. Note that when you run the example, the observable source is evaluated at the point that Subscribe is executed. As written, the subscription executes on the same thread that called Subscribe.

This behavior is easy to change by specifying an IScheduler for the subscription. For example, to subscribe on the task pool, we can use the SubscribeOn extension method and pass it a value of Scheduler.TaskPool. Make sure that you have imported the System.Reactive.Concurrency namespace. The updated code should look like this:

using (var fibonnacciSub = fibonacciSource.SubscribeOn(Scheduler.TaskPool)
    .Subscribe(
        (x) => Console.WriteLine(x),
        (Exception ex) => Console.WriteLine("Error received from source: {0}.", ex.Message),
        () => Console.WriteLine("End of sequence.")))
{
    Console.WriteLine("Press enter to unsubscribe from the service.");
    Console.ReadLine();
}

Now when the user presses the enter key, the subscription will be unsubscribed and disposed.
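One caveat about the sample data: a ulong can only hold Fibonacci numbers up to index 93, so a 1,000-term list built with 64-bit arithmetic wraps around well before the end. The following is a minimal sketch of an alternative generator, assuming exact values are wanted; GenerateFibonacciBig is a hypothetical name, and on the .NET Framework it requires a reference to the System.Numerics assembly.

```csharp
using System;
using System.Collections.Generic;
using System.Numerics;

public static class FibonacciDemo
{
    // Hypothetical variant of the Fibonacci helper that uses BigInteger,
    // so terms never overflow no matter how long the sequence is.
    public static List<BigInteger> GenerateFibonacciBig(int sequenceLength)
    {
        var sequence = new List<BigInteger>();

        for (int i = 0; i < sequenceLength; i++)
        {
            if (i <= 1)
                sequence.Add(i); // int converts implicitly to BigInteger
            else
                sequence.Add(sequence[i - 2] + sequence[i - 1]);
        }

        return sequence;
    }

    public static void Main()
    {
        List<BigInteger> sequence = GenerateFibonacciBig(1000);

        // Index 93 is the last term that still fits in a ulong.
        Console.WriteLine(sequence[93]);                  // 12200160415121876738
        Console.WriteLine(sequence[94] > ulong.MaxValue); // True
    }
}
```

The resulting List<BigInteger> can be passed to ToObservable() in the same way as the List<ulong> version.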

So far we have gone over how to observe a cold observable, that is, an observable collection that only produces data when someone subscribes to it. Rx can also be used to observe hot observables, which are events that are published regardless of whether anyone is observing them. A classic example of a hot observable is an input event such as a mouse movement or key press.
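The difference between cold and hot sources can be sketched in a console program. This example assumes the System.Reactive package is referenced and uses Rx's Subject<T> as a stand-in for a real event source; the HotColdDemo class and its Run method are hypothetical names for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // ToObservable
using System.Reactive.Subjects;  // Subject<T>

public static class HotColdDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Cold: every subscription replays the whole sequence from the start.
        IObservable<int> cold = new List<int> { 1, 2, 3 }.ToObservable();
        cold.Subscribe(x => log.Add("cold A: " + x));
        cold.Subscribe(x => log.Add("cold B: " + x));

        // Hot: the subject publishes whether or not anyone is subscribed.
        var hot = new Subject<int>();
        hot.OnNext(1); // no subscribers yet, so this value is never observed

        using (hot.Subscribe(x => log.Add("hot: " + x)))
        {
            hot.OnNext(2); // observed
        }

        hot.OnNext(3); // published after unsubscribing, so not observed

        return log;
    }

    public static void Main()
    {
        foreach (string line in Run())
            Console.WriteLine(line);
    }
}
```

Both cold subscribers should see 1, 2 and 3, while the hot subscriber should see only the value published while its subscription was active.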

Observing an Event
Rx provides a static Observable.FromEvent method which, as the name implies, creates an implementation of IObservable for a given event. For example, to observe mouse movements, you would call FromEvent with an event argument type of MouseEventArgs and define how to subscribe to and unsubscribe from the event.

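var mouseMovements =
    Observable.FromEvent<MouseEventHandler, MouseEventArgs>(
        // Adapt Rx's Action<MouseEventArgs> to the event's delegate type.
        onNext => (s, e) => onNext(e),
        // Add and remove the same delegate instance, so that disposing
        // the subscription really detaches the handler.
        handler => this.MouseMove += handler,
        handler => this.MouseMove -= handler);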

Now that we have an IObservable<MouseEventArgs>, we can subscribe and react to the observable data source. For example, if we wanted to display the user's current position using a label on a form, we could use LINQ to project the mouse position from the observable collection, like so:

var mousePoints = from m in mouseMovements select m.GetPosition(this);

This creates an IObservable<Point> that observes the user's mouse position.

Finally, we can subscribe to the mousePoints observable and display the mouse position on the form.

mousePoints.ObserveOnDispatcher().Subscribe((pos) => mousePos.Content = pos.ToString());

Figure 2.

Because we are modifying the UI in our observer, we have to observe on the dispatcher thread to ensure that the label is only modified from the UI thread. The really nice thing about Rx is that this can be done with minimal code.
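The LINQ projection used for the mouse position is not specific to UI events; the same query syntax works on any IObservable<T>. The following console sketch, assuming the System.Reactive package is referenced, filters and projects a small pushed sequence. QueryDemo and EvenSquares are hypothetical names for illustration.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class QueryDemo
{
    // Builds a query over a pushed sequence of the numbers 1 through 10.
    public static IObservable<string> EvenSquares()
    {
        IObservable<int> numbers = Enumerable.Range(1, 10).ToObservable();

        // Query syntax binds to Rx's Where and Select operators here,
        // because the source is an IObservable<int>.
        return from n in numbers
               where n % 2 == 0
               select n + " squared is " + (n * n);
    }

    public static void Main()
    {
        EvenSquares().Subscribe(
            line => Console.WriteLine(line),
            ex => Console.WriteLine("Error received from source: {0}.", ex.Message),
            () => Console.WriteLine("End of sequence."));
    }
}
```

Only the even numbers pass the where clause, so the observer should receive five lines, starting with "2 squared is 4", before the completion message.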

Conclusion
As you can see, Reactive Extensions makes it much easier to work with complex events and other push-based data sources. It really shines in its ability to filter and project pushed data using LINQ.

Please stay tuned for the rest of the Rx series, where I will cover how to observe asynchronous methods and events, as well as how to compose and filter multiple observable collections. Both code examples are included in the code drop at the top of the page.

About the Author

Eric Vogel is a Senior Software Developer for Red Cedar Solutions Group in Okemos, Michigan. He is the president of the Greater Lansing User Group for .NET. Eric enjoys learning about software architecture and craftsmanship, and is always looking for ways to create more robust and testable applications. Contact him at [email protected].
