C# Corner

.NET Framework Parallel Dataflow, Part 3

How to implement a custom Task Parallel Dataflow block.

Welcome to the third and final installment of the Parallel Dataflow series. In Part 1, I covered the basics of installing and using the Task Parallel Dataflow (TDF) library. In Part 2, I went over how to combine dataflow blocks to model more complex data flows. Today I'll cover two ways to create a custom dataflow block.

Before we get started, I should mention that I'm using version 4.5.6 of TDF, which is the release version. First, let's go over the easy way to create a custom dataflow block: the static DataflowBlock.Encapsulate() method. The custom block will be a composite of a BroadcastBlock and an ActionBlock. When a message is posted to the custom block, it updates a given UI element on the UI thread to immediately display the posted datum. The Encapsulate method creates a new propagator block from a given target block and source block.

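// Call this from the UI thread so that the captured SynchronizationContext is the UI one.
private static IPropagatorBlock<TInput, TOutput> CreateUpdateUiBlock<TInput, TOutput>(TextBlock element)
{
    // Source side: offers a copy of each message it holds to every linked target.
    var source = new BroadcastBlock<TOutput>(x => x);
    // Target side: writes each posted value to the TextBlock on the UI thread.
    var target = new ActionBlock<TInput>(x => element.Text = x.ToString(),
        new ExecutionDataflowBlockOptions()
        { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
    return DataflowBlock.Encapsulate(target, source);
}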

As you can see, the Encapsulate method is very handy for creating reusable dataflow blocks that may be used independently or within another dataflow. The major downside to this approach is that Encapsulate does nothing to connect the two halves: it only exposes the target and source as one block, so any forwarding of messages or completion between them is up to you. Furthermore, you can only combine one target block and one source block with Encapsulate().
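To make the linking caveat concrete, here is a minimal sketch of an encapsulated block whose target explicitly forwards each message to its source. The CreateTapBlock helper and the Action<T> callback are hypothetical names of mine (the callback stands in for the UI update so the sketch can run in a console project), and I've swapped in a BufferBlock as the source to keep the output easy to observe.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EncapsulateSketch
{
    // Hypothetical helper: runs onMessage for each input, then forwards the input downstream.
    public static IPropagatorBlock<T, T> CreateTapBlock<T>(Action<T> onMessage)
    {
        var source = new BufferBlock<T>();
        var target = new ActionBlock<T>(async x =>
        {
            onMessage(x);
            // Encapsulate() will not do this for us: hand the message to the source side.
            await source.SendAsync(x);
        });
        return DataflowBlock.Encapsulate(target, source);
    }

    public static async Task<List<int>> RunAsync()
    {
        var seen = new List<int>();
        var received = new List<int>();
        var tap = CreateTapBlock<int>(seen.Add);

        for (int i = 0; i < 3; i++)
        {
            await tap.SendAsync(i);                 // goes to the encapsulated target
            received.Add(await tap.ReceiveAsync()); // comes out of the encapsulated source
        }
        return received;
    }
}
```

Calling EncapsulateSketch.RunAsync() should yield the three values that were sent, in order, because the target forwards each one before the next is received.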

The more customizable, albeit more complicated, approach is to implement the ITargetBlock<in TInput>, ISourceBlock<out TOutput> or IPropagatorBlock<in TInput, out TOutput> interfaces. For example, the very same custom block may be created by implementing the IPropagatorBlock<TInput, TOutput> interface directly (Listing 1).

To get started, create a new class named UiUpdateBlock<T>.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Windows.UI.Xaml.Controls;

public class UiUpdateBlock<T> : IPropagatorBlock<T, T>

Next, create ITargetBlock<T> and ISourceBlock<T> class members.

private readonly ITargetBlock<T> _target;

private readonly ISourceBlock<T> _source;

In the constructor, _target and _source are set to ActionBlock<T> and BroadcastBlock<T> instances as follows. The target block updates the TextBlock's Text property with each datum posted to the custom block, and the BroadcastBlock serves as the source for any linked blocks.

public UiUpdateBlock(TextBlock element)
{
    _target = new ActionBlock<T>(x => element.Text = x.ToString(),
        new ExecutionDataflowBlockOptions() 
        { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
    _source = new BroadcastBlock<T>(x => x);
}

Next, the IPropagatorBlock interface methods must be implemented. First comes OfferMessage, which is called whenever a message is offered to the block (for example, by Post or SendAsync); it simply hands the message to the target block:

public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
    T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
    return _target.OfferMessage(messageHeader, messageValue,
        source, consumeToAccept);
}
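To see what OfferMessage reports back, here is a small sketch that offers messages directly to a plain ActionBlock, once while it is still accepting input and once after Complete() has been called. The class name and the message header IDs are arbitrary choices of mine; passing a null source with consumeToAccept set to false mirrors, as far as I can tell, what a simple Post does.

```csharp
using System.Threading.Tasks.Dataflow;

public static class OfferMessageSketch
{
    public static (DataflowMessageStatus Before, DataflowMessageStatus After) Run()
    {
        // Typed as the interface because the built-in blocks implement OfferMessage explicitly.
        ITargetBlock<int> target = new ActionBlock<int>(x => { });

        // The block is still open, so the offer is accepted.
        var before = target.OfferMessage(new DataflowMessageHeader(1), 42, null, false);

        target.Complete();

        // After Complete(), the block declines this and all future offers.
        var after = target.OfferMessage(new DataflowMessageHeader(2), 43, null, false);

        return (before, after);
    }
}
```

The first status should be DataflowMessageStatus.Accepted and the second DataflowMessageStatus.DecliningPermanently, which is the same answer UiUpdateBlock<T> passes along from its inner target.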

The Complete() method simply notifies the target block that it should no longer accept any further messages:

public void Complete()
{
    _target.Complete();
}

The block is considered complete once the source block's Completion task has finished:

public Task Completion
{
    get { return _source.Completion; }
}
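One thing to watch for: Complete() signals the target, while Completion observes the source, and nothing in the members shown so far connects the two. The following sketch shows one way to propagate completion (or a fault) from a target to a source with a continuation. It's an assumption on my part that you'd want this wiring in the constructor of a block like UiUpdateBlock<T>; the class name below is only for illustration.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionSketch
{
    public static async Task<TaskStatus> RunAsync()
    {
        var target = new ActionBlock<int>(x => { });
        var source = new BroadcastBlock<int>(x => x);

        // When the target finishes, finish the source the same way.
        _ = target.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted)
                ((IDataflowBlock)source).Fault(t.Exception); // Fault is an explicit interface member
            else
                source.Complete();
        }, TaskScheduler.Default);

        target.Complete();

        // Without the continuation above, this await would never finish.
        await source.Completion;
        return source.Completion.Status;
    }
}
```

With the continuation in place, awaiting the source's Completion after calling Complete() on the target returns normally and reports TaskStatus.RanToCompletion.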

The Fault() method forwards the given exception to the target block, which puts it into a faulted state:

public void Fault(Exception exception)
{
    _target.Fault(exception);
}

ConsumeMessage is called by a linked target to take ownership of a message that the source previously offered, so it's delegated to the source block:

public T ConsumeMessage(DataflowMessageHeader messageHeader,
    ITargetBlock<T> target, out bool messageConsumed)
{
    return _source.ConsumeMessage(messageHeader,
        target, out messageConsumed);
}

Similarly, LinkTo connects the source block to any downstream dataflow blocks:

public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
    return _source.LinkTo(target, linkOptions);
}

The source block is also responsible for handling ReleaseReservation, which releases a previously reserved message:

public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    _source.ReleaseReservation(messageHeader, target);
}

Last, ReserveMessage is delegated to the source block, which reserves the given message for the requesting target so that it can later be consumed or released:

public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    return _source.ReserveMessage(messageHeader, target);
}
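If you're wondering who actually calls ReserveMessage, ConsumeMessage and ReleaseReservation, a non-greedy JoinBlock is one built-in consumer. The sketch below uses only built-in blocks rather than the custom block, and the class and variable names are mine. It shows a message staying in its source until the join has a partner for it, at which point the join reserves and consumes both.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ReservationSketch
{
    public static async Task<(int HeldBeforePair, Tuple<int, string> Pair)> RunAsync()
    {
        var numbers = new BufferBlock<int>();
        var names = new BufferBlock<string>();

        // Non-greedy: postpone offered messages until every input has one available.
        var join = new JoinBlock<int, string>(
            new GroupingDataflowBlockOptions { Greedy = false });

        numbers.LinkTo(join.Target1);
        names.LinkTo(join.Target2);

        numbers.Post(1);
        // The join cannot take the number yet, so it is still held by its source.
        int held = numbers.Count;

        names.Post("one");
        // With both inputs available, the join reserves and consumes one message from each source.
        Tuple<int, string> pair = await join.ReceiveAsync();

        return (held, pair);
    }
}
```

Here the held count should be 1 and the received pair should be (1, "one"). A custom source block that didn't forward the three reservation methods correctly wouldn't work with consumers like this one.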

Finally, you can test out the custom dataflow blocks. Open up MainPage.xaml and add the StackPanel markup from Listing 2.

Now, in the MainPage.xaml code-behind, you can quickly test out the custom dataflow blocks by updating the OnNavigatedTo method to create two custom dataflow blocks: one to update the Message TextBlock, and the other to update the EncapsulateMessage TextBlock.

protected override async void OnNavigatedTo(NavigationEventArgs e)
{
    // One block per approach: interface implementation and Encapsulate().
    UiUpdateBlock<int> test = new UiUpdateBlock<int>(Message);
    var customBlock = CreateUpdateUiBlock<int, string>(EncapsulateMessage);

    // Send a new value to both blocks every 200 ms.
    for (int i = 0; i < 100; i++)
    {
        await Task.Delay(200);
        await test.SendAsync(i);
        await customBlock.SendAsync(i);
    }
}

See Listing 3 for the completed MainPage class source.

You should now be able to run the demo application and see both TextBlocks get updated accordingly, via the custom dataflow blocks.


Figure 1. The finished sample application.

Use Cases

As you can see, the Task Parallel Dataflow library can be extended to suit your parallel processing needs. Furthermore, through the use of custom dataflow blocks you can encapsulate a complex dataflow for reuse.

When modeling a dataflow, first try to use the existing dataflow blocks; then, if needed, reach for the Encapsulate custom block approach. I recommend implementing a custom dataflow block through interface implementation for only the most complex dataflow cases.

About the Author

Eric Vogel is a Senior Software Developer for Red Cedar Solutions Group in Okemos, Michigan. He is the president of the Greater Lansing User Group for .NET. Eric enjoys learning about software architecture and craftsmanship, and is always looking for ways to create more robust and testable applications. Contact him at [email protected].
