.NET 4.5 Baby Steps, Part 6: ISourceBlock
Introduction
We have spent a number of posts looking at IDataflowBlock, BatchedBlocks and some other interesting blocks. The final interesting piece for blocks is the ISourceBlock<TOutput> interface, which is implemented by the “publisher”-like blocks and has a very interesting method: LinkTo. LinkTo lets you chain blocks together to do interesting and amazing things!
It's in the name
The final interesting point, and likely the most important, is in the name of the assembly all this goodness comes from: System.Threading.Tasks.Dataflow. All of this is handled by multiple tasks (and likely multiple threads) under the covers without you needing to worry about it. Just as the TPL in .NET 4 made working with multi-threaded processing easier, the goal here is to make multi-threaded data processing easier, and it does!
Demo LinkTo
In this demo we want to take in DateTime values as they arrive, convert them to strings, and send them in batches of five to three subscribers. It sounds complex, but with LinkTo it is very easy.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    static BufferBlock<DateTime> bufferPublisher = new BufferBlock<DateTime>();
    static TransformBlock<DateTime, string> transformPublisher = new TransformBlock<DateTime, string>(d => d.ToLongTimeString());
    static BatchBlock<string> batchPublisher = new BatchBlock<string>(5);

    // string.Join prints the whole batch. Passing the string[] straight to
    // Console.WriteLine("...{0}", s) binds to the params object[] overload and prints only s[0].
    static ActionBlock<string[]> subscriber1 = new ActionBlock<string[]>(s => Console.WriteLine("Subscriber 1: {0}", string.Join(", ", s)));
    static ActionBlock<string[]> subscriber2 = new ActionBlock<string[]>(s => Console.WriteLine("Subscriber 2: {0}", string.Join(", ", s)));
    static ActionBlock<string[]> subscriber3 = new ActionBlock<string[]>(s => Console.WriteLine("Subscriber 3: {0}", string.Join(", ", s)));

    static void Main(string[] args)
    {
        // Subscribe all three subscribers to an IObservable view of the batch block.
        batchPublisher.AsObservable().Subscribe(subscriber1.AsObserver());
        batchPublisher.AsObservable().Subscribe(subscriber2.AsObserver());
        batchPublisher.AsObservable().Subscribe(subscriber3.AsObserver());

        // Chain the blocks: buffer -> transform -> batch.
        transformPublisher.LinkTo(batchPublisher);
        bufferPublisher.LinkTo(transformPublisher);

        for (int i = 0; i < 100; i++)
        {
            bufferPublisher.Post(DateTime.Now);
            Thread.Sleep(200);
        }

        Console.ReadLine();
    }
}
```
![Output of the LinkTo demo](https://www.sadev.co.za/files/CropperCapture1_thumb_3.gif)
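LinkTo also has overloads that take a DataflowLinkOptions and a filter predicate. The sketch below assumes the released TPL Dataflow library (which has these overloads) and is written as a C# 9+ top-level program so it runs on its own. It routes even numbers to one ActionBlock and everything else to a second one, and uses PropagateCompletion so that completing the source flows down the links, instead of relying on Console.ReadLine to keep the process alive. The block and list names are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var source = new BufferBlock<int>();
var evens = new List<int>();
var odds = new List<int>();

// ActionBlock processes one message at a time by default, so each List is only touched by one thread at a time.
var evenTarget = new ActionBlock<int>(n => evens.Add(n));
var oddTarget = new ActionBlock<int>(n => odds.Add(n));

// PropagateCompletion: when the source completes, the linked targets are completed too.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

// Links are offered messages in the order they were created: even numbers are accepted
// by the first (filtered) link, and anything it declines falls through to the second link.
source.LinkTo(evenTarget, linkOptions, n => n % 2 == 0);
source.LinkTo(oddTarget, linkOptions);

for (int i = 1; i <= 10; i++)
{
    source.Post(i);
}

source.Complete();
await Task.WhenAll(evenTarget.Completion, oddTarget.Completion);

Console.WriteLine("Evens: " + string.Join(", ", evens));
Console.WriteLine("Odds:  " + string.Join(", ", odds));
```

Without a fall-through link, messages the filter declines would stay in the BufferBlock and its Completion would never finish, which is why the second, unfiltered link is there.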
.NET 4.5 Baby Steps, Part 4: BatchedBlocks
Introduction
We previously looked at the IDataflowBlock interface through its simplest implementation, ActionBlock<TInput>. Today we are going to look at more complex implementations that give amazing powers.
BatchBlock<T>
Where ActionBlock<TInput> could only be a subscriber, BatchBlock<T> is more than that – it can be a subscriber and a publisher rolled into one! As such the usage is rather different: we do not pass a receive method in to the constructor. Rather, we call the Receive method to get the latest messages passed to the subscriber.
The interesting thing about BatchBlock is that it batches messages into groups, so rather than getting each message one at a time, you get groups of messages. As such, you could wait a long time for enough messages to arrive, so thankfully there is also a ReceiveAsync method, which works with the new C# async methods.
In the sample below you can see how I create a BatchBlock with a batch size of 3, so messages get processed in groups of three. This also means the last two messages (since we are sending 20) are never processed.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Messages are grouped into batches of three.
    static BatchBlock<string> pubSub = new BatchBlock<string>(3);
    static int counter = 0;

    static async void Process()
    {
        while (true)
        {
            // Wait, without blocking a thread, until a full batch is available.
            var messages = await pubSub.ReceiveAsync();
            foreach (var item in messages)
            {
                counter++;
                Console.WriteLine("Got message number {0}: {1}", counter, item);
            }
        }
    }

    static void Main(string[] args)
    {
        Process();
        for (int i = 0; i < 20; i++)
        {
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(new Random().Next(200, 1000));
                pubSub.Post(DateTime.Now.ToLongTimeString());
            });
        }
        Console.ReadLine();
    }
}
```
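If you do want those last two messages, BatchBlock<T> will emit whatever it is still holding as a final, smaller batch once Complete is called (there is also a TriggerBatch method to force a batch early). A minimal sketch, assuming the released TPL Dataflow API and C# 9+ top-level statements, posting 20 numbers with a batch size of 3:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var batcher = new BatchBlock<int>(3);
for (int i = 1; i <= 20; i++)
{
    batcher.Post(i);
}

// No more input: the two leftover items (19 and 20) become a final batch of size 2.
batcher.Complete();

var batchSizes = new List<int>();
// OutputAvailableAsync returns false once the block has completed and has no more output.
while (await batcher.OutputAvailableAsync())
{
    while (batcher.TryReceive(out int[] batch))
    {
        batchSizes.Add(batch.Length);
    }
}

Console.WriteLine(string.Join(", ", batchSizes));
```

This prints six batches of 3 followed by one batch of 2, so all 20 messages are accounted for.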
BatchedJoinBlock<T1,T2>
The second block implementation we look at is BatchedJoinBlock<T1,T2>, which is similar to BatchBlock<T> except that it accepts inputs of multiple types. It exposes one target per type (Target1 and Target2), and each batch comes out as a Tuple holding one list per type, so the subscriber gets the messages back split by type! The batching works the same as before, but be careful: the batch size counts all messages regardless of type.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    // Batch size 3 counts strings and ints together.
    static BatchedJoinBlock<string, int> pubSub = new BatchedJoinBlock<string, int>(3);
    static int stringCounter = 0;
    static int intCounter = 0;

    static async void Process()
    {
        while (true)
        {
            // Each batch is a Tuple: Item1 holds the strings, Item2 holds the ints.
            var messages = await pubSub.ReceiveAsync();
            foreach (var item in messages.Item1)
            {
                stringCounter++;
                Console.WriteLine("Got STRING message number {0}: {1}", stringCounter, item);
            }
            foreach (var item in messages.Item2)
            {
                intCounter++;
                Console.WriteLine("Got INT message number {0}: {1}", intCounter, item);
            }
        }
    }

    static void Main(string[] args)
    {
        Process();
        for (int i = 0; i < 20; i++)
        {
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(new Random().Next(200, 1000));
                pubSub.Target1.SendAsync(DateTime.Now.ToLongTimeString());
            });
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(new Random().Next(200, 1000));
                pubSub.Target2.SendAsync(new Random().Next(1, 99));
            });
        }
        Console.ReadLine();
    }
}
```
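To make the "batch size counts every message" warning concrete, here is a small deterministic sketch (assuming the released TPL Dataflow API and C# 9+ top-level statements): with a batch size of 3, two strings and one int are enough to fill a single batch.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Batch size 3 counts messages across BOTH targets.
var join = new BatchedJoinBlock<string, int>(3);

join.Target1.Post("first");
join.Target2.Post(42);
join.Target1.Post("second"); // third message overall, so the batch is now full

// The output is a Tuple of two lists, one per input type.
var batch = await join.ReceiveAsync();
Console.WriteLine("strings: {0} ({1})", batch.Item1.Count, string.Join(", ", batch.Item1));
Console.WriteLine("ints: {0} ({1})", batch.Item2.Count, string.Join(", ", batch.Item2));
```

So a batch of 3 here contains 2 strings and 1 int, not 3 of each.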
Multiple Subscribers
So what happens when you add multiple subscribers to the system? Each batch is handed to only one of the waiting subscribers, so the work is spread across them in a roughly round-robin way. The sample below is the same as the BatchBlock<T> one above, but has three subscribers (A, B & C) and a batch size of two.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static BatchBlock<string> pubSub = new BatchBlock<string>(2);
    static int counter = 0;

    static async void Process(string id)
    {
        while (true)
        {
            var messages = await pubSub.ReceiveAsync();
            foreach (var item in messages)
            {
                // Three receive loops can run at the same time, so increment the shared counter atomically.
                var number = Interlocked.Increment(ref counter);
                Console.WriteLine("{2} - Got message number {0}: {1}", number, item, id);
            }
        }
    }

    static void Main(string[] args)
    {
        Process("A");
        Process("B");
        Process("C");
        for (int i = 0; i < 11; i++)
        {
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(new Random().Next(200, 1000));
                pubSub.Post(DateTime.Now.ToLongTimeString());
            });
        }
        Console.ReadLine();
    }
}
```
.NET 4.5 Baby Steps: Series Index
This page is just a list of the various posts in the .NET 4.5 Baby Steps series. If a post is grey or not a link, then it is not published yet and you should check back soon.
.NET 4.5 Baby Steps, Part 3: IDataflowBlock
Introduction
A new interface in .NET 4.5 is IDataflowBlock, which is implemented in many interesting ways, so to look at those we will start off with the simplest implementation. ActionBlock<TInput> is a completely new class in .NET 4.5 and provides a way of working with data in a very task-oriented way. I simplistically think of it as an implementation of the IObserver<T> interface we got in .NET 4, but do not limit your thinking to just that.
To use it, you first must add a reference to System.Threading.Tasks.Dataflow.
In this first example I am doing a fairly simple Pub/Sub demo:
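```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The lambda runs for every message posted to the block.
var subscriber = new ActionBlock<string>(input =>
{
    Console.WriteLine("Got: {0}", input);
});

for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
    {
        Thread.Sleep(new Random().Next(200, 1000));
        subscriber.Post(DateTime.Now.ToLongTimeString());
    });
}

Console.ReadLine();
```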
![Output of the ActionBlock demo](https://www.sadev.co.za/files/CropperCapture3_thumb.gif)
As IObserver<T>
So the first fantastic feature is that it can (via the AsObserver extension method) act as an IObserver<T>, which removes the need to build your own subscriber classes when implementing a pub/sub model.
First is the code for the publisher class. This is a normal IObservable<T> implementation, just as we had in .NET 4, which means our new code can play well with our existing code.
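```csharp
using System;
using System.Collections.Generic;

public class Publisher : IObservable<string>
{
    List<IObserver<string>> subscribers = new List<IObserver<string>>();

    public IDisposable Subscribe(IObserver<string> observer)
    {
        subscribers.Add(observer);
        // Kept simple for the demo: a production implementation should return
        // an IDisposable that removes the observer again.
        return null;
    }

    public void Send()
    {
        foreach (var item in subscribers)
        {
            item.OnNext(DateTime.Now.ToLongTimeString());
        }
    }
}
```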
The demo code, which produces the same output as above:
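```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var publisher = new Publisher();
var subscriber = new ActionBlock<string>(input =>
{
    Console.WriteLine("Got: {0}", input);
});

// AsObserver wraps the block so it can subscribe to any IObservable<string>.
publisher.Subscribe(subscriber.AsObserver());

for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
    {
        Thread.Sleep(new Random().Next(200, 1000));
        publisher.Send();
    });
}

// Keep the process alive so the background tasks can run.
Console.ReadLine();
```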
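As far as I can tell, the observer returned by AsObserver forwards OnNext to the block, OnCompleted to Complete and OnError to Fault. A small sketch, assuming C# 9+ top-level statements, drives an ActionBlock purely through the IObserver<T> interface and then waits on its Completion task:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var received = new List<string>();
var subscriber = new ActionBlock<string>(input => received.Add(input));

// AsObserver is an extension method from the DataflowBlock class.
IObserver<string> observer = subscriber.AsObserver();
observer.OnNext("first");
observer.OnNext("second");
observer.OnCompleted(); // completes the underlying block

// Completion finishes once both queued messages have been processed.
await subscriber.Completion;
Console.WriteLine(string.Join(", ", received));
```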
Complete
The next awesome feature is the Complete method. Once it is called, the block stops accepting new input (messages that are already queued are still processed), which is great for services where you want to shut down.
This demo code runs until you press Enter:
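```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var subscriber = new ActionBlock<string>(input =>
{
    Console.WriteLine("Got: {0}", input);
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Thread.Sleep(new Random().Next(200, 1000));
        // After Complete is called, Post returns false and the message is dropped.
        subscriber.Post(DateTime.Now.ToLongTimeString());
    }
});

Console.WriteLine("Press Enter to stop input");
Console.ReadLine();
subscriber.Complete();
```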
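A short sketch of what Complete actually does, assuming C# 9+ top-level statements: items queued before Complete are still processed, Post returns false afterwards, and the Completion task tells you when the block has fully drained.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

int processed = 0;
var subscriber = new ActionBlock<int>(n => { processed++; });

for (int i = 0; i < 5; i++)
{
    subscriber.Post(i);
}

subscriber.Complete();

// After Complete the block declines new input, so Post returns false.
bool acceptedAfterComplete = subscriber.Post(99);

// Messages that were already queued are still processed before Completion finishes.
await subscriber.Completion;
Console.WriteLine("Processed: {0}, accepted after Complete: {1}", processed, acceptedAfterComplete);
```

For a service, awaiting (or waiting on) Completion after calling Complete is a simple way to shut down only once in-flight work is finished.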
.NET 4.5 Baby Steps, Part 2: Task timeout cancellation
Introduction
When Tasks were introduced in .NET 4, one of the fantastic abilities was being able to pass in a CancellationToken and use it to cancel/break out of tasks (think of a cancel button on a file copy).
So in the following code we create a cancellation source and pass the token to the task and it will output the date/time until you press enter. Then we call the Cancel method and it stops.
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cancelTokenSource = new CancellationTokenSource();
var token = cancelTokenSource.Token;

Task.Factory.StartNew(() =>
{
    // Keep printing the time until cancellation is requested.
    while (!token.IsCancellationRequested)
    {
        Console.WriteLine(DateTime.Now.ToLongTimeString());
        Thread.Sleep(1000);
    }
}, token);

Console.WriteLine("Press Enter to cancel");
Console.ReadLine();
cancelTokenSource.Cancel();
Console.WriteLine("Done");
Console.ReadLine();
```
What is new in .NET 4.5?
.NET 4.5 adds a FANTASTIC new feature to this: the ability to cancel automatically after a set timeout! All we need to do is use the new constructor overload and pass in the timeout. In the demo below it is set to three seconds.
It is also important to note that the timeout is measured from when you create the token source, not from when the task starts.
```csharp
var cancelTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(3));
```
In the screen capture, see how the word Done does not appear but processing stops? That is because the task was cancelled (processing stopped), but I never pressed Enter, so the program is still waiting on the ReadLine before Done.
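Because that timeout starts when the token source is created, you may prefer to start the clock only once the work actually begins. .NET 4.5 also adds CancellationTokenSource.CancelAfter for that. A minimal sketch, assuming C# 9+ top-level statements and shortened to a one-second timeout with 200 ms steps so it finishes quickly:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cancelTokenSource = new CancellationTokenSource();
CancellationToken token = cancelTokenSource.Token;

Task<int> work = Task.Run(async () =>
{
    // Start the countdown here, when the work begins, rather than when the source was created.
    cancelTokenSource.CancelAfter(TimeSpan.FromSeconds(1));

    int iterations = 0;
    while (!token.IsCancellationRequested)
    {
        Console.WriteLine(DateTime.Now.ToLongTimeString());
        iterations++;
        await Task.Delay(200);
    }
    return iterations;
});

int completedIterations = await work;
Console.WriteLine("Cancelled after {0} iterations", completedIterations);
```

The exact iteration count depends on timing, so only the fact that cancellation happened (and that the loop ran at least once) is guaranteed.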
.NET 4.5 Baby Steps, Part 1: ThreadLocal<T>
Introduction
ThreadLocal<T> was introduced in .NET 4 and didn’t get much attention because it didn’t do much over the ThreadStaticAttribute, which we have had since version 1 of the framework, so let’s review what it does. In short, it gives every unique thread that uses it its own copy of the field. Let’s look at this code:
```csharp
using System;
using System.Threading;

class Program
{
    // The value factory runs once per thread, the first time that thread reads Value.
    static ThreadLocal<int> balances = new ThreadLocal<int>(() => 10);

    static void Main(string[] args)
    {
        for (int i = 0; i < 10; i++)
        {
            new Thread(AddMoney).Start();
        }
        Console.ReadLine();
    }

    static void AddMoney()
    {
        Console.WriteLine("Before {0}", balances.Value);
        balances.Value += new Random().Next(0, 1000);
        Console.WriteLine("After {0}", balances.Value);
    }
}
```
Which produces:
Note that every Before is 10, because the lambda we pass to the ThreadLocal<T> constructor runs for each unique thread.
What’s new in .NET 4.5?
.NET 4.5 improves the usefulness of this by adding the Values property, which lets you list the values from every thread! To make use of this you need to opt in via the constructor by passing true (the trackAllValues parameter):
```csharp
static ThreadLocal<int> balances = new ThreadLocal<int>(() => 10, true);
```
And then in my demo I will output the results using:
```csharp
foreach (var item in balances.Values)
{
    Console.WriteLine("Balance at end: {0}", item);
}
```
![Output of the ThreadLocal Values demo](https://www.sadev.co.za/files/CropperCapture2_thumb.gif)
This is VERY useful when working with threads and doing individual calculations and then collating the results at the end!
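A small, deterministic sketch of that collate-at-the-end pattern, assuming C# 9+ top-level statements: three dedicated threads each add a different deposit to their own copy (starting from 10), and the main thread sums Values once they have all finished. The deposit amounts are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// trackAllValues: true opts in to the Values property.
using var balances = new ThreadLocal<int>(() => 10, trackAllValues: true);

var threads = new List<Thread>();
for (int i = 1; i <= 3; i++)
{
    int deposit = i * 100; // copy the loop variable so each thread gets its own amount
    var thread = new Thread(() => balances.Value += deposit);
    threads.Add(thread);
    thread.Start();
}

// Wait for every worker so all three per-thread values exist before collating.
threads.ForEach(t => t.Join());

// Each thread started at 10 (from the value factory) and added its own deposit.
// The main thread never read Value, so it has no entry in Values.
int total = balances.Values.Sum();
Console.WriteLine("Threads: {0}, total: {1}", balances.Values.Count, total);
```

With deposits of 100, 200 and 300 this gives 3 values totalling 630 (3 × 10 + 600).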
Warning
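ThreadLocal<T> values belong to threads, not to tasks! The TPL and the ThreadPool reuse threads, so two work items that happen to run on the same pool thread will share (and see leftovers of) the same value, which will not work as expected!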
South African Postal Codes for Windows Phone
I can NEVER EVER remember the postal code for where I work or where I live – that little four digit number just eludes my brain. So to solve that I thought, why can’t I have EVERY postal code with me always? So that is what I made happen with this simple Windows Phone application: Postal codes!
It is dog simple: input the name of what you want, then hit search and boom, results. It includes both street and box codes.
For the developers out there, this application source code is available too at: https://bitbucket.org/rmaclean/postal-codes
Windows 8 for the .NET developer
Last night I presented on Windows 8 for the .NET developer at the fantastic Developer User Group! We had a bumper crowd there, which was great, and we had some really interesting discussions during and after the talk. Thank you to all that attended!
For those looking for the slides, demo script and demo bits they are below!
IntelliTrace vs. PreEmptive Analytics
IntelliTrace
Visual Studio 2010 introduced an amazing feature, IntelliTrace, which allows for deep debugging experiences inside Visual Studio by collecting an AMAZING amount of information (basically a stack trace for every call in your code + meta data) and allowing you to use it later to replay the way the application was used. With this feature you could eliminate those “No Repro” bugs! The catch in 2010 was that it was NOT allowed to be used in production. In Visual Studio 11 that has changed and we can use it in production: http://msdn.microsoft.com/en-us/library/hh398365(v=vs.110).aspx & http://msdn.microsoft.com/en-us/hh440472
PreEmptive Analytics
This change in licensing may seem to put IntelliTrace in direct competition with another great tool, PreEmptive Analytics (PA). I have mentioned this amazing tool before and with Visual Studio 11 it is included “in the box” so there seems to be a conflict brewing – but there isn’t.
Two sides of the same coin
These two tools both belong to the “collect information so you can react to it later and fix bugs” set of tools, but they have very different use cases. IntelliTrace is specific to the scenario of replaying an application for diagnosis and debugging purposes. It is not meant to be an always-on tool, and it writes to a local file that needs to be collected somehow.
PA, on the other hand, is a tool to always have on. It does capture error information, but nothing more than the simple Exception + Stack, which is not as useful, detailed or integrated into VS as IntelliTrace. In addition, PA allows me to do a lot of analytics on my application that are not possible with IntelliTrace:
- what features are people using
- where in the world are they
- when are they using it
- what are their machines like
In addition, the PA reports get automatically sent to a server (one that they run, or one you can run yourself if you have privacy/security concerns), so there is no need to waddle around collecting files.
I can also see scenarios where these two work hand in hand: PreEmptive getting higher level info that shows a lot of users having issue X, then the support guys contacting some users and doing a more detailed capture of the issue with IntelliTrace.
.NET 4.5 and how it sits in the .NET ecosystem
tl;dr
- .NET 4.5 – 8th major release.
- .NET 4.5 is an in place replacement of .NET 4.0.
- Installing it could cause issues for .NET 4.0, but is very unlikely and likely shows your app is using undocumented features or using features incorrectly.
- .NET versions, CLR versions & language versions are not in sync.
- There is an awesome chart below which makes it easy to see the relationships in the ecosystem.
Introduction
.NET 4.5 is the next release and it is important to take a look at how it fits in the .NET ecosystem. This is the 8th major release of .NET! What do I mean by major release? I mean any release that is not a patch/support-only release; put another way, a major release is one that included major new features in the CLR and/or new language options.
SxS vs. Replacement
In .NET we are lucky that many versions can run side by side (SxS) provided they have different versions of the CLR; however, if a new major release shares the same CLR, it is a replacement/additive version. For example: .NET 3.0 used the same CLR as .NET 2.0 (the CLR 2.0) and when installed replaced many of the files in .NET 2.0 with new versions, and it is only via compiler directives that some things are turned on and off. The advantage of the SxS model is that installing a new version doesn’t influence apps on the previous version in any way (i.e. if the app is 1.1 and works fine before .NET 2.0 was installed, it will keep working fine after .NET 2.0 is installed).
The problem with the replacement model is that there is a chance that installing a new version breaks apps on the original version. However, Microsoft does a RIDICULOUS amount of testing to make sure this doesn’t happen, so that chance is very small. In fact, if you happen to hit one, the chance is higher that you are using undocumented features or using features incorrectly.
The reason for this explanation of SxS vs. replacement is that .NET 4.5 is an in place replacement for .NET 4.
Version Naming
Part of the confusion, I suspect, around me saying that .NET 4.5 is the eighth release is that Microsoft’s naming of versions is about as far from logic as you can get. The worst examples are that .NET 3.5 SP1 is a major release labelled as Service Pack 1?! and the fact that we do not have a version 3 of the CLR, it was just skipped?!
The other aspect is that versions of the CLR, versions of the Framework and versions of the languages are completely out of sync, so .NET 4.5 runs on the CLR version 4 and we write code in C# version 5.0 or VB version 11.0 – cause that makes sense :S
Awesome Poster
Here is an awesome poster to help remind you of all the above!