C# Corner

A .NET 4.5 Parallel Dataflow Primer

The Task Parallel Dataflow (TDF) library helps developers tackle complex parallel use cases.

The Task Parallel Dataflow (TDF) library is built on the Task Parallel Library (TPL) introduced in the .NET Framework 4.0. Although the TPL provides a lot of functionality for parallelizing an application, it leaves complex parallel use cases, such as producer/consumer and agent-based models, largely up to the developer.

The TDF, on the other hand, provides higher-level abstractions in the form of generic blocks that create and schedule the Task objects needed to handle simple to complex data flows.

The TDF is still in a preview state; it can be installed in Visual Studio 2012 RC through NuGet, as shown in Figure 1.


Figure 1. Installing TPL Dataflow NuGet package.

Once the package is installed, you should see a reference to System.Threading.Tasks.Dataflow in your project, as seen in Figure 2.

In the TDF, all blocks implement the common IDataflowBlock interface. The IDataflowBlock interface contains Complete and Fault methods, and a Completion property getter. Calling Complete tells the block to stop accepting new items and to finish once it has processed the items it already holds. Calling Fault forces the block to finish in a faulted state with the given exception. The Completion property returns a Task that represents the lifetime of the block: it completes when the block has finished all of its work, and it is faulted if the block ended with an exception.

All the dataflow blocks also implement the ISourceBlock<T> interface, the ITargetBlock<T> interface, or both. The ISourceBlock<T> interface defines a contract for offering data, whereas the ITargetBlock<T> interface defines a contract for receiving data.
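The relationship between these interfaces can be checked directly. The following is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; the class and method names (BlockInterfacesDemo, IsTarget, IsSource, CompleteNormally, CompleteFaulted) are hypothetical and exist only for illustration. Note that the concrete block classes implement Fault explicitly, so the sketch calls it through an IDataflowBlock cast.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BlockInterfacesDemo
{
    // True when the block can receive int items.
    public static bool IsTarget(IDataflowBlock block) { return block is ITargetBlock<int>; }

    // True when the block can offer int items.
    public static bool IsSource(IDataflowBlock block) { return block is ISourceBlock<int>; }

    // Completes an empty block and returns the final status of its Completion task.
    public static TaskStatus CompleteNormally()
    {
        var block = new ActionBlock<int>(x => { });
        block.Complete();
        block.Completion.Wait();
        return block.Completion.Status;
    }

    // Faults a block and returns the final status of its Completion task.
    public static TaskStatus CompleteFaulted()
    {
        var block = new ActionBlock<int>(x => { });
        ((IDataflowBlock)block).Fault(new InvalidOperationException("demo fault"));
        try { block.Completion.Wait(); }
        catch (AggregateException) { /* expected: the block was faulted */ }
        return block.Completion.Status;
    }

    public static void Main()
    {
        var action = new ActionBlock<int>(x => { });
        var transform = new TransformBlock<int, int>(x => x * 2);
        Console.WriteLine("ActionBlock: target={0}, source={1}", IsTarget(action), IsSource(action));
        Console.WriteLine("TransformBlock: target={0}, source={1}", IsTarget(transform), IsSource(transform));
        Console.WriteLine(CompleteNormally());
        Console.WriteLine(CompleteFaulted());
    }
}
```

The ActionBlock should report itself as a target only and the TransformBlock as both a target and a source, while the two status lines should show a normally completed and a faulted Completion task respectively.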


Figure 2. The Solution Explorer, showing the new Dataflow reference.

The ActionBlock
I'll begin with the ActionBlock, which implements the ITargetBlock<T> interface. Both the ISourceBlock<T> and ITargetBlock<T> interfaces inherit from the base IDataflowBlock interface. Later I'll go over the TransformBlock, which implements both ISourceBlock<T> and ITargetBlock<T>.

The ActionBlock calls a given Action<T> for each item it receives. Items may be posted to an ActionBlock through either its Post or SendAsync method. To get a feel for how the ActionBlock works, I'll create an ActionBlock that calculates the square of an integer and appends it to a StringBuilder. Start out by creating the StringBuilder:

var squareStringBuilder = new System.Text.StringBuilder();

Then create an ActionBlock<int> and pass it a lambda that calculates the square of the number and appends it to the squareStringBuilder object:

// Requires: using System.Threading.Tasks.Dataflow;
var squareBuilderBlock = new ActionBlock<int>(x =>
{
    int result = x * x;
    // StringBuilder is not thread-safe, so guard it when items run in parallel.
    lock (squareStringBuilder)
    {
        squareStringBuilder.AppendLine(result.ToString());
    }
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4
});

The MaxDegreeOfParallelism setting specifies how many input values may be processed in parallel by the ActionBlock. For this demo, I've set the property to four to match the number of cores assigned to my virtual machine. With a value greater than one, the delegate may run on several threads at once and items can finish out of order, so any shared state it touches must be thread-safe. The default MaxDegreeOfParallelism is one. Be sure to keep the default value if your data must be processed sequentially.
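One way to see the effect of this setting is to count how many invocations of the delegate overlap. The sketch below is illustrative only: ParallelismDemo and MaxObservedConcurrency are hypothetical names, and the 20 ms sleep simply stands in for real work. The observed peak depends on the scheduler and the machine, so it may be lower than the configured limit, but it should never exceed it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class ParallelismDemo
{
    // Posts itemCount items and returns the highest number of delegate
    // invocations that were observed running at the same time.
    public static int MaxObservedConcurrency(int maxDegreeOfParallelism, int itemCount)
    {
        int running = 0;
        int peak = 0;
        object gate = new object();

        var block = new ActionBlock<int>(x =>
        {
            int now = Interlocked.Increment(ref running);
            lock (gate) { if (now > peak) peak = now; }
            Thread.Sleep(20); // stand-in for real work
            Interlocked.Decrement(ref running);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        });

        for (int i = 0; i < itemCount; i++) block.Post(i);
        block.Complete();
        block.Completion.Wait();
        return peak;
    }

    public static void Main()
    {
        Console.WriteLine("Limit 1, observed peak: " + MaxObservedConcurrency(1, 8));
        Console.WriteLine("Limit 4, observed peak: " + MaxObservedConcurrency(4, 16));
    }
}
```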

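You can post data to the ActionBlock through its Post or SendAsync methods. Post is synchronous: it offers the item and returns immediately, with true if the block accepted the item and false if the block declined it, for example because the block has already been completed or its bounded capacity is full. SendAsync returns a Task<bool> and allows the block to postpone the item until it has room, so it is the better choice when the target may be temporarily full. Items are accepted in the order they are offered, provided each SendAsync call is awaited before the next one is made. For example, to post a set of 10 numbers to the squareBuilderBlock in order, you could run: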

for (int i = 1; i <= 10; i++)
{
    squareBuilderBlock.Post(i);
}

To post the numbers asynchronously, you would run:

for (int i = 1; i <= 10; i++)
{
    await squareBuilderBlock.SendAsync(i);
}
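The difference between the two methods shows up most clearly when a block has limited capacity. The following sketch assumes a BoundedCapacity of one, a setting the article does not otherwise use, and holds the first item inside the delegate with a ManualResetEventSlim so the block stays full. PostVersusSendDemo and RunAsync are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVersusSendDemo
{
    // Returns { firstPost, secondPost, sendAsyncResult }.
    public static async Task<bool[]> RunAsync()
    {
        var gate = new ManualResetEventSlim(false);

        // Assumption for this demo: the block may hold only one item at a time.
        var block = new ActionBlock<int>(x => gate.Wait(),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);          // the block has room, so the item is accepted
        bool secondPost = block.Post(2);         // the block is full, so Post is declined
        Task<bool> pending = block.SendAsync(3); // postponed until the block has room

        gate.Set();                              // let item 1 finish and free the slot
        bool sendResult = await pending;

        block.Complete();
        await block.Completion;
        return new[] { firstPost, secondPost, sendResult };
    }

    public static void Main()
    {
        bool[] r = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine("Post(1)={0}, Post(2)={1}, SendAsync(3)={2}", r[0], r[1], r[2]);
    }
}
```

Checking the return value of Post is worthwhile in real code, since a declined item is silently dropped otherwise.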

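The ActionBlock will keep accepting items until you call its Complete method. After that it declines new items, finishes processing the items it has already accepted, and then signals completion: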

squareBuilderBlock.Complete();

The Completion property of the ActionBlock can be used to check for its completion. The Completion property returns a Task, so you have the flexibility of waiting either synchronously or asynchronously for the block's completion. For example, to wait synchronously you could use:

squareBuilderBlock.Completion.Wait();

To wait asynchronously, you can leverage await:

await squareBuilderBlock.Completion;

Once the ActionBlock has finished its processing, you can retrieve the result via the squareStringBuilder:

String result = squareStringBuilder.ToString();
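Putting the preceding steps together gives a small, complete program. This is a sketch rather than the article's code drop: SquareBuilderDemo and BuildSquaresAsync are hypothetical names, and the lock around the StringBuilder is an addition needed because the delegate can run on up to four threads at once. For the same reason, the squares may appear in any order in the output.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SquareBuilderDemo
{
    // Squares the numbers 1..count in an ActionBlock and returns one square per line.
    public static async Task<string> BuildSquaresAsync(int count)
    {
        var squareStringBuilder = new StringBuilder();

        var squareBuilderBlock = new ActionBlock<int>(x =>
        {
            int result = x * x;
            // StringBuilder is not thread-safe; serialize access to it.
            lock (squareStringBuilder)
            {
                squareStringBuilder.AppendLine(result.ToString());
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 1; i <= count; i++)
        {
            await squareBuilderBlock.SendAsync(i);
        }

        squareBuilderBlock.Complete();       // no more input
        await squareBuilderBlock.Completion; // wait for accepted items to finish

        return squareStringBuilder.ToString();
    }

    public static void Main()
    {
        Console.Write(BuildSquaresAsync(10).GetAwaiter().GetResult());
    }
}
```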

The TransformBlock
Now let's take a look at the TransformBlock. This is both a source and a target block; its primary task is to receive an item, process it and offer the result as output. Unlike the ActionBlock, which can only receive items, the TransformBlock is able to both receive and send them.

A TransformBlock is created by passing it a Func<TInput, TOutput>. The following code creates a TransformBlock that computes the square root of each posted number:

var squareRootBlock = new TransformBlock<double, double>(x => Math.Sqrt(x));

As with an ActionBlock, you send a value to a TransformBlock through its Post or SendAsync methods:

await squareRootBlock.SendAsync(25.0);

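A single value may be received from a TransformBlock synchronously via its Receive or TryReceive methods. Receive blocks the calling thread until a value is available, whereas TryReceive returns immediately, with false if no value is ready yet:

double result = squareRootBlock.Receive();

double sqrt;
bool received = squareRootBlock.TryReceive(out sqrt);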

In addition, a result can be retrieved asynchronously from a TransformBlock by way of its ReceiveAsync method:

double resultReceived = await squareRootBlock.ReceiveAsync();
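The non-blocking behavior of TryReceive is easy to observe on an empty block. In the sketch below, ReceiveDemo and RunAsync are hypothetical names; OutputAvailableAsync is used only to wait until the posted value has actually been transformed before calling TryReceive a second time.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ReceiveDemo
{
    // Returns (TryReceive result before posting, TryReceive result after posting, received value).
    public static async Task<Tuple<bool, bool, double>> RunAsync()
    {
        var squareRootBlock = new TransformBlock<double, double>(x => Math.Sqrt(x));

        double value;
        bool beforePost = squareRootBlock.TryReceive(out value); // nothing has been posted yet

        squareRootBlock.Post(25.0);
        await squareRootBlock.OutputAvailableAsync();            // wait for the output to be ready
        bool afterPost = squareRootBlock.TryReceive(out value);  // the result is now available

        return Tuple.Create(beforePost, afterPost, value);
    }

    public static void Main()
    {
        var r = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine("before={0}, after={1}, value={2}", r.Item1, r.Item2, r.Item3);
    }
}
```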

Additionally, multiple items may be retrieved from a TransformBlock synchronously or asynchronously. The TryReceiveAll method synchronously receives all items that are available at the moment of the call, and returns false if there are none:

IList<double> allAvailableResults;
squareRootBlock.TryReceiveAll(out allAvailableResults);

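To receive items asynchronously as they become available, await the OutputAvailableAsync method in a loop. It returns true when an item is ready and false once the block has completed and no further output will be produced, so the loop ends only after Complete has been called on the block: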

double numReceived;
while (await squareRootBlock.OutputAvailableAsync())
{
    numReceived = await squareRootBlock.ReceiveAsync();
}
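A complete round trip through a TransformBlock might look like the following sketch. TransformDemo and SquareRootsAsync are hypothetical names. The sketch relies on the block's default settings (unbounded capacity and a MaxDegreeOfParallelism of one), under which results come out in the order the inputs were sent, and it calls Complete before draining so that the OutputAvailableAsync loop can end.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformDemo
{
    // Sends every input through a square-root TransformBlock and collects the results.
    public static async Task<List<double>> SquareRootsAsync(IEnumerable<double> inputs)
    {
        var squareRootBlock = new TransformBlock<double, double>(x => Math.Sqrt(x));

        foreach (double input in inputs)
        {
            await squareRootBlock.SendAsync(input);
        }
        squareRootBlock.Complete(); // without this, the loop below would never finish

        var results = new List<double>();
        while (await squareRootBlock.OutputAvailableAsync())
        {
            double item;
            // Drain everything that is ready before awaiting again.
            while (squareRootBlock.TryReceive(out item))
            {
                results.Add(item);
            }
        }

        await squareRootBlock.Completion;
        return results;
    }

    public static void Main()
    {
        var roots = SquareRootsAsync(new[] { 1.0, 4.0, 9.0, 25.0 }).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", roots));
    }
}
```

Using TryReceive inside the loop, rather than ReceiveAsync, avoids an exception if another consumer takes the item between the availability check and the receive call.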

Setting the Foundation
This article has covered the ActionBlock and TransformBlock dataflow blocks, which allow large amounts of data to be processed and transformed asynchronously. As you can see, the TDF provides the building blocks necessary to create simple to complex data flows. It also provides the means to model both synchronous and asynchronous data flows. Stay tuned for the next installment of the series, in which I cover how to link dataflow blocks together to tackle more complex data flow scenarios. As always, all code samples are available in the code drop link.

About the Author

Eric Vogel is a Senior Software Developer for Red Cedar Solutions Group in Okemos, Michigan. He is the president of the Greater Lansing User Group for .NET. Eric enjoys learning about software architecture and craftsmanship, and is always looking for ways to create more robust and testable applications. Contact him at [email protected].
