Practical .NET

Create Simple, Reliable Asynchronous Apps with BlockingCollection

Dividing your application up into simple processes will make it easier to maintain and extend. Using BlockingCollection to communicate between those processes will let you make those processes run asynchronously.

In an earlier column, I discussed how asynchronous processing can simplify your applications. In fact, I claimed that "asynchronous" should now be your default choice for creating an application. However, creating a reliable application that runs asynchronous processes is only possible with the right structure and the right tools.

In this column I'm going to discuss how one structure (consumer/producer) and one tool (BlockingCollection) makes it possible for mere mortals like you and me to create reliable asynchronous applications.

A Simple Implementation
The simplest structure for implementing an asynchronous application is to have one process that generates data (the "producer") and another process that uses that data (the "consumer"). You might, for example, have some process that's retrieving data from a Web Service (the producer) and, as it retrieves the data, makes it available to be displayed in some other process (the consumer).

For this column, however, I'm going to flip that scenario and use a desktop application that gathers input about people in its UI. I want my UI to be as responsive as possible: The UI should never slow the user down because I'm processing the user's information. So, as the user clicks the update button in a row in the UI (the producer), I will pass the data from the row to a separate, asynchronous process that performs the updates (the consumer). This design gives me two separate, simpler processes that I can maintain and enhance independently. And, by taking the update processing out of the UI processing, I also give the user the more responsive UI I want: A UI where the user will never see a wait icon.

My first step in implementing this solution is to declare a BlockingCollection at the top of my class where it can be accessed by both processes. This collection will hold Person objects created by my UI process until those Person objects are processed by my separate UpdatePerson process. To support asynchronous processing I'll also declare a task that will handle running my UpdatePerson person asynchronously (and also, hopefully, on core other than my UI):

Public Class PHVApplication…
  Dim bcPeople As New BlockingCollection(Of Person)
  Dim updatePerson As Task

Somewhere near the start of my application, I'll create my Task, pointing it at the method I want to run asynchronously, and start the Task running (I might do this in my constructor or the form Load event):

updatePersonTask = New Task((Sub() UpdatePerson()))
updatePersonTask.Start()

Processing Objects: The Consumer and the Producer
Now, in my UpdatePerson method I create a loop based on the BlockingCollection's IsCompleted property. The IsCompleted property returns False as long as items are still being added to the collection or there is something in the collection to process. Initially, the BlockingCollection assumes you're going to add items to it, so IsCompleted returns false and I stay in the loop:

Private Sub UpdatePerson()
  Dim pers As Person
  Dim success As Boolean
  Do While Not bcPeople.IsCompleted

Within the loop, I use the BlockingCollections TryTake method to retrieve items from the collection. I pass the TryTake method a Person variable to be filled with the next object from the Collection. Because I've passed -1 as the second parameter to the TryTake method, TryTake will wait until a Person object shows up in the Collection before moving on to the next line of code (which is why this collection is called the BlockingCollection). When it finds a Person object, the TryTake method will return a True and move onto the next line of code. In that line I check to make sure that an object was retrieved and process it. When I finally exit the loop, I call the BlockingCollection's Dispose method to free up any resources the collection has claimed:

Do While Not bcPeople.IsCompleted
  success = bcPeople.TryTake(pers, -1)
  If success Then
    '...code to update Person information...
  End If
Loop
bcPeople.Dispose

In addition to retrieving an object from the collection, the TryTake method also removes the object from the BlockingCollection. This means I don't have to worry about accidentally processing that object again.

Now that I'm ready to process objects in my BlockingCollection, I need to start adding items to it. The code to add an object to a BlockingCollection looks very much like the code to add any object to any collection: retrieve some data from the screen, format it into a Person object and call the BlockingCollection's Add method, passing the object:

Dim pers As Person
pers = New Person
pers.FirstName = SomeGrid.CurrentRow("FirstName")
'...validate and set the rest of the properties on the Person object...
bcPeople.Add(pers)

There are now two extreme scenarios: The user doesn't do any updates, the UI code doesn't add any objects to the collection and my UpdatePerson method simply idles on the TryTake method waiting for something to do. The UI adds objects faster than UpdatePerson can process them and TryTake always find something to process when it cycles around inside its loop (though, hopefully, my UI will eventually slow down and allow UpdatePerson to catch up and clear the collection). Within those two extremes, the TryTake method sometimes finds items to process, sometimes doesn't and, in general, keeps up with the UI.

Finishing Processing
But, of course, eventually the user will stop working and want to exit the application. As part of exiting the application, I call the BlockingCollection's CompleteAdding method to signal there will be no more objects added to the collection.

