Data Flow Programming Tutorial (NAIAD)

Setting up an EC2 instance

Authoring a Naiad program

Using the Naiad library

Running jobs on a cluster

Extensions

In this tutorial, we will learn how to make use of the Naiad distributed computation framework on Amazon’s cloud computing platform.

Setting up an EC2 instance

You should have received some instructions containing a link to a tarball that holds the necessary files for this tutorial. Inside the archive you will find:

You will first need to extract the contents of the archive to a folder on your local machine. This will create a folder named ec2-bin. The ec2-config.sh script defines a set of environment variables that are needed in order to utilise the EC2 command line tools.

tar zxf <tarball>.tgz
cd ec2-bin
. ec2-config.sh
(NB that’s a dot followed by the script)

You may need to alter the ec2-config.sh script to point to the location of the Java Runtime Environment on your machine (which is used by the EC2 API tools).

The environment variables have now been loaded into the current shell session. The next step is to create a personal private key for your own cluster. The following command will request and download a private key with the filename pk-[userid] into the current folder.

./create-key.sh pk-$USER

Your machine will be distinguished from other students' machines using this private key. The next step is to actually provision the cluster.

./cluster-setup.sh pk-$USER

The above command will request an "on-demand" instance and provision it with a basic Ubuntu system image. For this tutorial, only "micro" instances with minimal resources will be used. The script creates three files in the local directory:

After the command has completed, the request will take some further time to process (usually a few seconds).

Following the successful completion of the software deployment process, we can connect to the machine.

master=`cat hosts-pub`
ssh -i pk-$USER ubuntu@$master

You will be connected to your virtual machine, and have full sudo access. You may want to install some additional packages using sudo apt-get install. The nano and vim text editors are installed by default, and these can be used to perform the tasks in the tutorial. Alternatively, you can edit files locally on your machine, and copy to/from your EC2 instances using scp.

Once your instance is running, you will need to install Mono to compile and run Naiad programs. Run the following while logged into your instance:

sudo apt-get install mono-complete

We can then download and build Naiad from the GitHub repository:

sudo apt-get install git

git clone https://github.com/MicrosoftResearch/Naiad.git

cd Naiad

git checkout tags/v0.4.2

sh ./build_mono.sh

To begin with, you will utilise a single EC2 instance to familiarise yourself with Naiad. Later in the tutorial, we will join the collective instances together to form a cluster.

Authoring a Naiad program

Documentation for the Naiad SDK library is available at the following URL (alternatively, it can be built directly from the source code, if you have access to Visual Studio).

http://microsoftresearch.github.io/Naiad/

To begin with, read through “Writing your first Naiad program”.  At a low level, Naiad programs consist of dataflow vertices (also known as “operators”), which are composed into partitioned segments called “stages”. Notice however that none of this terminology is used in the source code on that page. This is because Naiad provides a number of wrappers around these basic building blocks for the sake of convenience. A number of higher-level languages (or DSLs) are also provided within the Naiad library.

See if you can build and run the example source code on that page. Note that you will need to include a few more lines for syntactic correctness.

To compile the program, copy the Naiad core library built earlier into the same directory as your source code (it should be in ~/Naiad/Naiad/bin/Debug/Microsoft.Research.Naiad.dll).

Then use the following command:

dmcs <program.cs> /reference:Microsoft.Research.Naiad.dll

Replace the filename with the name of your source file. If it compiles successfully, you can execute the program using:

mono <program.exe>

Using the Naiad library

Depending on the level of customisation required, there are a number of different ways to utilise the library. We will examine a few of the Naiad example programs to highlight the various approaches. Source code for the examples can be found by navigating to the following directory:

cd ~/Naiad/Examples

To begin with, look at the “Throughput” example (Examples/Naiad/Throughput.cs). This example is designed to test the limits of the Naiad pipeline, by pumping messages as quickly as possible between two trivial dataflow vertices. In this example, the “producer” and “consumer” vertices are created by subclassing the standard “Vertex” abstract class.

Examine the source code and try to answer these questions:

In most cases, it won’t be necessary to to construct custom vertices in this manner. Instead, a number of general-purpose dataflow vertices are available, which allow the developer to focus on the actual computation rather than the distribution mechanism.

Now look at the “Wordcount” example in the same directory (Examples/Naiad/Wordcount.cs). This example makes use of the C# “extension method” syntax (described further on MSDN). Instead of overriding the entire class, this allows the developer to make use of a generic “Unary vertex” operator, and define a custom method to perform the actual computation.

The same example is also implemented separately using “NaiadLINQ”, in the differential dataflow examples directory (Examples/DifferentialDataflow/Wordcount.cs). LINQ (Language-INtegrated Query) is a DSL that defines a number of relational-style operations on input streams. These operators have been implemented in the Naiad framework, using the approaches defined above.

Running jobs on a cluster

Notice that each example program is instantiated using a “Computation” object. The “FromArgs” method allows configuration parameters to be passed to the Naiad execution engine. It is also possible to run multiple computations using a “Controller” object. Run the Examples.exe program without parameters to see a listing of Naiad engine options.

To run a multi-machine distributed computation, it is necessary to copy the executable to each machine, and launch each instance with a separate process ID (the -p option).

This approach is somewhat cumbersome, however, Naiad also contains library hooks to allow programs to make use of Windows Azure compute and storage directly from source.

Extensions

In this section we will make use of the concepts introduced thus far to design a more realistic dataflow computation. Twitter exposes a streaming API that allows developers to download public tweets as they are published. These streams are marked up using JSON syntax. The following commands will give you access to a set of files containing data recently downloaded from Twitter (you will need to substitute [SERVER] with a hostname provided by the supervisor).

apt-get install nfs-common
modprobe nfs
mkdir pubstore
mount [SERVER]:/pubstore pubstore

The pubstore directory will now contain several text files, each filled with approximately 100MB of tweets in JSON format.

Using the LINQ-based Wordcount example as a template, try to write a program that calculates the most popular hash tags in the downloaded sample. You will have to do the following:

Once you are satisfied that it is working in LINQ, try to implement the hashtag counter using extension methods (as in the second example above).

Once you have both versions of the program working, use the method described in the previous section to run the computation on multiple machines. Have each machine process a separate chunk of the file. Compare the runtime performance of the LINQ-based and extension-method implementations.