C# Corner

.NET Framework Parallel Dataflow, Part 3

How to implement a custom Task Parallel Dataflow block.

Welcome to the third and final installment of the Parallel Dataflow series. In Part 1, I covered the basics of installing using Task Parallel Dataflow (TDF). In Part 2, I went over how to combine Dataflow blocks to model more complex data flows. Today I'll cover how to create a custom dataflow block in two ways.

Before we get started, it's best to mention that I'm using version 4.5.6 of TDF, which is the release version. First, let's go over the easy way to create a custom dataflow block via the DataFlow Encapsulate() extension method. The custom block will be a composite of the BroadcastBlock and ActionBlock. When a message is posted to the custom block, it will update a given UI element on the UI thread to immediately display the posted datum. The Encapsulate method creates a new propagator block from given target and source dataflow blocks.

private static IPropagatorBlock<TInput, TOutput> CreateUpdateUiBlock<TInput, TOutput>(TextBlock element)
{
    var source = new BroadcastBlock<TOutput>(x => x);
    var target = new ActionBlock<TInput>(x => element.Text = x.ToString(),
         new ExecutionDataflowBlockOptions() 
        { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
    return DataflowBlock.Encapsulate(target, source);
}

As you can see, the encapsulate method is very handy for creating reusable dataflow blocks that may be used independently or within another dataflow. The major downside to this approach is that you don't have full control over how the target and source blocks are linked. Furthermore, you can only combine one target and one source block with Encapsulate().

The more customizable, albeit more complicated, approach is to implement the ITargetBlock<in TInput>, ISourceBlock<out TOutput> or IPropagator<in TInput, out Toutput> interfaces. For example the very same custom block may be created by implementing the IPropagator<TInput,TOutput> interface directly (Listing 1).

To get started, create a new class named UiUpdateBlock<T>.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Windows.UI.Xaml.Controls;
public class UiUpdateBlock<T> : IPropagatorBlock<T, T>

Next, create ITargetBlock<T> and ISourceBlock<T> class members.

private readonly ITargetBlock<T> _target;

private readonly ISourceBlock<T> _source;

In the constructor, the _target and _source are set to ActionBlock<T> and BroadCastBlock<T> implementations as follows. The target block will update the TextBlock's Text property with the posted data from the source BroadCast block.

public UiUpdateBlock(TextBlock element)
{
    _target = new ActionBlock<T>(x => element.Text = x.ToString(),
        new ExecutionDataflowBlockOptions() 
        { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
    _source = new BroadcastBlock<T>(x => x);
}

Next, the IPropagatorBlock interface methods must be implemented. First the OfferMessage method is implemented, which gives the target block its sent datum upon a post:

public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
    T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
    return _target.OfferMessage(messageHeader, messageValue,
        source, consumeToAccept);
}

The Complete() method simply notifies the target block that it should no longer handle any further posts:

public void Complete()
{
    _target.Complete();
}

The block is completed once the source block's Task has finished:

public Task Completion
 {
     get { return _source.Completion; }
 }

The target block handles any exceptions that may occur:

public void Fault(Exception exception)
{
    _target.Fault(exception);
}

The source block is responsible for consuming any linked Dataflow block messages:

public T ConsumeMessage(DataflowMessageHeader messageHeader,
    ITargetBlock<T> target, out bool messageConsumed)
{
    return _source.ConsumeMessage(messageHeader,
        target, out messageConsumed);
}

Similarly, the source block is the source for any linked Dataflow blocks:

public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
    return _source.LinkTo(target, linkOptions);
}

The source block is also responsible for handling the ReleaseReservation, to release a formerly kept message:

public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    _source.ReleaseReservation(messageHeader, target);
}

Last, the ReserveMessage method is handled by the source block to reserve the given dataflow message.

public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    return _source.ReserveMessage(messageHeader, target);
}

Finally, you can test out the custom Dataflow blocks. Open up MainPage.xaml and add the markup from StackPanel in Listing 2.

Now in the MainPage.xaml code-behind, you can quickly test out the custom dataflow blocks by updating the OnNavigatedTo to create two custom dataflow blocks: one to update the Message TextBlock, and other to update EncapsulatedMessage TextBlock.

protected async override void OnNavigatedTo(NavigationEventArgs e)
{
    UiUpdateBlock<int> test = new UiUpdateBlock<int>(Message);
    var customBLock = CreateUpdateUiBlock<int, string>(EncapsulateMessage);

    for (int i = 0; i < 100; i++)
    
    {
        await Task.Delay(200);
        await test.SendAsync(i);
        await customBLock.SendAsync(i);
    }
}

See Listing 3 for the completed MainPage class source.

You should now be able to run the demo application and see both TextBlocks get updated accordingly, via the custom dataflow blocks.


[Click on image for larger view.]
Figure 1. The finished sample application.
Use Cases
As you can see the Task Parallel Dataflow library can be extended to suit your parallel processing needs. Furthermore, through the use of custom data blocks you can encapsulate a complex dataflow for reuse.

When modeling a dataflow, first try to use the existing dataflow blocks; then, if needed, reach for the Encapsulate custom block approach. I recommend implementing a custom dataflow block through interface implementation for only the most complex dataflow cases.

About the Author

Eric Vogel is a Senior Software Developer for Red Cedar Solutions Group in Okemos, Michigan. He is the president of the Greater Lansing User Group for .NET. Eric enjoys learning about software architecture and craftsmanship, and is always looking for ways to create more robust and testable applications. Contact him at [email protected].

comments powered by Disqus

Featured

  • IDE Irony: Coding Errors Cause 'Critical' Vulnerability in Visual Studio

    In a larger-than-normal Patch Tuesday, Microsoft warned of a "critical" vulnerability in Visual Studio that should be fixed immediately if automatic patching isn't enabled, ironically caused by coding errors.

  • Building Blazor Applications

    A trio of Blazor experts will conduct a full-day workshop for devs to learn everything about the tech a a March developer conference in Las Vegas keynoted by Microsoft execs and featuring many Microsoft devs.

  • Gradient Boosting Regression Using C#

    Dr. James McCaffrey from Microsoft Research presents a complete end-to-end demonstration of the gradient boosting regression technique, where the goal is to predict a single numeric value. Compared to existing library implementations of gradient boosting regression, a from-scratch implementation allows much easier customization and integration with other .NET systems.

  • Microsoft Execs to Tackle AI and Cloud in Dev Conference Keynotes

    AI unsurprisingly is all over keynotes that Microsoft execs will helm to kick off the Visual Studio Live! developer conference in Las Vegas, March 10-14, which the company described as "a must-attend event."

  • Copilot Agentic AI Dev Environment Opens Up to All

    Microsoft removed waitlist restrictions for some of its most advanced GenAI tech, Copilot Workspace, recently made available as a technical preview.

Subscribe on YouTube