UI Code Expert

Essential Reactive Extensions with .NET 4.5 and LINQ

The Rx library utilizes the power of LINQ to make difficult asynchronous problems simple.

Processing collections of data is a very common requirement when developing software. Fortunately, the Microsoft .NET Framework 3.5 gave us LINQ, which improved manipulating such collections, even to the point of making it easy in many cases. With the .NET Framework 4 and 4.5, asynchronous programming was the focus -- especially given the new async pattern in 4.5. Unfortunately, combining collections with asynchronous programming still involves significant complexity. It's this problem domain that the Reactive Extensions (Rx) library is designed to address.

Rx is a Domain-Specific Language (DSL) built on top of event-based asynchronous programming. It utilizes the power of LINQ to make difficult asynchronous problems simpler. Consider the following example: Imagine needing to implement search in an application with a simple textbox alongside a search button. If the UI thread performs the search, the users will be blocked waiting for the search to complete. Preferably, the search occurs asynchronously and the results pass back to the UI thread as each search result arrives.

Rx is an ideal framework for this scenario. By turning the search items into an observable, the UI is able to subscribe to the results as they appear, rather than waiting for all the results and returning them at once. For example, consider the code in Listing 1.

Listing 1. Convert an enumerable into an observable.
using System;
using System.Collections.Generic;   // IEnumerable<T>
using System.Reactive.Concurrency;  // Scheduler
using System.Reactive.Linq;         // ToObservable
  // ...
  private IEnumerable<string> GetSearchItems()
  {
    // ...
    yield return searchItem;
  }
      
  private void BeginSearch()
  {
    IEnumerable<string> searchItems = GetSearchItems();
    IObservable<string> observable = searchItems
             .ToObservable( Scheduler.Default );
    observable.Subscribe( searchItem => { /*...*/ } );
  }
  // ...

To achieve the asynchronous search, searchItems.ToObservable( Scheduler.Default ) converts the search items into an observable, and the iteration of GetSearchItems (via deferred execution) is scheduled on a separate thread and triggered in the invocation of Subscribe. As GetSearchItems yields a result, Subscribe's delegate then handles the result -- the key being that the results are processed as they return individually, rather than waiting for the entire result set to be complete synchronously before iterating over the collection.
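A console-sized sketch of the same idea may help make the threading visible. It assumes the Rx NuGet package is referenced; the three hard-coded results, the Thread.Sleep delay and the SearchDemo class are hypothetical stand-ins for a real search, and the ManualResetEventSlim exists only so a console program can wait for the background iteration to finish.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SearchDemo
{
    // Hypothetical stand-in for a slow search: each result takes a moment to produce.
    private static IEnumerable<string> GetSearchItems()
    {
        foreach (string item in new[] { "alpha", "beta", "gamma" })
        {
            Thread.Sleep(50); // simulate the work needed to find one result
            yield return item;
        }
    }

    public static List<string> RunSearch()
    {
        var received = new List<string>();
        using (var done = new ManualResetEventSlim())
        {
            IObservable<string> observable =
                GetSearchItems().ToObservable(Scheduler.Default);

            // Subscribe starts the iteration on a scheduler thread and returns.
            using (observable.Subscribe(
                item => received.Add(item), // invoked once per yielded result
                () => done.Set()))          // invoked after the last result
            {
                done.Wait(); // block only so this sketch can collect the output
            }
        }
        return received;
    }

    public static void Main()
    {
        foreach (string item in RunSearch())
        {
            Console.WriteLine(item);
        }
    }
}
```

In a real UI the call to done.Wait() would be dropped, since the whole point is that the caller keeps running while results arrive.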

Now consider a UI improvement: Rather than relying on the button click to trigger the search, imagine if the search was automatically activated on each keystroke. As the user types in the search box, Rx starts a search against what's entered so far. If the user types too fast, it automatically throttles the keystrokes so as to only search when the user pauses -- even if only for an instant.

Rx simplifies difficult asynchronous logic by allowing developers to focus on high-level functionality rather than the asynchronous plumbing intricacies needed to make the functionality a reality. Algorithms for buffering data while simultaneously throwing away inconsequential events, for example, can be challenging. With Rx, however, the implementation may be as simple as a call to Buffer(TimeSpan.FromSeconds(/* ... */)).

In this article we'll demonstrate not only the possibilities that Rx enables, but how to achieve these possibilities within your own code.

Referencing Reactive Extensions via NuGet
The first step in using Rx is to add a reference to it. Unless you've previously installed it, this is done through the right-click context menu on the solution, by clicking "Manage NuGet Packages for Solution" and then doing an online search for Rx.

After Rx finishes installing, all that remains is to associate the "Reactive Extensions - Query Library" package with whichever project needs to reference it.

Understanding IObservable
The key to understanding Rx is to comprehend the System.IObservable<T> interface, whose definition is shown here:

public interface IObservable<out T>
{
  IDisposable Subscribe(IObserver<T> observer);
}

This seemingly simple interface, with a single Subscribe method that takes a System.IObserver<T>, is the key to all of Rx. In summary, IObservable<T> represents a stream of data.

Furthermore, the IObserver<T> interface is just as simple:

public interface IObserver<in T>
{
  void OnNext(T value);

  void OnError(Exception error);

  void OnCompleted();
}

When IObservable<T>.Subscribe is called with an observer, a notification mechanism is established and triggered with each new item via IObserver<T>.OnNext. The result is that rather than iterating over a collection -- effectively "polling" the collection -- IObservable<T> establishes a publish/subscribe pattern by which changes in the stream (such as a new item) are identified via notifications. Similarly, errors and completion events are published through the OnError and OnCompleted methods, respectively.
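To see the two interfaces cooperate without any Rx operators, here is a minimal sketch that uses only the base class library. RecordingObserver and RangeObservable are hypothetical types written for this illustration; they are not part of Rx, and real code would normally rely on the Rx factory methods instead of implementing IObservable<T> by hand.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

// Hypothetical observable that pushes a fixed range of integers to each subscriber.
public sealed class RangeObservable : IObservable<int>
{
    private readonly int _start;
    private readonly int _count;

    public RangeObservable(int start, int count)
    {
        _start = start;
        _count = count;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Push each item, then signal that the stream has ended.
        for (int i = 0; i < _count; i++)
        {
            observer.OnNext(_start + i);
        }
        observer.OnCompleted();
        return new NoopDisposable(); // everything was already sent; nothing to cancel
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

public static class ObserverDemo
{
    public static void Main()
    {
        var observer = new RecordingObserver();
        new RangeObservable(1, 3).Subscribe(observer);
        // Prints: OnNext(1), OnNext(2), OnNext(3), OnCompleted
        Console.WriteLine(string.Join(", ", observer.Log));
    }
}
```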

Observable States: Hot and Cold
Observables have two states: hot and cold. A hot observable represents a stream of data that sends out notifications regardless of whether there are any subscribers. Examples of a hot stream of data include MouseMove events: as long as a user moves the mouse, MouseMove events will be fired. Subscribers to an observable MouseMove event will receive notifications of such events starting with the next MouseMove event following subscription -- but events that occurred before the subscription was activated will not be received by the subscriber.

In contrast, a cold observable is a stream of data that always reports events from the beginning of the stream, regardless of when the subscription is activated. If you have a cold observable and you subscribe multiple times, you'll get multiple identical streams of data even if subscriptions occurred well after the first event occurred. The IEnumerable<T> interface provides an example of a cold stream of data. When you iterate over an IEnumerable<T> multiple times, you'll begin iterating from the first to the last item in the collection.
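The difference shows up when a subscriber arrives late. The sketch below assumes the Rx NuGet package and uses Observable.Range as the cold source and a Subject<int> as a stand-in for a hot source such as MouseMove; the HotColdDemo class and its method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotColdDemo
{
    // Cold: each subscription replays the sequence from the beginning.
    public static List<int> LateSubscriberToCold()
    {
        IObservable<int> cold = Observable.Range(1, 3);
        var first = new List<int>();
        var late = new List<int>();

        cold.Subscribe(x => first.Add(x)); // runs the whole sequence
        cold.Subscribe(x => late.Add(x));  // runs it again from the start
        return late;                       // 1, 2, 3
    }

    // Hot: values are pushed whether or not anyone is subscribed.
    public static List<int> LateSubscriberToHot()
    {
        var hot = new Subject<int>();
        var late = new List<int>();

        hot.OnNext(1); // nobody is subscribed yet, so this value is lost
        using (hot.Subscribe(x => late.Add(x)))
        {
            hot.OnNext(2);
            hot.OnNext(3);
        }
        return late;   // 2, 3
    }

    public static void Main()
    {
        Console.WriteLine("cold: " + string.Join(", ", LateSubscriberToCold()));
        Console.WriteLine("hot:  " + string.Join(", ", LateSubscriberToHot()));
    }
}
```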

Reactive Extensions Events
As discussed in the introduction, one of the powerful patterns available through Rx is the combination of asynchronous event handling and LINQ all in one. Consider the StockTicker class:

public class StockTicker
{
  public event EventHandler<PriceChangedEventArgs> OnPriceChanged;
}

Also consider this subscription:

StockTicker stockTicker = 
  new StockTicker("MSFT", "AAPL", "YHOO", "GOOG");
stockTicker.OnPriceChanged += (sender, args ) => 
{
  Console.WriteLine(
    "{0}: Price - {1}  Volume - {2}", 
    args.StockSymbol, args.Price, args.Volume);
};

This subscriber is a simple lambda expression that writes a particular stock's price changes to the console. However, the stock price may change too frequently: processing every change event may cause the subscriber to fall behind, even though changes that arrive while a previous event is being processed may be irrelevant, or may be needed only at specific time intervals (rather than for every event).

Acquiring price changes over an interval of time may make this information more understandable, but the work to do this manually could be painful. It would require storing a timestamp for each event and then aggregating the price across a particular time interval. Fortunately, Rx provides an alternative by leveraging the fact that events can serve as observables (see Listing 2).

Listing 2. Creating an observable from an event.
StockTicker stockTicker = 
  new StockTicker("MSFT", "AAPL", "YHOO", "GOOG");

IObservable<PriceChangedEventArgs> priceChangedObservable = 
  Observable.FromEventPattern<PriceChangedEventArgs>(
    eventHandler => stockTicker.OnPriceChanged += eventHandler,
    eventHandler => stockTicker.OnPriceChanged -= eventHandler )
      .Select( eventPattern => eventPattern.EventArgs );
priceChangedObservable.Subscribe(args => Console.WriteLine( 
  "{0}: Price - {1}  Volume - {2}", 
    args.StockSymbol, args.Price, args.Volume ) );

The System.Reactive.Linq.Observable class in Rx includes a set of factory methods, among them FromEventPattern<TEventArgs>(Action<EventHandler<TEventArgs>>, Action<EventHandler<TEventArgs>>), whose two arguments attach and detach an event handler. Via this method, the code in Listing 2 captures the EventHandler<PriceChangedEventArgs>-typed event (OnPriceChanged) and establishes an observable-based subscription to PriceChangedEventArgs. Leveraging the resulting IObservable<T>, all that remains is providing a delegate to handle the price changes via the Subscribe method.

In this instance, the Subscribe method is similar to the += operator that's used to attach to the event. However, Subscribe serves two purposes. First, it connects the action that will process the event data and, second, it activates the observable's own subscription logic. In this case, Observable.FromEventPattern won't attach an EventHandler to the OnPriceChanged event until Subscribe has been invoked. This means that it's possible to build up an observable query and not have to worry about it activating until Subscribe is called.
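The deferred attachment can be observed directly. The following sketch assumes the Rx NuGet package and a console application with no SynchronizationContext, in which case the handler is added and removed synchronously. The StockTicker and PriceChangedEventArgs types here are hypothetical cut-down versions of the article's classes, and HasHandlers is a helper added only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical minimal versions of the article's types, for illustration only.
public class PriceChangedEventArgs : EventArgs
{
    public string StockSymbol { get; set; }
    public double Price { get; set; }
}

public class StockTicker
{
    public event EventHandler<PriceChangedEventArgs> OnPriceChanged;

    // True while at least one handler is attached to the event.
    public bool HasHandlers { get { return OnPriceChanged != null; } }
}

public static class DeferredSubscriptionDemo
{
    public static List<bool> Run()
    {
        var ticker = new StockTicker();
        var states = new List<bool>();

        IObservable<EventPattern<PriceChangedEventArgs>> priceChanges =
            Observable.FromEventPattern<PriceChangedEventArgs>(
                handler => ticker.OnPriceChanged += handler,
                handler => ticker.OnPriceChanged -= handler);
        states.Add(ticker.HasHandlers); // query built, nothing attached yet

        IDisposable subscription = priceChanges.Subscribe(_ => { });
        states.Add(ticker.HasHandlers); // Subscribe attached the handler

        subscription.Dispose();
        states.Add(ticker.HasHandlers); // disposing the subscription detached it
        return states;
    }

    public static void Main()
    {
        // Expected under the assumptions above: False, True, False
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Disposing the IDisposable returned by Subscribe is the Rx counterpart of the -= operator.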

The result is powerful, because it enables using LINQ to filter and transform the result before processing it. For example, as shown in Listing 3, we can filter out all changes where the volume is negligible, or project only part of the data from a StockTicker stream (the symbol and price, for example).

Listing 3. Filtering price changes that don't have a minimum volume.
priceChangedObservable
  .Where(stock => stock.Volume >= minimumVolumeBoundary )
  .Subscribe(args => Console.WriteLine( 
    "{0}: Price - {1}  Volume - {2}", 
    args.StockSymbol, args.Price, args.Volume ) );
priceChangedObservable
  .Select(stock => new {stock.StockSymbol, stock.Price} )
  .Subscribe( args => Console.WriteLine( "{0}: Price - {1}",
    args.StockSymbol, args.Price ) );

Leveraging LINQ, Rx provides a fluent API that clearly identifies what is subscribed to and how the observed data is processed. In Listing 3, it's clear that the first subscription wants only price changes at or above the volume threshold, and that the second wants only the StockSymbol and Price.

Filtering and projecting data streams aren't the only scenarios that Rx simplifies. System.Reactive.Linq.Observable includes a host of other extension methods. One such method is Throttle, a method designed specifically to reduce a rapid fire of data down to a subset corresponding to a specific cadence:

priceChangedObservable
  .Where(stock => stock.Volume >= minimumVolumeBoundary )
  .Throttle( TimeSpan.FromSeconds(1) )
  .Select(stock => new {stock.StockSymbol, stock.Price}  )
  .Subscribe( args => Console.WriteLine( /* output */ ) );

Given the call to Throttle, notifications reach the Subscribe delegate no more than once a second. More precisely, Throttle forwards a value only after the stream has been quiet for the given time span, so a value that is followed by another within one second is dropped. This may mean that you miss changes for one stock because a different stock is trading more actively. The previous code snippet applies the same volume filter as Listing 3, but it throttles across all symbols that the stock ticker is reporting rather than for each individual stock symbol. The following code throttles event notifications to once per second for each stock:

priceChangedObservable
  .Select( stock => new { stock.StockSymbol, stock.Price } )
  .GroupBy( stock => stock.StockSymbol )
  .Subscribe( group =>
    group.Throttle( TimeSpan.FromSeconds( 1 ) )
      .Subscribe( stock => Console.WriteLine( /* ... */ ) ) );

By using GroupBy, we're splitting this data into multiple observables, one per stock symbol. Each observable can have its own operations. Because of the grouping, throttling now acts independently on each stock symbol: only notifications with identical StockSymbols compete with one another within the given throttle time span.
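Because Throttle depends on timing, its behavior is easiest to inspect with a virtual clock. The sketch below assumes the Rx NuGet package and passes a HistoricalScheduler to the Throttle overload that accepts a scheduler, so time advances only when the code says so. The Subject<int> stands in for a single stock's price stream, and the ThrottleDemo class is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleDemo
{
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var prices = new Subject<int>();           // stand-in for one stock's prices
        var seen = new List<int>();

        using (prices.Throttle(TimeSpan.FromSeconds(1), scheduler)
                     .Subscribe(p => seen.Add(p)))
        {
            prices.OnNext(1);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));

            prices.OnNext(2); // arrives within 1 second of value 1, so 1 is dropped
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));

            prices.OnNext(3); // arrives within 1 second of value 2, so 2 is dropped
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2)); // quiet period: 3 is forwarded
        }
        return seen;
    }

    public static void Main()
    {
        // Prints: 3
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Note that a stream that never pauses for a full second produces no output at all under Throttle. When a fixed cadence is wanted regardless of pauses, the Sample operator may be a closer fit.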

Throttling can be done based on the stream data itself (rather than just a time span). For example, in the following code, the DistinctUntilChanged method filters on a specified field or value:

priceChangedObservable
  .GroupBy( stock => stock.StockSymbol )
  .Subscribe( group =>
    group.DistinctUntilChanged( 
      stock => Math.Ceiling(stock.Volume/1000.0) ) 
      .Subscribe( stock => Console.WriteLine( /* ... */ ) ) );

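In this case, DistinctUntilChanged ignores any item whose volume, rounded up to the next multiple of 1,000, matches that of the item before it; in effect, changes that stay within the same 1,000-share band are suppressed. (This extension method can also take an IEqualityComparer<T> for more-complex comparisons.)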
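A short run over sample data shows how the key selector decides what gets through. The sketch assumes the Rx NuGet package; the volume figures and the VolumeBucketDemo class are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class VolumeBucketDemo
{
    public static List<double> Run()
    {
        // Hypothetical volumes; the key for each is Math.Ceiling(volume / 1000.0).
        double[] volumes = { 200, 900, 1001, 1500, 2500, 2900, 999 };
        //            keys:    1    1     2     2     3     3    1
        var seen = new List<double>();

        volumes.ToObservable()
            .DistinctUntilChanged(v => Math.Ceiling(v / 1000.0))
            .Subscribe(v => seen.Add(v)); // only items whose key differs from the previous key

        return seen;
    }

    public static void Main()
    {
        // Prints: 200, 1001, 2500, 999
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The comparison is made on the key, not on the raw difference: the move from 900 to 1001 passes because it crosses a band boundary, while the larger move from 200 to 900 is suppressed.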

Using Reactive Extensions in the UI
Windows Presentation Foundation (WPF) can benefit significantly from Rx as well, especially when bindings are involved. The code in Listing 4 utilizes Rx to update a bound XAML object such as a list box (for instance, <ListBox ItemsSource="{Binding StockChanges}" DisplayMemberPath="Display" />). The ListBox will be bound to StockChanges.

Listing 4. The StockChanges property.
private ObservableCollection<StockChange> _StockChanges = 
  new ObservableCollection<StockChange>();

public ObservableCollection<StockChange> StockChanges
{
  get { return _StockChanges; }
  set { _StockChanges = value; }
} 

StockChanges is an ObservableCollection of StockChange:
public class StockChange
{
  public string StockSymbol { get; set; }
  public double Price { get; set; }
  public string Display
  {
    get { return string.Format( "{0,-10}   {1,-20:C}",
      StockSymbol, Price); }
  }
} 

StockChange is an entity that contains common properties that are useful for a stock ticker application.

Now that the entity and the ListBox binding are set up, the next step is to acquire stock price data. Utilizing the previous examples allows you to create an observable on OnPriceChanged for the StockTicker. The next step is to group this information by StockSymbol, ignore changes that leave the price within the same band, project the data into the StockChange entity and then add the entity to StockChanges. A view model is a natural place for this work (see Listing 5).

Listing 5. The StockViewModel class.
public class StockViewModel
{
  public StockViewModel()
  {
    StockTicker = new StockTicker("MSFT", "AAPL", "YHOO", "GOOG");
    GetPriceChanges(StockTicker)
      .GroupBy( stockArgs => stockArgs.StockSymbol )
      .Subscribe( groupedStocks => 
        groupedStocks.DistinctUntilChanged(stockArgs =>
          Math.Ceiling(stockArgs.Price/10.0))
        .ObserveOnDispatcher()
        .Subscribe( stockArgs => 
          StockChanges.Add( 
            new StockChange 
              {
                StockSymbol = stockArgs.StockSymbol,
                Price = stockArgs.Price
              } ) ) );
  }
 
  private IObservable<PriceChangedEventArgs> GetPriceChanges(StockTicker stockTicker)
  {
    return Observable.FromEventPattern<PriceChangedEventArgs>(
      eventHandler => stockTicker.OnPriceChanged += eventHandler,
      eventHandler => stockTicker.OnPriceChanged -= eventHandler)
        .Select(eventPattern => eventPattern.EventArgs);
  }
 
  private StockTicker StockTicker { get; set; }
 
  private ObservableCollection<StockChange> _StockChanges = 
    new ObservableCollection<StockChange>();
  public ObservableCollection<StockChange> StockChanges
  {
    get { return _StockChanges; }
    set { _StockChanges = value; }
  }
} 

After the new StockChange is added to StockChanges, the bound ListBox receives a notification from the ObservableCollection (see Listing 4) and updates itself with the new items. This example uses ObserveOnDispatcher, which is specific to the "Reactive Extensions - WPF Helpers" NuGet package. Every operator and Subscribe delegate that follows ObserveOnDispatcher in the chain receives each observed item on the Dispatcher thread. This matters when new items are added to StockChanges, because without it there's no guarantee which thread the Subscribe delegate runs on -- and WPF will throw an exception if a control is updated outside of the Dispatcher thread. To avoid this, use ObserveOnDispatcher.

Wrapping Up
Rx provides a significantly simpler fluent API for handling complex, asynchronous, event-based logic. In combination with LINQ and its deferred execution capabilities, Rx provides a DSL for working with streaming data as though it were another collection. Moreover, the additional extension methods of Rx enhance LINQ-based event processing with throttling and transformation functions not native to the traditional LINQ APIs.