I want to ensure my application stays in memory while the consumer process finishes processing. So, behind my UIs "Exit" button, after calling the CompleteAdding method, I call the Task's Wait method to idle my application until the Task containing my consumer finishes:

bcPeople.CompleteAdding()
updatePersonTask.Wait
Me.Close()

There are two possible scenarios for finishing the Task. The first scenario is that my consumer has kept up with my producer: There are either no Person objects left to process or the consumer is processing the last Person object added to the collection. If there are no objects, calling CompleteAdding will cause TryTake to stop blocking, move on to next line of code, and return False.

However, in this scenario, there will be no Person object to process (if you've been wondering why I checked the return value of the TryTake in UpdatePerson method, this is the reason: I'm checking if TryTake was terminated because CompleteAdding was called). Calling CompleteAdding with the collection emptied of objects will also cause the BlockingCollection's IsComplete property to return True, terminating my loop and allowing UpdatePerson (and the Task it's part of) to complete.

The other scenario is that there are still Person objects waiting in the collection to be processed because the consumer has fallen behind the producer. However, the BlockingCollection's IsComplete property will continue to return True until the Collection is empty, so I'll stay in the loop until my TryTake has retrieved all of the Person objects. Only when TryTake removes the last Person object from the BlockingCollection will IsComplete return True and my loop (and my Task) will terminate.

Handling Errors
But things can go wrong in my update process and I need some way to communicate that to my UI process. Here, again, a blocking collection can provide a simple, reliable way to loosely couple UpdatePerson (which will now be a producer of Exception objects) and a consumer process that will inform the user about the problem.

My first step is to add a new BlockingCollection as a field at the top of my class, this time to hold exceptions generated in UpdatePerson:

Dim bcErrors As New BlockingCollection(Of Exception)

My second step, in UpdatePerson, is to add an Exception to this BlockingCollection when something goes wrong. I also store the Id of the Person object with the error in the Exception's Data collection:

If success Then
  Try
    '...code to update Person information...
  Catch ex As Exception
    ex.Data.Add("PersonId", pers.Id)
    bcErrors.Add(ex)
  End Try
End If

Now, in my UI, I need to regularly check for any exceptions in my Exception collection: A Timer control (which automatically runs asynchronously) will do that for me. I add a Timer control and set its Interval to 1000 so that it will check my Exception collection every second. I can also call this method from any other place in my UI where I want to check for errors.

However, I don't want to use the blocking version of TryTake here because, if there's nothing in the collection to process, my UI component has something else to do: Gather input from the user. Instead, I want to see if there is an exception in the BlockingCollection and, if there isn't, move on to doing something else.

In this scenario, therefore, I pass a value as the second parameter to TryTake that specifies that TryTake is to spend 100 ms looking for an Exception object in the BlockingCollection. If I do find an exception, I display the message, and find the relevant item in the grid using the Person Id I added to the Exception's Data collection.

In this example, I've simplified the code that uses TryTake. Because I have no use for the return value from TryTake other than to decide if I found an exception, there's no real need to save that return value in a Boolean variable. So, in this example, I've just used TryTake inline with my If statement:

Private Sub Timer1_Tick(sender As Object, e As EventArgs) Handles Timer1.Tick
  Dim ex As Exception

  If bcErrors.TryTake(ex, 100) Then
    '...process errors...
  End If
End Sub

Extending the Application
Essentially, I now have three loosely coupled processes (UI, update, error handling) that are connected through BlockingCollections. Of course, real applications are more complicated than this and a typical application will do many things with each Person object. Rather than making either of my existing processes more complicated, my preferred application architecture is to set up another asynchronous consumer within my application to handle each processing task.

Because each consumer automatically removes Person objects from the BlockingCollection as part of processing the Person object, I need to create a dedicated BlockingCollection for each consumer (otherwise the first consumer to process the Person object would prevent any other consumer from seeing that Person object).

I have two design options here. My producer could selectively add Person objects to the specific BlockingCollections required for each kind of processing (centralizing the logic for invoking consumers in the producer). Alternatively, my producer could add the Person object to all the BlockingCollections (simplifying the producer) and let each consumer figure out if it actually needs to do anything (unifying consumer processing). Your choice as to which you prefer but, either way, the producer doesn't have to know anything about the consumers.

While it doesn't make sense for my case study, you can also extend the application by adding additional producers, all of which add to the same BlockingCollection. This allows a single consumer to process inputs from multiple sources without having to be aware of all the producers.

Even ignoring the additional functionality a real application would require, this app is probably still a little too simple.

Next time, I'll leverage some of the other tools on BlockingCollection to create a standard consumer and producer that are more like what you'll need in the real world.

comments powered by Disqus
Upcoming Events

.NET Insight

Sign up for our newsletter.

I agree to this site's Privacy Policy.