C# Corner

.NET Framework Parallel Dataflow, Part 3

How to implement a custom Task Parallel Dataflow block.

Welcome to the third and final installment of the Parallel Dataflow series. In Part 1, I covered the basics of installing and using Task Parallel Dataflow (TDF). In Part 2, I went over how to combine dataflow blocks to model more complex data flows. Today I'll cover two ways to create a custom dataflow block.

Before we get started, it's best to mention that I'm using version 4.5.6 of TDF, which is the release version. First, let's go over the easy way to create a custom dataflow block: the static DataflowBlock.Encapsulate() method. The custom block will be a composite of a BroadcastBlock and an ActionBlock. When a message is posted to the custom block, it will update a given UI element on the UI thread to immediately display the posted datum. The Encapsulate() method creates a new propagator block from the given target and source dataflow blocks.

private static IPropagatorBlock<TInput, TOutput> CreateUpdateUiBlock<TInput, TOutput>(TextBlock element)
{
    // Output side of the composite block.
    var source = new BroadcastBlock<TOutput>(x => x);
    // Input side: displays each posted datum, running on the UI thread.
    var target = new ActionBlock<TInput>(x => element.Text = x.ToString(),
        new ExecutionDataflowBlockOptions
        { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
    // Expose the pair as a single propagator block.
    return DataflowBlock.Encapsulate(target, source);
}

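As you can see, the Encapsulate() method is very handy for creating reusable dataflow blocks that may be used independently or within another dataflow. The major downside to this approach is that you don't control how the individual interface members behave: Encapsulate() always routes the target members to the target block and the source members, including Completion, to the source block. It also does not connect the two blocks for you, so passing messages or completion from the target to the source is up to you. Furthermore, you can only combine one target and one source block with Encapsulate().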
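Because Encapsulate() only wraps the two blocks, output flows from the composite block only if the target is wired to the source before the pair is encapsulated. The following is a minimal sketch of one way to do that, not the article's sample: it assumes a TransformBlock as the target so that it can run without a TextBlock, and the names EncapsulateSketch, CreateForwardingBlock, onMessage, convert and RunDemoAsync are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EncapsulateSketch
{
    // Hypothetical helper: runs onMessage for each input, converts the input,
    // and forwards the converted value to the source block.
    public static IPropagatorBlock<TInput, TOutput> CreateForwardingBlock<TInput, TOutput>(
        Action<TInput> onMessage, Func<TInput, TOutput> convert)
    {
        var target = new TransformBlock<TInput, TOutput>(x =>
        {
            onMessage(x);
            return convert(x);
        });
        var source = new BroadcastBlock<TOutput>(x => x);

        // Encapsulate() does not connect the pair, so link them explicitly
        // and let completion (or a fault) flow from the target to the source.
        target.LinkTo(source, new DataflowLinkOptions { PropagateCompletion = true });

        return DataflowBlock.Encapsulate<TInput, TOutput>(target, source);
    }

    // Hypothetical demo: sends three integers and collects the forwarded strings.
    public static async Task<List<string>> RunDemoAsync()
    {
        var forwarded = new List<string>();
        var block = CreateForwardingBlock<int, string>(_ => { }, i => "#" + i);
        var sink = new ActionBlock<string>(s => forwarded.Add(s));
        block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 3; i++)
        {
            await block.SendAsync(i);
        }

        block.Complete();       // routed to the target block
        await sink.Completion;  // finishes once the source block has drained
        return forwarded;
    }
}
```

In a UI application the onMessage callback could set the TextBlock's Text property, provided the target block is given a UI-bound TaskScheduler as in the article's version.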

The more customizable, albeit more complicated, approach is to implement the ITargetBlock<in TInput>, ISourceBlock<out TOutput> or IPropagatorBlock<in TInput, out TOutput> interfaces. For example, the very same custom block may be created by implementing the IPropagatorBlock<TInput, TOutput> interface directly (Listing 1).

To get started, create a new class named UiUpdateBlock<T>.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Windows.UI.Xaml.Controls;
public class UiUpdateBlock<T> : IPropagatorBlock<T, T>

Next, create ITargetBlock<T> and ISourceBlock<T> class members.

private readonly ITargetBlock<T> _target;

private readonly ISourceBlock<T> _source;

In the constructor, _target and _source are set to ActionBlock<T> and BroadcastBlock<T> instances as follows. The target block updates the TextBlock's Text property with each datum posted to the custom block, and the BroadcastBlock serves as the output side of the custom block.

public UiUpdateBlock(TextBlock element)
{
    _target = new ActionBlock<T>(x => element.Text = x.ToString(),
        new ExecutionDataflowBlockOptions() 
        { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() });
    _source = new BroadcastBlock<T>(x => x);
}

Next, the IPropagatorBlock interface members must be implemented. First comes the OfferMessage method, which hands each message offered to the custom block over to the target block:

public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
    T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
    return _target.OfferMessage(messageHeader, messageValue,
        source, consumeToAccept);
}

The Complete() method simply notifies the target block that it should no longer handle any further posts:

public void Complete()
{
    _target.Complete();
}

The Completion property returns the source block's completion task, so the custom block is considered complete once the source block has finished:

public Task Completion
{
    get { return _source.Completion; }
}
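Because Complete() is forwarded to the target block while Completion is read from the source block, awaiting Completion only finishes if the source block is completed as well. The sketch below shows one possible way to bridge the two with a task continuation. It is an illustration under stated assumptions rather than part of the article's listing, and CompletionSketch, PropagateCompletion and RunDemoAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionSketch
{
    // Hypothetical helper: when 'from' finishes, complete or fault 'to'.
    public static void PropagateCompletion(IDataflowBlock from, IDataflowBlock to)
    {
        from.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                to.Fault(t.Exception);   // pass the failure along
            }
            else
            {
                to.Complete();           // normal (or canceled) completion
            }
        }, TaskScheduler.Default);
    }

    // Hypothetical demo: completing the target also completes the source.
    public static async Task<TaskStatus> RunDemoAsync()
    {
        var target = new ActionBlock<int>(_ => { });
        var source = new BroadcastBlock<int>(x => x);
        PropagateCompletion(target, source);

        target.Post(1);
        target.Complete();
        await source.Completion;
        return source.Completion.Status;
    }
}
```

A call such as PropagateCompletion(_target, _source) could be placed in the custom block's constructor, assuming that is the completion behavior you want.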

The Fault() method forwards an exception to the target block, which causes that block to finish in a faulted state:

public void Fault(Exception exception)
{
    _target.Fault(exception);
}

ConsumeMessage is delegated to the source block. A linked target block calls it to take ownership of a message that the source previously offered to it:

public T ConsumeMessage(DataflowMessageHeader messageHeader,
    ITargetBlock<T> target, out bool messageConsumed)
{
    return _source.ConsumeMessage(messageHeader,
        target, out messageConsumed);
}
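To see where ConsumeMessage fits from the other side of a link, here is a minimal sketch of a custom target that honors the consumeToAccept flag passed to OfferMessage. It is an assumption-laden illustration, not code from the article: CollectingTarget, Snapshot, ConsumeSketch and RunDemoAsync are hypothetical names, and the target simply stores what it accepts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical target block that stores every message it accepts.
public sealed class CollectingTarget<T> : ITargetBlock<T>
{
    private readonly TaskCompletionSource<bool> _done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly object _gate = new object();
    private readonly List<T> _items = new List<T>();

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        if (!messageHeader.IsValid)
            throw new ArgumentException("Invalid message header.", nameof(messageHeader));
        if (_done.Task.IsCompleted)
            return DataflowMessageStatus.DecliningPermanently;

        if (consumeToAccept)
        {
            if (source == null)
                throw new ArgumentException("A source is required to consume.", nameof(source));

            // The source keeps ownership until the target calls back to consume.
            bool consumed;
            messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
            if (!consumed)
                return DataflowMessageStatus.NotAvailable;
        }

        lock (_gate) { _items.Add(messageValue); }
        return DataflowMessageStatus.Accepted;
    }

    public void Complete() { _done.TrySetResult(true); }
    public void Fault(Exception exception) { _done.TrySetException(exception); }
    public Task Completion { get { return _done.Task; } }

    // Returns a copy of the messages accepted so far.
    public List<T> Snapshot() { lock (_gate) { return new List<T>(_items); } }
}

public static class ConsumeSketch
{
    // Hypothetical demo: a BroadcastBlock feeds the custom target.
    public static async Task<List<int>> RunDemoAsync()
    {
        var source = new BroadcastBlock<int>(x => x);
        var target = new CollectingTarget<int>();
        source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

        source.Post(1);
        source.Post(2);
        source.Complete();
        await target.Completion;
        return target.Snapshot();
    }
}
```

The target accepts a message directly when consumeToAccept is false and calls back into the source when it is true, so it works with either kind of offer.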

Similarly, LinkTo is delegated to the source block, so any dataflow block linked to the custom block is actually linked to the source block:

public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
    return _source.LinkTo(target, linkOptions);
}
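The IDisposable returned by LinkTo represents the link itself, and disposing it unlinks the target. The following sketch illustrates that behavior with built-in blocks only. It assumes a BufferBlock as the source so that an undelivered message stays observable, and LinkSketch and RunDemoAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkSketch
{
    // Hypothetical demo: the first value travels over the link, the second
    // is posted after unlinking and therefore stays in the source.
    public static async Task<(int Delivered, int Buffered)> RunDemoAsync()
    {
        var source = new BufferBlock<int>();
        var delivered = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        var sink = new ActionBlock<int>(x => delivered.TrySetResult(x));

        IDisposable link = source.LinkTo(sink, new DataflowLinkOptions());

        source.Post(1);
        int first = await delivered.Task;   // received through the link

        link.Dispose();                     // unlink the sink from the source
        source.Post(2);                     // no linked target remains

        int buffered;
        if (!source.TryReceive(out buffered))
        {
            throw new InvalidOperationException("Expected a buffered message.");
        }
        return (first, buffered);
    }
}
```

Since the custom block returns whatever its source block's LinkTo returns, callers can unlink from it in the same way.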

The source block also handles ReleaseReservation, which releases a message that a target block previously reserved:

public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    _source.ReleaseReservation(messageHeader, target);
}

Last, the ReserveMessage method is delegated to the source block. A target block calls it to reserve a previously offered message so that it can consume the message later:

public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    return _source.ReserveMessage(messageHeader, target);
}

Finally, you can test out the custom dataflow blocks. Open up MainPage.xaml and add the StackPanel markup from Listing 2.

Now, in the MainPage.xaml code-behind, you can quickly test the custom dataflow blocks by updating OnNavigatedTo to create two of them: one to update the Message TextBlock and the other to update the EncapsulatedMessage TextBlock.

protected async override void OnNavigatedTo(NavigationEventArgs e)
{
    // One block per approach: interface implementation and Encapsulate().
    UiUpdateBlock<int> test = new UiUpdateBlock<int>(Message);
    var customBlock = CreateUpdateUiBlock<int, string>(EncapsulateMessage);

    // Send a new value to both blocks every 200 ms.
    for (int i = 0; i < 100; i++)
    {
        await Task.Delay(200);
        await test.SendAsync(i);
        await customBlock.SendAsync(i);
    }
}

See Listing 3 for the completed MainPage class source.

You should now be able to run the demo application and see both TextBlocks get updated accordingly, via the custom dataflow blocks.

Figure 1. The finished sample application.

Use Cases
As you can see, the Task Parallel Dataflow library can be extended to suit your parallel processing needs. Furthermore, through the use of custom dataflow blocks you can encapsulate a complex dataflow for reuse.

When modeling a dataflow, first try to use the existing dataflow blocks; then, if needed, reach for the Encapsulate custom block approach. I recommend implementing a custom dataflow block through interface implementation for only the most complex dataflow cases.

About the Author

Eric Vogel is a Senior Software Developer for Red Cedar Solutions Group in Okemos, Michigan. He is the president of the Greater Lansing User Group for .NET. Eric enjoys learning about software architecture and craftsmanship, and is always looking for ways to create more robust and testable applications. Contact him at [email protected].
