Practical .NET

Writing Cleaner Code with Reactive Extensions

When you have a process that can return multiple results over time, then .NET Framework Reactive Extensions will let you simplify your code and manage it better.

With most methods you just call them and get back a single result. Some processes don't work this way: These processes run for long periods of time. It gets worse: you also have the methods that not only run for a long time, but sporadically return results while they run. Of course, this is why the Microsoft .NET Framework provides events: Events are a way for one object to run a method in a second object, allowing the first object to pass information to the second object when there's something to share.

But there is a better solution to this problem than events: Reactive Extensions. If you have a long-running process that sporadically returns results, then .NET Framework Reactive Extensions can let you catch those results whenever they occur. Not only is code using Reactive Extensions simpler than the code required by events, Reactive Extensions provide more functionality (you can use LINQ to filter out any data you don't want, for example).

The Reactive Extensions documentation describes it as a way of processing flows of data. When working with the Extensions it's not hard to imagine some process that iterates over a collection of data, periodically finding something of interest and notifying the application of what it has found (this allows the program to react, hence the "reactive" name). For my example, however, I'm going to assume that I want to perform some activity while my application makes a series of changes to a sales order. To support that, I could write a StatusChanged method and call it from every place in the application where a change is made to a sales order. Alternatively, I could add a StatusChanged event to my SalesOrder class and raise that event whenever the sales order has its status changed -- I could just wire up my code to that event.

The Reactive Extensions solution is, I think, not only simpler than either of these, but -- thanks to its integration with LINQ -- more flexible. Certainly, when it comes to connecting or disconnecting from a source of information, the Reactive Extensions solution is much simpler than the equivalent event-driven code.

Setting Up for Reactive Events
My first step is to add a Reactive Extensions package to my application using NuGet. There are several Reactive Extensions packages, including implementations for JavaScript, Android, and handling LINQ requests against OData-enabled Web Services (a reader from an earlier column has a great discussion of the Windows Forms UI package on CodeProject, which uses Reactive Extensions to tie methods to events). In NuGet, to get the package I want, I did a search on "Reactive Extensions" and added the Main Library package to my application when it turned up in the packages found list.

My second step is to decide what data I want to return when a change is made to a sales order. It makes sense to me to have my process return, along with the order's current status, the sales order Id. I package that data up in a class with the properties I need:

Public Class StatusChange
  Public Property OrderId As Integer
  Public Property OrderStatus As String
End Class

The next step is to declare a Reactive Extensions Subject that will work with items of this type. I declare the Subject outside of any method so that it can be accessed throughout my application:

Dim statChange As ISubject(Of StatusChange)

Then, at the start of my application I initialize statChange and call the Subject's Subscribe method to indicate I want to be notified when the Subject has one of its method's called. I pass the Subscribe method a lambda expression specifying what I want to do when the Subject notifies me of a change. In this case, I'll just display the OrderStatus property on my class:

statChange = New Subject(Of StatusChange)
statChange.Subscribe(Function(sc) MessageBox.Show(sc.OrderStatus))

Now, elsewhere in my application, whenever I change the order's status, I create a StatusChange class, set its properties and call the OnNext method of the Subject I created. Typical code to notify me when the SalseOrder is first created might look like this:

Dim so As New SalesOrder
statChange.OnNext(New StatusChange With {.OrderId = 1,
                                         .OrderStatus = "New"})

As each OnNext method is called, the MessageBox I specified in the Subscribe's lambda expression displays the StatusChange OrderStatus property.

Enhancing the Solution
In real life, of course, processing the status change may require more than a single line of code -- or more code than I want to put in a lambda expression, at any rate. Instead of passing a lambda expression to the Subscribe method, I can pass a method's address, like this:

statChange.Subscribe(AddressOf StatusChanged)

My method doesn't have to accept any parameters, but, if it accepts an object of the same type as my Subject was tied to, then the Subscribe method will pass that object to my method. A method like this will let me process each StatusChange object as it's retrieved by my Subject:

Public Sub StatusChanged(sc As StatusChange)
End Sub

If I want to do many different things when an order's status changes I don't have to package them all up into a single method. Instead, I can call the Subscribe method on the Subject multiple times, passing a different method each time:

statChange.Subscribe(AddressOf StatusChanged)
statChange.Subscribe(AddressOf StatusAudit)

With these changes I've loosely coupled the process that makes changes to the sales order status (the application) to the process that handles the status changes (my StatusChanged and StatusAudit methods). The only thing that links the processes is the definition of the StatusChange class, which I can extend with further properties without breaking either process. So far, this isn't much different than using events, except that less code is required.

But the Reactive Extensions technology not only simplifies my code, it lets me go beyond events. To begin with, I may not want to respond to every status change in the order. I may, for example, only want to deal with orders where the status is set to Processing. I can use LINQ to query results that I get through the Subject. Before trying this, though, I need to add an Imports statement for the System.Reactive.LINQ namespace to my application.

Once I add that namespace, however, I'll find that I can write LINQ statements or use LINQ extension methods to select which results I want to process. All three of these examples cause my method to be called only when the OrderStatus is set to "Processing":

statChange.Where(Function(c) c.OrderStatus = 
  "Processing").Subscribe(AddressOf ReportStatusChange)

Dim scs = From sc In statChange
          Where sc.OrderStatus = "Processing"
          Select sc
scs.Subscribe(AddressOf ReportStatusChange)
Dim sub = (From sc In statChange
           Where sc.OrderStatus = "Processing"
           Select sc).Subscribe(AddressOf ReportStatusChange)

In addition, I may want to have special processing take place when the order has a "status error" or when the order is completed. I could figure this out by analyzing the properties in my StatusChange object, but Reactive Extensions provide a better solution: the Subject's OnError and OnCompleted methods. When I call the Subscribe method, I can provide methods (or lambda expressions) to be called when the Subject's OnError or OnCompleted methods are called. In this code, I've changed the name of my methods to make their connection to the Subject's methods more obvious:

statChange.Subscribe(AddressOf OnNext, 
                     AddressOf OnError, 
                     AddressOf OnCompleted)

My OnError method should accept an Exception object while my OnCompleted method must accept nothing at all. Here are some typical examples:

Public Sub OnError([error] As Exception)
   ...exception processing...
End Sub

Public Sub OnCompleted()
   ...end of life processing for the order...
End Sub

Now, when something goes horribly wrong, my process can call the Subject's OnError method to notify my application of the problem. When the application calls the OnError method, it can pass an Exception object with information about the problem (in real life, I would do better than this example):

statChange.OnError(New Exception("Something has gone horribly wrong!"))

When my application is finished with the sales order, it should call the Subject's OnCompleted method. In addition to allowing the Subject to call some "end of life" processing, calling OnCompleted also notifies the Subject that it doesn't have to listen for any more messages (this, by the way, is something else events can't do: Disconnect from the client on the event side of the relationship). The Subject can disengage itself from listening by calling its Dispose method.

Encapsulating Reports
The problem is that I've still got multiple OnNext statements scattered throughout my program: presumably one in every place where I change the SalesOrder's status. I'd prefer to centralize that code, ideally inside the Status property of the SalesOrder object that I'm updating. This will also protect me from missing a spot in the application where I should have put a call to the OnNext method.

Listing 1 has a version of the SalesOrder class that declares a property as ISubject and initializes the property in the class's constructor with a Subject object. I now call the Subject's OnNext method in the class's Status property whenever the Status is changed (I could add some additional code to support calling the Subject's OnError and OnCompleted methods on the Subject).

Listing 1: A SalesOrder Class to Work with Reactive Extensions
Public Class SalesOrder
  Private _status As String
  Public Property statChange As ISubject(Of StatusChange)

  Public Sub New()
    statChange = New Subject(Of StatusChange)
  End Sub

  Public Property Id As Integer

  Public Property Status As String
      Return _status
    End Get
    Set(value As String)
      _status = value
      Dim sc As StatusChange
      sc = New StatusChange With {.OrderId = Me.Id,
                                  .OrderStatus = Me.Status}
    End Set
  End Property

End Class

In my application, I just have to call the Subscribe method on the SalesOrder's Subject property and pass addresses of the methods to execute (I've called my methods OnNext, OnError, OnCompleted):

