Practical .NET

Leveraging Reactive Extensions for Asynchronous Processing

Reactive Extensions will let you catch interim results from a long-running process. Coupled with the Microsoft .NET Framework 4.5 support for asynchronous processing, you don't even have to lock up your client while you process those results.

With most methods, you just want to call the method to get back a single result. Sometimes, however, you need to call a method that returns multiple results. And, sometimes, you'd like to process those results as the method produces them, rather than wait and get all the results at the end. In one of last month's columns, I showed how Microsoft Reactive Extensions could be used to solve that problem by catching notifications about a series of changes made by a process that performed operations on a SalesOrder object.

But one of the underlying assumptions of my example is that SalesOrder processing is a long-running operation: It must, after all, take more than a few milliseconds for the SalesOrder to run through all of its changes (otherwise you'd just wait until the process was done and get the results at the end). The issue with a long-running process is that you don't want to lock up your client while the process is running: You want to run the process asynchronously. That means you'll also want to deal with the results sent by the process asynchronously -- if you deal with the results synchronously, you'll end up locking up the client, just in a different way. Fortunately, Reactive Extensions makes it relatively easy to catch these results and process them without locking up your client's UI.

Quick Review: Setting up the Object and the Client
If you read (and remember) last month's column, you can skip this section. If not, here's a brief review of how my SalesOrder object reports changes to its status while the process is operating on it.

My SalesOrder object reports its current status through a property called statChange of type ISubject. My statChange property is tied to a class I called StatusChange that I use to hold the information about the current state of the SalesOrder (in my example, that's just the order's id and current status). In my SalesOrder object's constructor, I load the statChange property with a Reactive Extensions Subject object (which is also tied to my StatusChange class). Putting that all together, the start of my SalesOrder class looks like this:

Public Class SalesOrder

  Public Property statChange As ISubject(Of StatusChange)

  Public Sub New()
    statChange = New Subject(Of StatusChange)
  End Sub
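
The StatusChange class itself isn't shown in this column. A minimal sketch might look like the following; the property names match the object initializers used later in this column, but the property types are assumptions:

```vb
' Hypothetical sketch of the StatusChange class.
' OrderId As Integer and OrderStatus As String are assumed types.
Public Class StatusChange
  Public Property OrderId As Integer
  Public Property OrderStatus As String
End Class
```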

When the SalesOrder's Status property is updated, code in its Status property sends notifications through the SalesOrder's statChange property by calling the Subject's OnNext method, passing a StatusChange object that holds the current information. The property's notification-related code looks like this:

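Dim _status As String
Public Property Status As String
  ' Get accessor (returning _status) not shown
  Set(value As String)
    _status = value
    Dim sc As New StatusChange With {.OrderId = Me.Id,
                                     .OrderStatus = Me.Status}
    ' Push the new status to every subscriber
    statChange.OnNext(sc)
  End Set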

To catch the notifications in my client when I instantiate my SalesOrder object, I create a client-side Subject (also tied to the StatusChange class):

Public Class SalesOrderProcessingForm
  Dim statChange As ISubject(Of StatusChange)
  Dim order As New SalesOrder()

  Private Sub Form1_Load(…
    order = New SalesOrder() With {.Id = 1}
    statChange = New Subject(Of StatusChange)

Processing First and Last
First things first: Let's have my client run the sales order processing asynchronously. The sales order processing is started by a method called ProcessOrder that accepts the sales order to process. To have ProcessOrder run asynchronously, I put this code in the Click event of the Button that the user clicks to start processing:

Task.Factory.StartNew(Sub() ProcessOrder(order))

ProcessOrder will now run in parallel with my client's UI, with the SalesOrder object firing notifications as its status changes and my client catching those notifications.

Because I'm executing ProcessOrder asynchronously, I also want to catch and process the SalesOrder notifications asynchronously. Reactive Extensions provides several LINQ (or LINQ-like) extension methods that attach themselves to the Subject object (I took advantage of LINQ in last month's column to control which notifications I would receive, for example). These extension methods include eight methods that allow you to handle the notifications from your process asynchronously. For this example, I'll assume the only processing I want to do is to update the client's UI with the current status of the process, but, of course, I could do any processing I wanted on any notification.

To begin with, I'll assume the client only needs to do some "start of life processing" when the SalesOrder gets its first status change: I want to catch the notification sent the first time the SalesOrder object calls the OnNext method on its Subject. To do that, in my client, I use the Subject object's FirstAsync method on the statChange property. The FirstAsync won't complete until that first status shows up so I use the Await keyword with the statement so that, after I call FirstAsync, control will return to whatever code called this method. Because I've put this code in a Button's Click event, that means control will return to the Form itself, freeing up the UI for the user.

When my FirstAsync method does finally complete (when SalesOrder first calls the OnNext method on its Subject), my method will advance to its next line of code and that's where I'll do my processing. To support using the Await statement, I have to declare the method holding the code as Async (for more on Async and Await see my June column on simple asynchronous processing and its follow-up on managing asynchronous tasks).

The following code shows a Button Click method that takes care of all that -- notice I don't have to use the Subscribe method (described in last month's column) to catch notifications because FirstAsync incorporates that functionality:

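Private Async Sub ProcessOrderButton_Click(...
  Dim order As SalesOrder
  order = New SalesOrder() With {.Id = 1}
  ' A Task created with New doesn't run until Start is called
  Dim tsk As New Task(Sub() ProcessOrder(order))
  tsk.Start()
  ' Await returns control to the UI until the first notification arrives
  Dim firstState As StatusChange
  firstState = Await order.statChange.FirstAsync()
  Me.StatusLabel.Text = firstState.OrderStatus
End Sub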

Now, when the user clicks the button that starts processing, control returns to the UI at the FirstAsync line, thanks to the Await keyword, which unlocks the UI. When the SalesOrder reports its first state, the code following the FirstAsync line will finally execute and update the UI with the SalesOrder's status.

Presumably, I wouldn't have to wait very long for the first status notification, so asynchronous reporting for that notification might not be essential -- the UI wouldn't be locked for very long if I just waited for the result. However, if I wanted to do some "end of life processing" when the order's final status is reached, asynchronous notification would be essential to avoid locking up the UI during the whole process. As you might expect, Reactive Extensions has a LastAsync method that returns the last notification the SalesOrder provides.

However, there's a wrinkle: The SalesOrder must call the OnCompleted method on its Subject before LastAsync will return its result (though the notification sent is for the last status set through the OnNext method). Because my SalesOrder object sends notifications from its Status property, I just have to enhance the Status property's setter to ensure the OnCompleted method is called when SalesOrder processing is completed:

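Set(value As String)
  _status = value
  Dim sc As New StatusChange With {.OrderId = Me.Id,
                                   .OrderStatus = Me.Status}
  statChange.OnNext(sc)
  If value = "Completed" Then
    ' Signal the end of the sequence so LastAsync can return its result
    statChange.OnCompleted()
  End If
End Set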

If I wanted both "start of life" and "end of life" processing, I could issue two subscriptions, calling LastAsync after FirstAsync. However, Reactive Extensions provides better tools for handling multiple notifications.
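
As a rough illustration of the two-subscription approach, here's a minimal console sketch. It's not the SalesOrder code from this column: a plain Subject(Of String) stands in for the statChange property, the status values (other than "Completed") are made up, and the background loop is a hypothetical stand-in for ProcessOrder. It also uses the ToTask extension method (from System.Reactive.Threading.Tasks), which isn't otherwise used in this column, so that both subscriptions are in place before processing starts and an early notification can't be missed:

```vb
Imports System
Imports System.Reactive.Linq
Imports System.Reactive.Subjects
Imports System.Reactive.Threading.Tasks
Imports System.Threading
Imports System.Threading.Tasks

Module FirstAndLastSketch
  Sub Main()
    ' Console apps have no UI thread, so blocking here is safe
    DemoAsync().Wait()
  End Sub

  Private Async Function DemoAsync() As Task
    ' Stand-in for the SalesOrder's statChange Subject
    Dim statChange As New Subject(Of String)

    ' ToTask subscribes immediately, before any notification is sent
    Dim firstTask As Task(Of String) = statChange.FirstAsync().ToTask()
    Dim lastTask As Task(Of String) = statChange.LastAsync().ToTask()

    ' Hypothetical processing: report statuses from a background thread
    Dim worker As Task = Task.Run(
      Sub()
        For Each status As String In {"Received", "Shipping", "Completed"}
          Thread.Sleep(200)
          statChange.OnNext(status)
        Next
        ' LastAsync only produces its result after OnCompleted
        statChange.OnCompleted()
      End Sub)

    Console.WriteLine("First: " & Await firstTask)
    Console.WriteLine("Last: " & Await lastTask)
    Await worker
  End Function
End Module
```

In a Windows Forms client the same pattern would sit inside an Async event handler, with the Console.WriteLine calls replaced by UI updates.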

Processing In-Between
In fact, I suspect that "handling multiple notifications" is the more typical case: The client needs to do some processing on each notification (rather than just the first and the last). Reactive Extensions provides a number of ways to handle this, but I'm going to look at two: ForEachAsync and RunAsync.

The ForEachAsync extension method accepts a lambda expression that's executed on each notification. I can put all the processing code in the lambda expression or call the method that will do the processing from the lambda expression. Once again, in this example, I'm just updating the client UI, so I put my single line of code in the lambda expression (and, because I'm crossing threads here to get to my UI thread, I use the Form's Invoke method to update my Label):

Await order.statChange.ForEachAsync(Sub(sc) Invoke(Sub() StatusLabel.Text = sc.OrderStatus))

ForEachAsync accepts a cancellation token, which gives you the ability to allow users to stop processing. The first step in supporting cancellation is to declare a CancellationTokenSource outside of any of the client's methods or properties:

Dim cancelSource As New CancellationTokenSource

The next step is to retrieve a CancellationToken from that source and pass it as the second parameter to the ForEachAsync method. When the user cancels processing, ForEachAsync will throw an exception so you need to catch that, check to see if the exception is caused by a cancellation and deal with the exception when it's not a cancellation. The resulting block of code looks like this:

Dim cancel As CancellationToken
cancel = cancelSource.Token
Try
  ' The token is the second parameter to ForEachAsync
  Await order.statChange.ForEachAsync(Sub(sc) Invoke(Sub() Me.StatusLabel.Text = sc.OrderStatus),
                                      cancel)
Catch ex As Exception
  ' A cancellation is expected; anything else is a real failure
  If Not cancel.IsCancellationRequested Then
    Throw New Exception("Processing failed: " & ex.Message, ex)
  End If
End Try
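
To see the cancellation behavior outside of a Form, here's a minimal console sketch under some loud assumptions: a Subject(Of String) stands in for order.statChange, the background loop is a hypothetical process that never finishes on its own, and a call to CancelAfter simulates the user asking to stop after half a second. When the token is cancelled, the awaited ForEachAsync task ends with an OperationCanceledException (or a subclass), which is what the Catch block handles:

```vb
Imports System
Imports System.Reactive.Linq
Imports System.Reactive.Subjects
Imports System.Threading
Imports System.Threading.Tasks

Module CancelSketch
  Sub Main()
    DemoAsync().Wait()
  End Sub

  Private Async Function DemoAsync() As Task
    ' Stand-in for order.statChange
    Dim statChange As New Subject(Of String)
    Dim cancelSource As New CancellationTokenSource

    ' Hypothetical process that keeps reporting until it's told to stop
    Dim worker As Task = Task.Run(
      Sub()
        Dim stepNumber As Integer = 0
        While Not cancelSource.IsCancellationRequested
          Thread.Sleep(100)
          stepNumber += 1
          statChange.OnNext("Step " & stepNumber)
        End While
      End Sub)

    ' Simulates the user clicking a Cancel button after 500 milliseconds
    cancelSource.CancelAfter(500)

    Try
      Await statChange.ForEachAsync(Sub(s) Console.WriteLine(s), cancelSource.Token)
    Catch ex As OperationCanceledException
      Console.WriteLine("Processing cancelled")
    End Try

    Await worker
  End Function
End Module
```

The number of "Step" lines printed before the cancellation message depends on timing, so it can vary from run to run.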

Now, in the Click event of some other button, you can allow the user to terminate processing with code like this:

cancelSource.Cancel()

If, instead of using ForEachAsync, you use the RunAsync method, you get to use the Subscribe method to catch notifications as I described in last month's column. After setting up your subscription, just call the RunAsync method from your Subject using the Await keyword to return control to the calling code. Like ForEachAsync, RunAsync accepts a cancellation token; in this example, however, I've chosen to pass Nothing instead of a token:

order.statChange.Subscribe(Sub(sc) Invoke(Sub() StatusLabel.Text = sc.OrderStatus))
Await order.statChange.RunAsync(Nothing)

The code in the Subscribe's lambda expression will execute as each notification is received without locking up your UI.

If you're trying to select between ForEachAsync and Subscribe/RunAsync, it seems to me that ForEachAsync is less "mysterious": With ForEachAsync the code that will be used to process each notification is right there in the ForEachAsync (even if all that's in the ForEachAsync is a call to some other method).

With Subscribe/RunAsync, however, the subscription (the call to Subscribe) and the execution (the call to RunAsync) could be widely separated in the code, making it hard for the next programmer to see the relationship between them. Having said that, using Subscribe/RunAsync also lets you put multiple subscriptions on a single Subject. This example takes advantage of that feature to perform two activities on each notification:

order.statChange.Subscribe(Sub(sc) Invoke(Sub() StatusLabel.Text = sc.OrderStatus))
order.statChange.Subscribe(Sub(sc) UpdateLog(sc))
Await order.statChange.RunAsync(Nothing)

As I showed in my previous column, compared to events, Reactive Extensions provides a more flexible and easier-to-use mechanism to repeatedly receive information from a long-running process. And, as this column has shown, if you want to call the process asynchronously, thanks to the various *Async methods in Reactive Extensions, it's also easy to process those results asynchronously. May you never lock up your UI again.
