C# Corner
Reactive Extensions: Just What the Doctor Ordered (Part 2)
In Part 1, VSM columnist Eric Vogel covered the basics of the Reactive Extensions (Rx) library. In this installment he explores how to observe asynchronous methods, tasks and events, as well as how to compose observable sequences using LINQ.
Read the rest of the series: Part 1 | Part 2 | Part 3
In Part 1 of this series, I covered the basics of the Reactive Extensions (Rx) library. In this installment I explore how to observe asynchronous methods, tasks and events, as well as how to compose observable sequences using LINQ.
Observing an Asynchronous Method
Rx includes an extension method name FromAsynchPattern that can convert an asynchronous method that uses the Asynchronous Programming Model (APM) pattern to an IObservable. A few likely scenarios would be retrieving data from a Web service or reading or writing to a stream asynchronously. Let's look at an example for the former. We'll be calling the Bing Translation Service to translate a piece of text using a generated asynchronous proxy method.
To get started, create a new C# Console Application and add a reference to System.Reactive. Next add a service reference to http://api.microsofttranslator.com/V2/Soap.svc.
[Click on image for larger view.] |
Figure 1. |
Make sure to generate asynchronous operations. You can do this by clicking on the Advanced button on the Add Service Reference and checking the "Generate asynchronous operations" checkbox.
Next we setup the parameters for our web service method call.
string appId = "88E025E2D1E4C4CD930DD6C31AB82E3C49552CBB";
string text = "Hello";
string fromLanguage = "en";
string toLanguage = "fr";
string contentType = "text/plain";
string category = "general";
Then we construct our translation service proxy.
var proxy = new BingTranslatorService.LanguageServiceClient();
After that we create a wrapper around the BeginTranslate and EndTranslate APM service methods. The FromAsychPattern needs to know the method parameters as well as the return type of the method that is to be the wrapper -- in our case the Translate method takes six strings and returns a string. The created getTranslation method will be called by passing all of the Translate method parameters and will return an IObservable<string>.
An easy way to get the correct method signature is to use IntelliSense on the generated synchronous version of the proxy method.
var getTranslation = Observable.FromAsyncPattern<string, string, string, string, string, string, string>(proxy.BeginTranslate, proxy.EndTranslate);
Now we call the IObservable<string> from the getTranslation method that will push our translated result.
var result = getTranslation(appId, text, fromLanguage, toLanguage, contentType, category);
Lastly, we subscribe to the result Observable and output the translated version of the text. If an error occurs we output the exception message. To increase code clarity I opted to use named parameters to call out the onNext and onError handlers. Named parameters have been in VB.NET since its inception but was just recently added to C#.
result.Subscribe(
onNext: translatedText => Console.WriteLine(translatedText),
onError: ex => Console.WriteLine(ex.Message));
Observing a Task<T>
Rx adds a ToObservable<T> extension method to Task<T> that allows converting from a Task<T> to an IObservable<T>. An ideal case for using ToObservable is if you have multiple Tasks that have results that are to be composed. For example if you have two long running computations that can be computed in parallel that are later integrated.
First we import the System.Reactive, System.Reactvie.Linq, System.Threading.Tasks, and System.Reactive.Threading.Tasks namespaces.
using System.Reactive.Linq;
using System.Reactive;
using System.Threading.Tasks;
using System.Reactive.Threading.Tasks;
Then we create a Task<double> for each computation.
double angle = 30;
Task<double> tCos = Task.Factory.StartNew<double>(() => Math.Cos(angle));
Task<double> tSin = Task.Factory.StartNew<double>(() => Math.Sin(angle));
Next we convert each Task to an IObserable<double> and create a new IObservable<double> that is the product of each executed task.
var result = from x in tCos.ToObservable()
from y in tSin.ToObservable()
select x * y;
Once we have composed our formula we can subscribe to it and work with the result. In this case we simply just output the result to the console in the OnNext event.
result.Subscribe(x => Console.WriteLine(x));
Observing a Composed Event
The classic example of a composed event is drag and drop. We would like to observe when a user is holding down a mouse button over an element and moving the mouse. The event should stop when the user releases the mouse button. We can compose a drag and drop event by joining three event sources, MouseDown, MouseMove and MouseUp. Our composite event will capture the position of an element while the user is clicking down on it. We'll create a re-usable function that will allow any control to be dragged and dropped.
To get started, create a new WPF or Silverlight C# Application and update your MainWindow.xaml file to include the following markup.
<Canvas>
<Label Content="Drag Me" Height="28" HorizontalAlignment="Left" Margin="0,0,0,0" Name="lblDragMe" Background="Black" Foreground="White" VerticalAlignment="Top" Canvas.Left="45" Canvas.Top="45" />
</Canvas>
The default Grid has been replaced with a Canvas that will be used to reposition the label. Next we create our re-usable DragNDrop function, which will allow us to observe a drop position for any Control.
private IObservable<Point> DragNDrop(Control elem)
{
First we create an observable collection of mouse down positions.
var mouseDownPos = from evt in Observable.FromEvent<MouseButtonEventArgs>(
ev => elem.MouseDown += (s, e) => ev(e),
ev => elem.MouseDown -= (s, e) => ev(e))
select evt.GetPosition(elem);
Next we create an observable collection of mouse move positions.
var mouseMovePos = from evt in Observable.FromEvent<MouseEventArgs>(
ev => elem.MouseMove += (s, e) => ev(e),
ev => elem.MouseMove -= (s, e) => ev(e))
select evt.GetPosition(this);
Then we create an observable collection to capture mouse up events.
var mouseMovePos = from evt in Observable.FromEvent<MouseEventArgs>(
ev => this.MouseMove += (s, e) => ev(e),
ev => this.MouseMove -= (s, e) => ev(e))
select evt.GetPosition(this);
Next we compose an event that pushes the drop position for the element.
var dropPos = from elemOffset in mouseDownPos
from pos in mouseMovePos.TakeUntil(mouseUp)
select new Point
{
X = pos.X - elemOffset.X,
Y = pos.Y - elemOffset.Y
};
return dropPos;
}
Lastly we subscribe to the event and update the position of the element we would like to drag and drop.
DragNDrop(lblDragMe).Subscribe(dropPos =>
{
Canvas.SetLeft(lblDragMe, dropPos.X);
Canvas.SetTop(lblDragMe, dropPos.Y);
});
Conclusion
Now that you know how to both call an asynchronous method using Rx, as well as how to how to compose observable sequences, you can start to see the power of Rx . Stay tuned for the next and final installment, where I will show how to pull together what we learned so far to create a truly reactive application.
All code examples are included in the code drop on the top of the page.
About the Author
Eric Vogel is a Senior Software Developer for Red Cedar Solutions Group in Okemos, Michigan. He is the president of the Greater Lansing User Group for .NET. Eric enjoys learning about software architecture and craftsmanship, and is always looking for ways to create more robust and testable applications. Contact him at [email protected].