Dim order As New SalesOrder()

order.statChange.Subscribe(AddressOf OnNext,
                           AddressOf OnError,
                           AddressOf OnCompleted)

I can also encapsulate my monitoring code into a class of its own and pass it to the Subscribe method. I just create a class that implements IObserver and move my OnNext, OnError and OnComplete code into its methods (Listing 2). This change implements the single responsibility principle: My original class is now just dedicated to processing the sales order and this class is dedicated to monitoring the sales order's progress.

Listing 2: A Class for Processing Reactive Reports
Public Class StatusMonitor
  Implements IObserver(Of StatusChange)

  Public Sub OnNext(value As StatusChange) Implements IObserver(Of StatusChange).OnNext
  End Sub

  Public Sub OnError([error] As Exception) Implements IObserver(Of StatusChange).OnError
  End Sub

  Public Sub OnCompleted() Implements IObserver(Of StatusChange).OnCompleted
  End Sub

End Class

My Subscribe method gets simpler because I just have to pass my monitoring class to the Subscribe method:

order.statChange.Subscribe(New StatusMonitor)

By leveraging Reactive Extensions I've got a simpler design, encapsulated my code, implemented the single responsibility principle and provided additional functionality through LINQ. But I'm not done -- next month, I'll turn this into asynchronous code, and show how to use Reactive Extensions with a wider variety of classes.

comments powered by Disqus
Upcoming Events

.NET Insight

Sign up for our newsletter.

I agree to this site's Privacy Policy.