Practical .NET
Create Sophisticated Asynchronous Applications with BlockingCollection
The basic functionality of the BlockingCollection makes creating asynchronous applications easy to do. But you need to use some of the BlockingCollection's other tools to create applications that handle typical real-world problems.
In a previous column, I showed how to create an asynchronous application using the BlockingCollection object (in an even earlier column, I claimed that the benefits of building applications this way means that creating an asynchronous application should be your default choice for any application).
In this column, I'm going to revisit that application and leverage some of the methods on BlockingCollection to provide a more sophisticated application. By the end of this application, I'll have created a standard consumer and producer structure that you can use to create asynchronous applications.
A quick review of the two asynchronous processes in my case study application:
- A UI (the "producer") that accepts input from the user and uses that information to add Person objects to a BlockingCollection
- An update method (the "consumer") that waits until a Person object is added to the BlockingCollection, then processes it, removes it from the collection and waits for the next Person object to be added
Setting Limits
In the real world, with the consumer/producer architecture I'm proposing, there's always the possibility that the producer will add objects faster than the consumer can process and remove them.
One way to handle that is to stop the producer until the consumer has had a chance to catch up by setting an upper limit on the number of objects allowed in the BlockingCollection. When you create a BlockingCollection you have the option of passing an upper limit on the number of objects in the collection. This code specifies that the BlockingCollection shouldn't hold more than 10 objects:
Dim bcPeople As New BlockingCollection(Of Person)(10)
Which raises the question of what happens when a BlockingCollection reaches that limit. If you're using the Add method then the method will just idle on the Add method until room opens up on the BlockingCollection. If your producer is in full control of how data is generated/received that may be all you need.
If, however, you want to signal to some other process that you can't accept any more input, you should use TryAdd, which returns False if it's unable to add to the BlockingCollection and moves on to the next line of code -- this enables the producer to do something else. In my case, my producer is a UI so I want to display a message and then disable part of the UI until the consumer has cleared some space on the collection. I could write code like this to do the job:
If Not bcPeople.TryAdd(pers) Then
Me.Status.Text = "Please wait..."
Me.SomeGrid.Enabled = False
Do Until bcPeople.Count < bcPeople.BoundedCapacity
Loop
Me.SomeGrid.Enabled = True
Me.Status.Text = String.Empty
End If
An alternative is to have multiple BlockingCollections (a primary collection plus a secondary collection that's used when the primary one is full). The BlockingCollection's static AddToAny method accepts a collection of BlockingCollections and writes to the first collection that has space (or writes to none if all of the collections are full). There's an equivalent TakeFromAny that accepts a collection of BlockingCollections and reads from the first collection it finds with an object.
Processing with For…Each
You can use a For…Each loop with a BlockingCollection to process all the items in the collection. In fact, you can use many of the extension methods associated with .NET Framework collections with a BlockingCollection. However, to have the items removed from the collection as you process them, you want to use that loop with the BlockingCollection's GetConsumingEnumerable method.
A For…Each loop can be useful if, as is typically the case, you want a different process to handle any "leftover" objects in the collection after CompleteAdding is called. In my case study in my previous column I made the user wait until all the leftover objects were processed before leaving the application after the user terminates processing. In this scenario, the decent thing to do would be to provide the user with a progress bar that shows how many objects were left to process before the application finally exits.
Two changes are required to my consumer process to implement this. First, instead of checking IsCompleted to see if I should exit my loop, I check IsAddingCompleted (IsCompleted returns True only after CompleteAdding is called and the collection is empty; IsAddingCompleted returns True as soon as CompleteAdding is called). The second change is to add another loop at the end of my consumer that uses GetConsumingEnumerable to process (and remove) whatever objects are left in the collection. I'll show that code at the end of the next section.
Canceling Processing
In my earlier column, I allowed for an orderly termination of processing by stopping the producer and letting the consumer finish processing any remaining items on the queue. However, you may want to provide the option of canceling all processing -- including skipping any objects still in the collection. The easiest way to do that is to pass a cancellation token to whichever of the Take, TryTake, Add or TryAdd methods you're using.
My first step here is to declare a CancellationSource to manage my CancellationToken:
Dim cncl As New CancellationTokenSource
Here are examples of a CancellationToken from that source being passed to various methods (for the TryAdd and TryTake examples, I've used -1 as the timeout period to cause the methods to wait forever):
bcPeople.Add(pers, cncl.Token)
If Not bcPeople.TryAdd(pers, -1, cncl.Token) Then
pers = bcPeople.Take(cncl.Token)
If bcPeople.TryTake(pers, -1, cncl.Token) Then
To cancel these operations, you just call the Cancel method on the ConcellationSource, like this:
cncl.Cancel()
Calling the Cancel method will cause the methods that were passed the Token to finish and to throw an OperationCanceledException. Judicious placement of your Try…Catch blocks, coupled with that Exception, allows you to easily skip all further processing.
Putting cancellation together with a For…Each loop for processing leftovers, gives a basic consumer structure that looks like the code in Listing 1. The Try…Catch block that catches my OperationCanceledException encloses all of my processing so that raising the Exception skips everything.
Listing 1: A Standard BlockingCollection Consumer
Dim pers As Person
Try
Do While Not bcPeople.IsAddingCompleted
If bcPeople.TryTake(pers, -1, cncl.Token) Then
Try
'...update Person object...
Catch ex As Exception
'...handle update errors...
ex.Data.Add("PersonId", pers.Id)
bcErrors.Add(ex)
End Try
End If
Loop
For Each p In bcPeople.GetConsumingEnumerable
Try
'...update p object with notification...
Catch ex As Exception
'...handle update errors...
ex.Data.Add("PersonId", p.Id)
bcErrors.Add(ex)
End Try
Next
Catch ocx As OperationCanceledException
'...handle cancellation (probably do nothing)...
Catch ex As Exception
'...handle all other errors
End Try
bcPeople.Dispose()
Listing 2 shows the typical structure for a producer.
Listing 2: A Standard BlockingCollection Producer
Try
If Not bcPeople.TryAdd(pers, -1, cncl.Token) Then
'...stop inputs
Do Until bcPeople.Count < bcPeople.BoundedCapacity
Loop
'...resume inputs
End If
Catch ocx As OperationCanceledException
'...handle cancellation (probably do nothing)...
Catch ex As Exception
'...handle other errors...
End Try
One final note: If you don't like the way the default BlockingCollection works, you can change it because the BlockingCollection is pluggable -- you can create your own object that implements the IProducerConsumerCollection interface and pass it to the BlockingCollection when you create it. If you're interested check out the MSDN Library article, "IProducerConsumerCollection<T> Interface" with a sample implementation. Really, only two of the required methods (TryAdd and TryTake) are challenging.
I have to ask: What's not to like?
About the Author
Peter Vogel is a system architect and principal in PH&V Information Services. PH&V provides full-stack consulting from UX design through object modeling to database design. Peter tweets about his VSM columns with the hashtag #vogelarticles. His blog posts on user experience design can be found at http://blog.learningtree.com/tag/ui/.