Practical .NET

Create Sophisticated Asynchronous Applications with BlockingCollection

The basic functionality of the BlockingCollection makes creating asynchronous applications easy to do. But you need to use some of the BlockingCollection's other tools to create applications that handle typical real-world problems.

In a previous column, I showed how to create an asynchronous application using the BlockingCollection object (in an even earlier column, I claimed that the benefits of building applications this way mean that an asynchronous design should be your default choice for any application).

In this column, I'm going to revisit that application and use some of the other methods on BlockingCollection to make it more sophisticated. By the end of this column, I'll have created a standard consumer and producer structure that you can use to create asynchronous applications.

A quick review of the two asynchronous processes in my case study application:

  • A UI (the "producer") that accepts input from the user and uses that information to add Person objects to a BlockingCollection
  • An update method (the "consumer") that waits until a Person object is added to the BlockingCollection, then processes it, removes it from the collection and waits for the next Person object to be added

Setting Limits
In the real world, with the consumer/producer architecture I'm proposing, there's always the possibility that the producer will add objects faster than the consumer can process and remove them.

One way to handle that is to stop the producer until the consumer has had a chance to catch up by setting an upper limit on the number of objects allowed in the BlockingCollection. When you create a BlockingCollection you have the option of passing an upper limit on the number of objects in the collection. This code specifies that the BlockingCollection shouldn't hold more than 10 objects:

Dim bcPeople As New BlockingCollection(Of Person)(10)

Which raises the question of what happens when a BlockingCollection reaches that limit. If you're using the Add method, the call simply blocks until room opens up in the BlockingCollection. If your producer is in full control of how data is generated or received, that may be all you need.

If, however, you want to signal to some other process that you can't accept any more input, you should use TryAdd, which returns False if it's unable to add to the BlockingCollection and moves on to the next line of code -- this enables the producer to do something else. In my case, my producer is a UI so I want to display a message and then disable part of the UI until the consumer has cleared some space on the collection. I could write code like this to do the job:

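If Not bcPeople.TryAdd(pers) Then
  Me.Status.Text = "Please wait..."
  Me.SomeGrid.Enabled = False
  'Wait off the UI thread until there's room, then add the item that TryAdd rejected
  '(this requires the enclosing event handler to be declared Async)
  Await Task.Run(Sub() bcPeople.Add(pers))
  Me.SomeGrid.Enabled = True
  Me.Status.Text = String.Empty
End If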
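
The bounded behavior described above can be checked outside of a UI. This is a minimal console sketch, assuming a collection of Integers rather than Person objects; the module and function names are hypothetical, chosen only for illustration:

```vb
Imports System
Imports System.Collections.Concurrent

Module BoundedDemo
    ' Fills a bounded collection to capacity, then reports whether
    ' one more TryAdd succeeds.
    Function ExtraAddSucceeds(capacity As Integer) As Boolean
        Using bc As New BlockingCollection(Of Integer)(capacity)
            For i As Integer = 1 To capacity
                bc.Add(i)   ' room is available, so Add returns immediately
            Next
            ' The collection is now full: TryAdd returns False instead of blocking
            Return bc.TryAdd(capacity + 1)
        End Using
    End Function

    Sub Main()
        Console.WriteLine(ExtraAddSucceeds(10))   ' prints False
    End Sub
End Module
```

Calling Add instead of TryAdd on that last line would block forever in this sketch, because no consumer is running to free up a slot.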

An alternative is to have multiple BlockingCollections (a primary collection plus a secondary collection that's used when the primary one is full). The BlockingCollection's static AddToAny method accepts an array of BlockingCollections plus the item to add, writes the item to one of the collections that has space and returns the index of that collection. If all of the collections are full, AddToAny blocks until space opens up, while its companion TryAddToAny returns -1 instead of waiting. There's an equivalent TakeFromAny (and TryTakeFromAny) that accepts an array of BlockingCollections and reads from one that holds an object.
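
A primary/secondary arrangement might look something like the following sketch. It uses TryAddToAny so that the "everything is full" case is visible as a return value; the AddWithOverflow helper and the collections of Strings are assumptions made for the example, not part of the case study:

```vb
Imports System
Imports System.Collections.Concurrent

Module AddToAnyDemo
    ' Returns the index of the collection that accepted the item,
    ' or -1 if every collection was full.
    Function AddWithOverflow(primary As BlockingCollection(Of String),
                             secondary As BlockingCollection(Of String),
                             item As String) As Integer
        Dim targets() As BlockingCollection(Of String) = {primary, secondary}
        Return BlockingCollection(Of String).TryAddToAny(targets, item)
    End Function

    Sub Main()
        ' Each collection holds at most one item
        Dim primary As New BlockingCollection(Of String)(1)
        Dim secondary As New BlockingCollection(Of String)(1)

        Console.WriteLine(AddWithOverflow(primary, secondary, "first"))
        Console.WriteLine(AddWithOverflow(primary, secondary, "second"))
        ' Both collections are full now, so the third call returns -1
        Console.WriteLine(AddWithOverflow(primary, secondary, "third"))
    End Sub
End Module
```

Because the return value identifies which collection took the item, the producer can tell when it has started spilling into the secondary collection and react accordingly.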

Processing with For…Each
You can use a For…Each loop with a BlockingCollection to process all the items in the collection. In fact, you can use many of the extension methods associated with .NET Framework collections with a BlockingCollection. However, a plain For…Each loop leaves the items in the collection; to have the items removed as you process them, use the loop with the BlockingCollection's GetConsumingEnumerable method.
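
The difference between the two kinds of loop shows up in the collection's Count afterward. Here's a small sketch, assuming a collection of Integers and hypothetical function names:

```vb
Imports System
Imports System.Collections.Concurrent

Module EnumerationDemo
    ' Builds a collection holding 1, 2 and 3 and marks it complete so that
    ' a consuming loop ends once the collection is empty.
    Private Function BuildCollection() As BlockingCollection(Of Integer)
        Dim bc As New BlockingCollection(Of Integer)()
        bc.Add(1)
        bc.Add(2)
        bc.Add(3)
        bc.CompleteAdding()
        Return bc
    End Function

    ' A plain For Each reads the items but leaves them in place.
    Function CountAfterPlainLoop() As Integer
        Using bc As BlockingCollection(Of Integer) = BuildCollection()
            Dim total As Integer = 0
            For Each n As Integer In bc
                total += n
            Next
            Return bc.Count
        End Using
    End Function

    ' GetConsumingEnumerable removes each item as it's handed to the loop.
    Function CountAfterConsumingLoop() As Integer
        Using bc As BlockingCollection(Of Integer) = BuildCollection()
            Dim total As Integer = 0
            For Each n As Integer In bc.GetConsumingEnumerable()
                total += n
            Next
            Return bc.Count
        End Using
    End Function

    Sub Main()
        Console.WriteLine(CountAfterPlainLoop())       ' prints 3
        Console.WriteLine(CountAfterConsumingLoop())   ' prints 0
    End Sub
End Module
```

Note the call to CompleteAdding: without it, the consuming loop would wait for more items instead of ending when the collection is empty.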

A For…Each loop can be useful if, as is typically the case, you want a different process to handle any "leftover" objects in the collection after CompleteAdding is called. In the case study in my previous column, once the user terminated processing, I made the user wait until all the leftover objects were processed before the application exited. In this scenario, the decent thing to do would be to provide the user with a progress bar that shows how many objects are left to process before the application finally exits.

Two changes are required to my consumer process to implement this. First, instead of checking IsCompleted to see if I should exit my loop, I check IsAddingCompleted (IsCompleted returns True only after CompleteAdding is called and the collection is empty; IsAddingCompleted returns True as soon as CompleteAdding is called). The second change is to add another loop at the end of my consumer that uses GetConsumingEnumerable to process (and remove) whatever objects are left in the collection. I'll show that code at the end of the next section.
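
The difference between the two properties can be seen in isolation with a sketch like this one (the function name and the use of Integers are assumptions for the example):

```vb
Imports System
Imports System.Collections.Concurrent

Module CompletionDemo
    ' Returns three flags: IsAddingCompleted and IsCompleted while one item
    ' is still in the collection, then IsCompleted after it's been taken.
    Function CompletionFlags() As Boolean()
        Using bc As New BlockingCollection(Of Integer)()
            bc.Add(42)
            bc.CompleteAdding()
            Dim addingDone As Boolean = bc.IsAddingCompleted   ' True
            Dim completedEarly As Boolean = bc.IsCompleted     ' False: an item remains
            bc.Take()
            Dim completedLate As Boolean = bc.IsCompleted      ' True: now empty
            Return {addingDone, completedEarly, completedLate}
        End Using
    End Function

    Sub Main()
        Dim flags() As Boolean = CompletionFlags()
        Console.WriteLine("{0} {1} {2}", flags(0), flags(1), flags(2))   ' prints True False True
    End Sub
End Module
```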

Canceling Processing
In my earlier column, I allowed for an orderly termination of processing by stopping the producer and letting the consumer finish processing any remaining items on the queue. However, you may want to provide the option of canceling all processing -- including skipping any objects still in the collection. The easiest way to do that is to pass a cancellation token to whichever of the Take, TryTake, Add or TryAdd methods you're using.

My first step here is to declare a CancellationTokenSource to manage my CancellationToken:

Dim cncl As New CancellationTokenSource

Here are examples of a CancellationToken from that source being passed to various methods (for the TryAdd and TryTake examples, I've used -1 as the timeout period to cause the methods to wait forever):

bcPeople.Add(pers, cncl.Token)
If Not bcPeople.TryAdd(pers, -1, cncl.Token) Then

pers = bcPeople.Take(cncl.Token)
If bcPeople.TryTake(pers, -1, cncl.Token) Then

To cancel these operations, you just call the Cancel method on the CancellationTokenSource, like this:

cncl.Cancel()

Calling the Cancel method causes any methods that were passed the token to stop waiting and throw an OperationCanceledException. Judicious placement of your Try…Catch blocks, coupled with that exception, allows you to easily skip all further processing.
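
To see that exception without building a UI, a sketch along these lines should do. It assumes an empty collection of Integers and uses CancelAfter (rather than a Cancel call from another thread) to trigger the cancellation while Take is waiting; the function name is hypothetical:

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading

Module CancelDemo
    ' Returns True if a waiting Take ends with an OperationCanceledException.
    Function TakeIsCanceled() As Boolean
        Using cncl As New CancellationTokenSource(),
              bc As New BlockingCollection(Of Integer)()
            ' Request cancellation about 100 ms from now
            cncl.CancelAfter(100)
            Try
                ' The collection is empty, so Take waits until the token is canceled
                bc.Take(cncl.Token)
                Return False
            Catch ocx As OperationCanceledException
                Return True
            End Try
        End Using
    End Function

    Sub Main()
        Console.WriteLine(TakeIsCanceled())   ' prints True
    End Sub
End Module
```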

Putting cancellation together with a For…Each loop for processing leftovers gives a basic consumer structure that looks like the code in Listing 1. The Try…Catch block that catches my OperationCanceledException encloses all of my processing so that raising the exception skips everything.

Listing 1: A Standard BlockingCollection Consumer
Dim pers As Person = Nothing
Try
  Do While Not bcPeople.IsAddingCompleted
    If bcPeople.TryTake(pers, -1, cncl.Token) Then
      Try
        '...update Person object...
      Catch ex As Exception
        '...handle update errors...
        ex.Data.Add("PersonId", pers.Id)
        bcErrors.Add(ex)
      End Try
    End If
  Loop
  'Adding is complete: process (and remove) the leftovers.
  'Passing the token lets a cancellation interrupt this loop, too.
  For Each p In bcPeople.GetConsumingEnumerable(cncl.Token)
    Try
      '...update p object with notification...
    Catch ex As Exception
      '...handle update errors...
      ex.Data.Add("PersonId", p.Id)
      bcErrors.Add(ex)
    End Try
  Next
Catch ocx As OperationCanceledException
  '...handle cancellation (probably do nothing)...
Catch ex As Exception
  '...handle all other errors
End Try
bcPeople.Dispose()

Listing 2 shows the typical structure for a producer.

Listing 2: A Standard BlockingCollection Producer
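Try
  'A timeout of 0 makes TryAdd return False right away when the collection is full
  '(with -1, TryAdd would wait for room and never return False)
  If Not bcPeople.TryAdd(pers, 0, cncl.Token) Then
    '...stop inputs
    'Block until the consumer frees a slot, then add the item that TryAdd rejected
    bcPeople.Add(pers, cncl.Token)
    '...resume inputs
  End If
Catch ocx As OperationCanceledException
  '...handle cancellation (probably do nothing)...
Catch ex As Exception
  '...handle other errors...
End Try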
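
To watch a producer and a consumer run together without the case study's UI, the two structures can be reduced to a console sketch like the one below. It's a simplification under stated assumptions: Integers stand in for Person objects, the consumer runs on a Task, the processing step just counts items, and the RunPipeline name is hypothetical.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module PipelineDemo
    ' Pushes itemCount integers through a bounded collection and returns
    ' how many of them the consumer processed.
    Function RunPipeline(itemCount As Integer) As Integer
        Dim processed As Integer = 0
        Using cncl As New CancellationTokenSource(),
              bc As New BlockingCollection(Of Integer)(10)

            ' Consumer: removes items until adding is complete and the collection is empty
            Dim consumer As Task = Task.Run(
                Sub()
                    For Each n As Integer In bc.GetConsumingEnumerable(cncl.Token)
                        processed += 1
                    Next
                End Sub)

            ' Producer: Add blocks whenever 10 items are already waiting
            For i As Integer = 1 To itemCount
                bc.Add(i, cncl.Token)
            Next
            bc.CompleteAdding()

            ' Wait for the consumer to drain the leftovers
            consumer.Wait()
        End Using
        Return processed
    End Function

    Sub Main()
        Console.WriteLine(RunPipeline(1000))   ' prints 1000
    End Sub
End Module
```

Because the collection is bounded at 10, the producer in this sketch can never get more than 10 items ahead of the consumer.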

One final note: If you don't like the way the default BlockingCollection works, you can change it because the BlockingCollection is pluggable -- you can create your own object that implements the IProducerConsumerCollection interface and pass it to the BlockingCollection's constructor. If you're interested, check out the MSDN Library article, "IProducerConsumerCollection<T> Interface," which includes a sample implementation. Really, only two of the required methods (TryAdd and TryTake) are challenging.
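
You don't have to write your own class to see the plug-in mechanism at work: the .NET Framework's ConcurrentStack already implements IProducerConsumerCollection. As a minimal sketch (the function name is hypothetical), passing a ConcurrentStack to the constructor makes the BlockingCollection hand items back last-in, first-out instead of in the default first-in, first-out order:

```vb
Imports System
Imports System.Collections.Concurrent

Module PluggableDemo
    ' Adds 1, 2 and 3 to a stack-backed BlockingCollection and
    ' returns the first item taken.
    Function FirstTakenFromStack() As Integer
        Using bc As New BlockingCollection(Of Integer)(New ConcurrentStack(Of Integer)())
            bc.Add(1)
            bc.Add(2)
            bc.Add(3)
            Return bc.Take()   ' the most recently added item comes out first
        End Using
    End Function

    Sub Main()
        Console.WriteLine(FirstTakenFromStack())   ' prints 3
    End Sub
End Module
```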

I have to ask: What's not to like?

About the Author

Peter Vogel is a system architect and principal in PH&V Information Services. PH&V provides full-stack consulting from UX design through object modeling to database design. Peter tweets about his VSM columns with the hashtag #vogelarticles. His blog posts on user experience design can be found at http://blog.learningtree.com/tag/ui/.
