Data Flow Programming Tutorial (NAIAD)

Before you start

Before you start, be sure you understand the following points:

Setting up an EC2 instance

You should have received some instructions containing a link to a tarball that holds the necessary files for this tutorial. Inside the archive you will find:

You will first need to extract the contents of the archive to a folder on your local machine. This will create a folder named ec2-bin. The script defines a set of environment variables that are needed in order to utilise the EC2 command line tools.

tar zxf <tarball>.tgz
cd ec2-bin

(NB that's a dot followed by the script)

You may need to alter the script to point to the location of the Java Runtime Environment on your machine (which is used by the EC2 API tools).

The environment variables have now been loaded into the current shell session. The next step is to create a personal private key for your own cluster. The following command will request and download a private key with the filename pk-[USER] into the current folder.

./ pk-USER

This script executes the ec2-create-keypair command and writes the result to a file called pk-USER. This file contains your private key. Your public key is stored by EC2 and will be added automatically to new instances so that you can access them through SSH. Your machine will be distinguished from other students' machines using this private key. The next step is to actually provision the cluster.

./ pk-USER

The above command will request an "on-demand" instance and provision it with a basic Ubuntu system image. For this tutorial, only "micro" instances with minimal resources will be used. The script creates three files in the local directory:

After the command has completed, the request will take some further time to process (usually a few seconds).

Following the successful completion of the software deployment process, we can connect to the machine.

master=`cat hosts-pub`
ssh -i pk-USER ubuntu@$master

You will be connected to your virtual machine, and have full sudo access. You may want to install some additional packages using sudo apt-get install. The nano and vim text editors are installed by default, and these can be used to perform the tasks in the tutorial. Alternatively, you can edit files locally on your machine, and copy to/from your EC2 instances using scp -i pk-USER source destination.
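As a rough sketch of the scp usage described above, the helpers below print the copy commands rather than run them, so you can check them first. The function names and the file names passed to them are placeholders chosen for illustration; only pk-USER, the ubuntu login and hosts-pub come from the steps above.

```shell
# Sketch only: these helpers print scp commands instead of running them.
# KEY is the private key file created earlier; all file names passed in
# are placeholders.
KEY=pk-USER

upload_cmd() {
    # $1 = local file, $2 = public hostname or IP of the instance.
    # An empty path after the colon means the remote home directory.
    printf 'scp -i %s %s ubuntu@%s:\n' "$KEY" "$1" "$2"
}

download_cmd() {
    # $1 = file in the remote home directory, $2 = hostname or IP.
    printf 'scp -i %s ubuntu@%s:%s .\n' "$KEY" "$2" "$1"
}
```

For example, upload_cmd program.cs "$(cat hosts-pub)" prints the command that would copy a local program.cs to the home directory of your instance.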

Once your instance is running, you will need to install Mono to compile and run Naiad programs. Run the following while logged into your instance:

sudo apt-get install mono-complete

We can then download and build Naiad from the GitHub repository:

sudo apt-get install git
git clone
cd Naiad
git checkout tags/v0.4.2
sh ./

To begin with, you will utilise a single EC2 instance to familiarise yourself with Naiad. Later in the tutorial, we will join the collective instances together to form a cluster.

Authoring a Naiad program

Documentation for the Naiad SDK library is available at the following URL (alternatively, it can be built directly from the source code, if you have access to Visual Studio).

To begin with, read through "Writing your first Naiad program". At a low level, Naiad programs consist of dataflow vertices (also known as "operators"), which are composed into partitioned segments called "stages". Notice however that none of this terminology is used in the source code on that page. This is because Naiad provides a number of wrappers around these basic building blocks for the sake of convenience. A number of higher-level languages (or DSLs) are also provided within the Naiad library.
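To make the idea of a partitioned stage a little more concrete, here is a toy analogy in shell. It is not Naiad code and does not use the Naiad API. It assumes one "stage" whose logic (counting records) runs in n "vertices", with each record routed to a vertex by a deliberately simplistic hash (the record length modulo n). The function name run_stage is hypothetical.

```shell
# Toy analogy only, not Naiad code: one "stage" whose logic (counting
# records) runs in n "vertices", each seeing its own partition of the input.
run_stage() {
    # $1 = number of vertices in the stage; records arrive on stdin
    awk -v n="$1" '
        { count[length($0) % n]++ }    # route each record by a toy hash
        END {
            for (p = 0; p < n; p++)
                printf "vertex %d: %d records\n", p, count[p] + 0
        }'
}
```

Every vertex runs the same logic, and the routing function alone decides which records it sees. That is the property that lets a stage be spread over several workers.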

The code below corresponds to the example shown in the aforementioned tutorial.

using Microsoft.Research.Naiad;
using Microsoft.Research.Naiad.Input;

public class TestProgram
{
	static void Main(string[] args)
	{
		// 1. allocate a new dataflow computation.
		using (var computation = NewComputation.FromArgs(ref args))
		{
			// 2. define an object which accepts input strings.
			var source = new BatchedDataSource<string>();

			// 3. convert the data source into a Naiad stream of strings.
			var input = computation.NewInput(source);

			// 4. request a notification for each batch of strings received.
			var output = input.Subscribe(x =>
			{
				foreach (var line in x)
					System.Console.WriteLine(line);
			});

			// 5. start the computation, fixing the structure of the dataflow graph.
			computation.Activate();

			// 6. read inputs from the System.Console as long as the user supplies them.
			for (var l = System.Console.ReadLine(); !string.IsNullOrEmpty(l); l = System.Console.ReadLine())
				source.OnNext(l.Split());

			// 7. signal that the source is now complete.
			source.OnCompleted();

			// 8. block until all work is finished.
			computation.Join();
		}
	}
}

Copy and paste the code above into a file named program.cs. To compile the program, copy the Naiad core library built earlier into the same directory as your source code (it should be in ~/Naiad/Naiad/bin/Debug/Microsoft.Research.Naiad.dll).

Then use the following command:

dmcs program.cs /reference:Microsoft.Research.Naiad.dll

Then, you can execute the program using:

mono program.exe

Using the Naiad library

Depending on the level of customisation required, there are a number of different ways to utilise the library. We will examine a few of the Naiad example programs to highlight the various approaches. Source code for the examples can be found by navigating to the following directory:

cd ~/Naiad/Examples

To begin with, look at the "Throughput" example (Examples/Naiad/Throughput.cs). This example is designed to test the limits of the Naiad pipeline, by pumping messages as quickly as possible between two trivial dataflow vertices. In this example, the "producer" and "consumer" vertices are created by subclassing the standard "Vertex" abstract class.

Examine the source code and try to answer these questions:

In most cases, it won't be necessary to construct custom vertices in this manner. Instead, a number of general-purpose dataflow vertices are available, which allow the developer to focus on the actual computation rather than the distribution mechanism.

Now look at the "Wordcount" example in the same directory (Examples/Naiad/Wordcount.cs). This example makes use of the C# "extension method" syntax (described further on MSDN). Instead of overriding the entire class, this allows the developer to make use of a generic "Unary vertex" operator, and define a custom method to perform the actual computation.

The same example is also implemented separately using "NaiadLINQ", in the differential dataflow examples directory (Examples/DifferentialDataflow/Wordcount.cs). LINQ (Language-Integrated Query) is a DSL that defines a number of relational-style operations on input streams. These operators have been implemented in the Naiad framework, using the approaches defined above.
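For comparison, the result a word count should produce can be sketched as a plain shell pipeline. This is not how Naiad computes it and it uses none of the Naiad operators. It simply mirrors the relational shape of the computation (split lines into words, group equal words, count each group) and can serve as a reference when checking your own output. The function name word_count is hypothetical.

```shell
# Reference word count, assuming whitespace-separated words on stdin.
word_count() {
    tr -s '[:space:]' '\n' |     # split: emit one word per line
        grep -v '^$' |           # drop any empty records
        sort | uniq -c |         # group equal words and count each group
        awk '{ print $2, $1 }'   # print "word count"
}
```

For instance, printf 'a b a\nb a\n' | word_count prints one line per distinct word followed by its count.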

Running jobs on a cluster

Now we are going to run Naiad across several machines. First, we need to locate our public key. On your local machine you have your private key (remember that you created that file under the name pk-USER). Do not share that key; it is for you only. Your public key is stored in the set of allowed keys that can access your EC2 instance via SSH. Run the following commands on your instance to get your public key:

USER="your username"
cat ~/.ssh/authorized_keys > ~/"$USER".pub

Now a plain text file containing your public key should be in your home folder. Find a partner to exchange public keys with. Simply send the file via email to your partner. Your partner must add your public key to the authorized_keys file on their EC2 instance. To do that:

USER="id of my requesting partner"
cat "$USER".pub >> ~/.ssh/authorized_keys
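Appending with >> adds the key again every time the command is repeated. A slightly more careful variant, sketched below, only appends keys that are not already present. The function name add_key and its optional second argument are hypothetical conveniences, not part of the tutorial scripts.

```shell
# Sketch: append each key from a .pub file to authorized_keys unless an
# identical line is already there.
add_key() {
    # $1 = file holding the partner's public key(s)
    # $2 = authorized_keys file to update (defaults to the usual location)
    target=${2:-$HOME/.ssh/authorized_keys}
    while IFS= read -r key || [ -n "$key" ]; do
        [ -n "$key" ] || continue    # skip blank lines
        grep -qxF -- "$key" "$target" 2>/dev/null ||
            printf '%s\n' "$key" >> "$target"
    done < "$1"
}
```

Your partner would run it on their instance as add_key "$USER".pub after copying your file there.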

Now, from your local machine, you should be able to SSH into your partner's machine.

PARTNER_MACHINE="IP of your partner's machine"

Now that you can SSH into the other machine, you can run two Naiad processes simultaneously. Simply open another command line console on your local machine. Using your private key, you can now run a distributed Naiad workflow.

Your EC2 instance:

master=`cat hosts-pub`
ssh -i pk-USER ubuntu@$master
cd ~/Naiad/Examples/bin/Debug
mono Examples.exe wordcount -n 2 -p 0 -h IP1:2101 IP2:2101

Your partner's instance:

partner="your partner machine ip"
ssh -i pk-USER ubuntu@$partner
cd ~/Naiad/Examples/bin/Debug
mono Examples.exe wordcount -n 2 -p 1 -h IP1:2101 IP2:2101

Similarly, you can run the other examples. Be sure to list the IPs in the same order on every machine and to give each instance a different process number (the -p parameter).
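Since the host list and process numbers must agree across machines, it can help to generate the command lines in one place. The sketch below assumes the same flags (-n, -p, -h) and port 2101 used in the commands above. The function name naiad_commands is hypothetical, and it only prints the commands; it does not run anything.

```shell
# Sketch: print one launch command per machine, keeping the host order
# identical everywhere and numbering the processes from 0.
naiad_commands() {
    # $1 = example name (e.g. wordcount); remaining args = machine IPs in order
    example=$1
    shift
    n=$#
    hosts=""
    for ip in "$@"; do
        hosts="$hosts $ip:2101"
    done
    p=0
    for ip in "$@"; do
        echo "$ip: mono Examples.exe $example -n $n -p $p -h$hosts"
        p=$((p + 1))
    done
}
```

Running naiad_commands wordcount IP1 IP2 prints, for each machine, the command to run on it. Each of you can then paste your own line into the matching SSH session.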


In this section we will make use of the concepts introduced thus far to design a more realistic dataflow computation. Twitter exposes a streaming API that allows developers to download public tweets as they are published. These streams are marked up using JSON syntax. The following commands will give you access to a set of files containing data recently downloaded from Twitter (you will need to substitute [SERVER] with a hostname provided by the supervisor).

sudo apt-get install nfs-common
sudo modprobe nfs
mkdir pubstore
sudo mount [SERVER]:/pubstore pubstore

The pubstore directory will now contain several text files, each filled with approximately 100MB of tweets in JSON format.

Using the LINQ-based Wordcount example as a template, try to write a program that calculates the most popular hashtags in the downloaded sample. You will have to do the following:

Once you are satisfied that it is working in LINQ, try to implement the hashtag counter using extension methods (as in the second example above).
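To sanity-check the output of either implementation, a crude reference count can be produced with standard shell tools. The sketch below makes several simplifying assumptions that your Naiad program need not share: it scans the raw JSON text instead of parsing it, treats a hashtag as '#' followed by ASCII letters, digits or underscores, and folds everything to lower case. It may therefore also pick up '#' fragments in URLs or count retweeted text twice. The function name top_hashtags is hypothetical.

```shell
# Rough reference: most frequent hashtags in text read from stdin.
top_hashtags() {
    # $1 = number of results to print (default 10)
    grep -oE '#[A-Za-z0-9_]+' |          # pull out every hashtag-like token
        tr '[:upper:]' '[:lower:]' |     # fold case so #Naiad equals #naiad
        sort | uniq -c |                 # count occurrences of each tag
        sort -k1,1nr -k2,2 |             # most frequent first, ties by name
        awk '{ print $1, $2 }' |         # normalise to "count tag"
        head -n "${1:-10}"
}
```

Feed it one of the files from pubstore on standard input and compare the top few entries with what your program reports. Small differences are expected given the assumptions above.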

Once you have both versions of the program working, use the method described in the previous section to run the computation on multiple machines. Have each machine process a separate chunk of the file. Compare the runtime performance of the LINQ-based and extension-method implementations.
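One simple way to give each machine its own share of a file is to select lines by process number. The sketch below deals lines out round-robin instead of cutting the file into contiguous chunks, and it assumes one tweet per line. The function name chunk_for is hypothetical.

```shell
# Sketch: keep only the lines that belong to one process, dealing lines
# out round-robin across all processes.
chunk_for() {
    # $1 = total number of processes, $2 = this process's number (0-based)
    awk -v n="$1" -v p="$2" '(NR - 1) % n == p'
}
```

With two machines, the first would read its input through chunk_for 2 0 and the second through chunk_for 2 1, using the same process numbers as the -p parameter so that no line is handled twice.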