# Julia Distributed Computing in the Cloud

### Abstract.

This brief note is intended to illustrate why the programming language Julia is so interesting to a growing number of computational and data scientists.  Julia is designed to deliver high performance on modern hardware while retaining the interactive capabilities that make it well suited for Jupyter-style scientific exploration. This paper illustrates, with some very unscientific examples, how Julia can be deployed and with Docker and Kubernetes in the cloud.

# Introduction

In all our previous posts we have used Python to build applications that interact with cloud services.  We used Python because everybody knows it.  However, as many scientists have now discovered, for scientific applications the programming language Julia is a better alternative.  This note is not a Julia tutorial, but rather, it  is intended to illustrate why Julia is so interesting. Julia was launched in 2012 by Jeff Bezanson, Stefan Karpinski, Viral B. Shah, and Alan Edelman and is now gaining a growing collection of users in the science community.  There is a great deal of Julia online material and a few books (and some of it is up to date).   In a previous blog post we looked at Python Dask for distributed computing in the cloud.   In this article we focus on how Julia can be used for those same parallel and distributed computing tasks in the Cloud. Before we get to the cloud some motivational background is in order.

# 1.    Julia is Fast!

There is no reason a language for scientific computing must be as dull as Fortran or C.   Languages with lots of features like Java, Python and Scala are a pleasure to use but they are slow.   Julia is dynamic (meaning it can run in a read-eval-print-loop in Jupyter and other interactive environments), it has a type system that support parametric polymorphism and multiple dispatch.   It is also garbage collected and extremely extensible.   It has a powerful package system, macro facilities and a growing collection of libraries.   It can call C and Python directly if it is needed.

And it generates fast code.   Julia uses a just-in-time compiler that optimized your program depending on how you use it.    To illustrate this point, consider a simple function of the form

function addem(x)
# do some simple math with x
x += x*x
return x
end


Because we have not specified the exact type of x, this defines a generic function: it will work with arguments of any type that has meaning for the math operations used.   But when we invoke it with a specific type, such as int or float, a version of the function is compiled on the fly that is specialized to that type.  This specialization takes a bit of time, but when we invoke the function a second time  with that argument type, we used the specialized version.   As illustrated in Figure 1, we called the function twice with integer arguments.  The second call is substantially faster than the first.  Calling it with a floating point argument is slow until the specialized version for floating point variables is created, then the function runs fast.

Figure 1.   Using a timing macro to measure the speed of function calls.   In step 23 the generic version is called with an integer.   The second call uses the version optimized for integers.   In  steps 25 an 26 the specialized version for floating point numbers is generated and run.

The Julia team has put together a benchmark set of small examples written in several languages.  We extracted the benchmarks for C, Python and Julia and ran them.   The resulting execution time are shown below.   As you can see, the Julia code generation is arguably as good as  C (compiled with optimized gcc).  Relative to Python it is as much as 50 times faster.  Figure 2 makes this more explicit.  The only benchmark where Python is within a factor of two is  matrix multiply and that is because Python is using the optimized numpy.linalg libraries.

 Benchmark python gcc – O julia recursion_fibonacci 2.977 0.000 0.037 parse_integers 2.006 0.117 0.197 userfunc_mandelbrot 6.294 0.068 0.065 recursion_quicksort 12.284 0.363 0.364 iteration_pi_sum 558.068 22.604 22.802 matrix_statistics 82.952 11.200 10.431 matrix_multiply 70.496 41.561 41.322 print_to_file 72.481 17.466 8.100

Figure 2.   Speed-up of Julia and C relative to Python

# 2.    Julia Math and Libraries

Doing basic linear algebra and matrix computation in Julia is trivial.  The following operations each take less than one second in Jupyter using the package LinearAlgebra.jl.

M = rand(1000, 1000); # generates a 1000 by 1000 matrix of random floats
N = M^-1                        # compute the inverse of M
C = M*M’                       # M’ is the transpose so the product is symmetric.
Q = svd(C)                     # computes the singular value decomposition of C.
Q.S                                 # are the singular values.

Sparse matrices are also supported along with a large number of other matrix creation and manipulation operations.

### Differential Equations

The differential equation package built by a team led by Christopher Rackauckas is one of the most impressive.  To illustrate this we consider an example from their excellent tutorial.   The Lorenz attractor is a fascinating example of a chaotic solution to a system of differential equations that model convection.   The system involves the evolution in 3D of a system that is characterized by three parameters.  As shown below the equations are expressed exactly as you would describe them mathematically (dx is dx/dt) and the three parameters ar sigma, rho and beta.   The initial point is (1,0,0) and the region is integrated over [0,100].   The package automatically picks an appropriate solver and the output is plotted as shown in Figure 3.   Running this on a Mac mini took about 5 seconds.  We used another package “Plots” to render the image.

g = @ode_def LorenzExample begin
dx = σ*(y-x)
dy = x*(ρ-z) – y
dz = x*y – β*z
end σ ρ β

u0 = [1.0;0.0;0.0]
tspan = (0.0,100.0)
p = [10.0,28.0,8/3]
prob = ODEProblem(g,u0,tspan,p)
sol = solve(prob)
plot(sol)

Figure 3.  Plot of the Lorenz attractor solution. (yes, Greek Unicode character names are valid.)

Note: Thomas Breloff provides a more explicit integration with a Gif animation in this Plots tutorial.  As with the above example, Breloff’s solution works well in Jupyter.

# Julia Distributed Computing in the Cloud

Julia has several packages that support parallel computing.   These include a library for CUDA programming, OpenMP, Spark and  MPI.jl for MPI programming.  MPI is clearly the most reliably scalable parallel computing model for tasks involving 10000 or more cores.  It is based on low-latency, two-sided communication in which coordinated, synchronized send-receive pairs and collective operations are executed in parallel across large clusters equipped with specialized networking.  While Julia can be used with MPI, the natural model of communication in Julia is based on one-sided communication based on threads, tasks, futures and channels.  We will use the package Distributed.jl to demonstrate this.

The Julia distributed computing model is based on distributing tasks and arrays to a pool of workers.  Worker are either Julia processes running on the current host or on other hosts in your cluster.   (In the paragraphs that follow we show how to launch local workers, then workers on other VMs in the cloud and finally workers as docker containers running in a Kubernetes cluster.

Let’s begin with a trivial example.   Create a function which will flip a coin “n” times and count the number of heads.

Here we generated random Boolean values and converted them to 0 or 1 and added them up.   Now let’s add some workers.

First we include the “Distributed” package and count the current number of workers.   Next using the “addproc( )” function we added two new worker processes on the same server running this Jupyter notebook.  Workers that our notebook process knows about are identified with an integer and we can list them.   (The notebook process, called here the master, is not a worker)

The worker processes are running Julia but they don’t know about the things we have defined in the master process.  Julia has a very impressive macro facility that is used extensively in the distributed computing package to distributed code objects to the workers and  to launch remote computations.

When we create a function in the master that we want executed in the workers we must make sure it also gets defined in that worker.   We use the “@everywhere” macro to  make sure things we define locally are also defined in each worker.   We even must tell the workers we are using the Distributed package.  In the code below we created a new version of our count_heads function and distributed it.

Julia uses the concept of futures to launch computation on the workers.   The “@spawnat” macro takes two parameters: the ID of a worker and a computation to be launched.   What is returned is a future: a placeholder for the final result.   By “fetch( a)” we can grab the result of the future computation and return it to our calling environment.  (Some library functions like “printf()” when executed on the workers are automatically mapped back to the master.)

In the following example we create two local workers and define a function for them to execute.  Workers each have a unique integer ID and we print them.   Then we use @spawnat to launch the function on each worker in turn.

We can easily measure the cost of this remote function evaluation with the @time macro as follows. We enclose the call and fetch in a block and time the block.  (Notice we have increased the number of coins to 109 from 104.)

If we want both workers working together, we can compose a list of spawned invocations with a construct of the form

[ @spawn at i expr(i) for i in range]


This returns a list (technically, in Julia it is an array) of futures.   We can then grab the result of each future in turn.   The result is shown below.  This is run on a dual core desktop machine, so parallelism is limited but, in this case, the parallel version is 1.85 times faster than the individual call.   The reason that it is not exactly two time faster is partly due to the overhead in sequentially launching and resolving the futures.  But it is also due to communication delays and other OS scheduling delays.

A more compact and elegant way to write this parallel program is to use the Julia distributed parallel map function  pmap(f, args).  pmap takes a function and applies it to each element of a set of arguments and uses the available workers to do the work.   The results are returned in an array.

In this case count_headse did not need an argument so we constructed an anonymous function with one parameter to provide to the pmap function.  In this execution we were only concerned with dividing the work into two parts and then letting the system schedule and execute them with the available worker resources.   We chose 2 parts because we know there is two workers.   However, we could have divided into 10 parts and applied 10 argument values and the task would have been accomplished using the available workers.

## Julia distributed across multiple host machines.

To create a worker instance on another host Julia uses secure shell (ssh) tunnels to talk to it.  Hence you need five things:  the IP address of the host, the port that secure shell uses, the identity of the “user” on that host and the private ssh key for that user.  The ssh key pair must be password-less.  The location of the Julia command on that host is also needed.

In this and the next example we simplify the task of deploying Julia on the remote host by deploying our Julia package as a docker container launched on that host.  To make the ssh connection work we have mapped the ssh port 22 on the docker container to port 3456 on the host.   (We describe the container construction and how it is launched in the next section.)

In the  previous section we provided “addprocs()” with a single integer representing the number of worker we wanted launched locally.   As shown below, the remote version requires a bit more.  We supply an array of tuples to addprocs() where each tuple provides the contact point and the number of workers we want there.  In this example we spawn 2 workers on one remote node and one worker on the other.  We also provide the local location of the private key (here called pubkey) in the sshflags argument.

We also want each worker to have the Distributed.jl package and another package called “PyCall” which enables calling python library functions.  We demonstrate the python call with a call to socket.hostname() in each worker.   Notice that the remote function invocation returns the “print” output to the master process.  The strange hostnames that are printed are the synthetic host names from the docker containers.

This example did not address performance.  We treat that subject briefly at the end of the next section.

### Channels

In addition to spawning remote tasks Julia supports a remote channel mechanism.   This allows you to declare a channel that can carry messages of a given type and hold them for remote listeners to pickup.  In the example below we declare a remote channel that carries string messages and a function defined for the listeners to use.   The worker can use the “take!()” function to remove an item from the channel and the master uses “put!()” to fill the channel.  The message “-1”  tells the listener to stop listening.

Using the channel mechanism one can use Julia to have a group of workers respond to a queue of incoming messages.   In the Github site for this post we have put an example where the workers take the channel messages and put the results into an Azure table.

## Julia Distributed Computing on Kubernetes

Depending upon your favorite flavor of cloud there are usually three or four ways to spin up a cluster of nodes to run a distributed computation.   The most basic way to do this is to launch a group of virtual machines where each has the needed resources for the problem you want to solve.   For Julia, the best thing to do is launch instances of a ‘data science VM” that is available on AWS or Azure.  There are two things your VM needs: an installation of Julia version 1.0.0 or later and the ability to ssh to it without using a password.

Here is how to do this on Azure.   First, run “ssh-keygen” and it will step you through the process of generating a key-pair and give you the option of having no password.  Then from the Azure portal select “create a resource” and search for Linux data science VM.   As you go through the installation process when it asks for a key, paste in the public key you just generated.   When the VM is up you can ssh to it to verify that it has the needed version of Julia installed by typing “Julia” at the command line.  If it can’t find Julia you will need to download and install it.   While you are logged and running Julia, you should also install some of the libraries your distributed program will need.  For example, Distributed.jl and PyCall.jl or any package specific to your application.   If you have a large cluster of VMs, this is obviously time consuming.

A better solution is to package your worker as a Docker container and then use Kubernetes to manage the scale-out step.   We have set up a docker container dbgannon/juliacloud2 in the docker hub that was be used for the following experiments.  This container is based on Jupyter/datascience-notebook so it has a version of Jupyter and Julia 1.0.0 already installed.    However to make it work has a Julia distributed worker it must be running the OpenSsh daemon sshd. A passwordless key pair has been preloaded into the appropriate ssh directory.  We have also installed the Jullia libraries Distributed.jl, PyCall.jl and IJulia.jl.   PyCall is needed because we need to call some python libraries and Ijulia.jl is critical for running Julia in Jupyter.  We have included all the files needed to build and test this container in Github.

## Launching the container directly on your local network

The docker command to launch the container on your local network is

docker run -it -d -p 3456:22 -p 8888:8888  dbgannon/juliacloud2


This exposes the ssh daemon on port 3456 and, if you run the jupyter notebook that is on port 8888.  To connect to the server you will need the ssh key which is found on the Github  site.   (If you wish to use your own passwordless keypair you will need to rebuild the docker container using the files in Github. Just replace pubkey and pubkey.pub. and rebuild the container.)   To connect to the container and start Jupyter use the key pubkey.

ssh -i pubkey jovyan@localhost -p 3456
…

# Conclusion

Cloud data centers are getting high performance networks (with latencies of only a few microseconds in the case of Azure Brainwave) and immense computing capacity such as the tensor processing capability of Google’s TPU.  At the same time designers of supercomputers are having to deal with more failure resilience and complexity in the design of the first exascale supercomputers.  For the next generation exascale systems the nodes will be variations on a theme of multicore and GPU-style accelerators.

Observed from a distance, one might conclude the architectures of cloud data centers and the next generation of supercomputers are converging.  However, it is important to keep in mind that the two are designed for different purposes. The cloud is optimized for fast response for services supporting many concurrent globally distributed clients. Supers are optimized for exceptionally fast execution of programs on behalf of a small number of concurrent users.   However, it may be the case that an exascale system may be so large that parts of it can run many smaller parallel jobs at once.  Projects like Singularity provide a solution for running containerized application on supercomputers in a manner similar to the way microservices are run on a cloud.

## Possible Futures

### The continuum: edge+cloud+supercomputer

There are interesting studies showing how supercomputers are very good at training very large, deep neural networks.  Specifically, NERSC scientists have show the importance of this capability in many science applications[4]. However, if you need to perform inference on models that are streamed from the edge you need the type of edge+cloud strategy described here.   It not hard to imagine scenarios where vast numbers of instrument streams are handled by the edge and fed to inference models on the cloud and those models are being continuously improved on a back-end supercomputer.

### A data garden

In the near future, the most important contribution clouds can make to science is to provide access to important public data collections.  There is already reasonable start.   AWS has an opendata registry that has 57 data sets covering topics ranging from astronomy to genomics.   Microsoft Research has a Data Science for Research portal with a curated collection of datasets relating to human computer interaction, data mining, geospatial, natural language processing and more.  Google cloud has a large collection of public genomics datasets.  The US NIH has launch three new cloud data and analytics projects.  They include the Cancer Genomics Cloud led by the Institute for Systems Biology with Google’s cloud, FireCloud from the Broad Institute also using Google’s cloud and Cancer Genomics Cloud (CGC), powered by Seven Bridges.   These NIH facilities also provide analytics frameworks designed to help research access and effective use the resources.

I am often asked about research challenges in cloud computing that student may wish to undertake.   There are many.  The fact that the IEEE cloud computing conference being held in San Francisco in July received nearly 300 submissions shows that the field is extremely active.   I find the following topics very interesting.

1. Find new ways to extract knowledge from the growing cloud data garden.   This is a big challenge because the data is so heterogeneous and discovery of the right tool to use to explore it requires expert knowledge.  Can we capture that community knowledge so that non-experts can find their way?  What are the right tools to facility collaborative data exploration?
2. There are enormous opportunities for systems research in the edge-to-cloud-to-supercomputer path.  How does one create a system to manage and optimize workflows of activities that span this continuum?  Is there a good programming model for describing computations involving the edge and the cloud?  Can a program be automatically decomposed into the parts that are best run on the edge and the parts on cloud?  Can such a decomposition be dynamically adjusted to account for load, bandwidth constraints, etc.?
3. Concerning the intelligent assistant for research, there are a number of reasonable projects short of build the entire thing.  Some may be low hanging fruit, and some may be very hard.  For example, ArXiv, Wikipedia and Google search and Bing are great for discovery but in different ways.   Handling complex queries like “what is the role of quantum entanglement in the design of a quantum computer?” should lead to a summary of the answer with links.   There is a lot of research on summarization and there are a lot of sources of data.  Another type of query is “How can I access data on genetic indicators related to ALS?”  Google will go in the right direction, but it takes more digging to find data.

These are rather broad topics, but progress on even the smallest part may be fun.

[1] L. Ramakrishnan, P. T. Zbiegel, S. Campbell, R. Bradshaw, R. S. Canon, S. Coghlan, I. Sakrejda, N. Desai, T. Declerck, and A. Liu. Magellan: Experiences from a science cloud. In 2nd International Workshop on Scientiﬁc Cloud Computing, pages49–58., ACM, 2011.

P. Mehrotra, J. Djomehri, S. Heistand, R. Hood, H. Jin, A. Lazanoﬀ, S. Saini, and R. Biswas. Performance evaluation of Amazon EC2 for NASA HPC applications. In 3rd Workshop on Scientiﬁc Cloud Computing, pages 41–50. ACM, 2012

[3] https://blogs.microsoft.com/green/2018/05/23/achievement-unlocked-nearly-200-million-images-into-a-national-land-cover-map-in-about-10-minutes/  from Lucas Joppa – Chief Environmental Scientist, Microsoft

# Parallel Programming in the Cloud with Python Dask

I am always looking for better ways to write parallel programs.  In chapter 7 of our book “Cloud Computing for Science and Engineering” we looked at various scalable parallel programming models that are used in the cloud.   We broke these down into five models: (1) HPC-style “Single Program Multiple Data” (SPMD) in which a single program communicates data with copies of itself running in parallel across a cluster of machines, (2) many task parallelism that uses many nearly identical workers processing independent data sets, (3) map-reduce and bulk synchronous parallelism in which computation is applied in parallel to parts of a data set and intermediate results of a final solution are shared at well defined, synchronization points,  (4) graph dataflow transforms a task workflow graph into sets of parallel operators communicating according to the workflow data dependencies and (5) agents and microservices  in which a set of small stateless services process incoming data messages and generate messages for other microservices to consume.  While some applications that run in the cloud are very similar to the batch style of HPC workloads, parallel computing in the cloud is often driven by different classes application requirements.  More specifically, many cloud applications require massive parallelism to respond external events in real time.  This includes thousands of users that are using apps that are back-ended by cloud compute and data.   It also includes applications that are analyzing streams of data from remote sensors and other instruments.   Rather than running in batch-mode with a start and end, these applications tend to run continuously.

A second class of workload is interactive data analysis.   In these cases, a user is exploring a large collection of cloud resident data.   The parallelism is required because the size of the data: it is too big to download and if you could the analysis would be too slow for interactive use.

We have powerful programming tools that can be used for each of the parallel computing models described above but we don’t have a single programming tool that support them all.   In our book we have used Python to illustrate many of the features and services available in the commercial clouds.  We have taken this approach because Python and Jupyter are so widely used in the science and data analytics community.  In 2014 the folks at Continuum (now just called Anaconda, Inc) and a several others released a Python tool called Dask which supports a form of parallelism similar to at least three of the five models described above.  The design objective for Dask is really to support parallel data analytics and exploration on data that was too big to keep in memory.   Dask was not on our radar when we wrote the drafts for our book,  but it certainly worth discussing now.

This is not intended as a full Dask tutorial.   The best tutorial material is the on-line YouTube videos of talks by Mathew Rocklin from Anaconda.   The official  tutorials from Anaconda are also available.  In the examples we will discuss here we used three different Dask deployments.  The most trivial (and the most reliable) deployment was a laptop installation.  This worked on a Windows 10 PC and a Mac without problem.  As Dask is installed with the most recent release of Anaconda, simply update your Anaconda deployment and bring up a Jupyter notebook and “import dask”.    We also used the same deployment on a massive Ubuntu linux VM on a 48 core server on AWS.  Finally, we deployed Dask on Kubernetes clusters on Azure and AWS.

Our goal here is to illustrate how we can use Dask to illustrate several of the cloud programming models described above.    We begin with many task parallelism, then explore bulk synchronous and a version of graph parallelism and finally computing on streams.  We say a few words about SPMD computing at the end, but the role Dask plays there is very limited.

## Many Task Parallelism and Distributed Parallel Data Structures

Data parallel computing is an old important concept in parallel computing.  It describes a programming style where a single operation is applied to collections of data as a single parallel step. A number of important computer architectures supported data parallelism by providing machine instructions that can be applied to entire vectors or arrays of data in parallel.  Called Single instruction, multiple data (SIMD) computers, these machines were the first supercomputers and included the Illiac IV and the early Cray vector machines.  And the idea lives on as the core functionality of modern GPUs.   In the case of clusters computers without a single instruction stream we traditionally get data parallelism by distributed data structures over the memories of each node in the cluster and then coordinating the application of the operation in a thread on each node in parallel.   This is an old idea and it is central to Hadoop, Spark and many other parallel data analysis tools.   Python already has a good numerical array library called numpy, but it only supports sequential operations for array in the memory of a single node.

Dask computations are carried out in two phases.   In the first phase the computation is rendered into a graph where the nodes are actual computations and the arcs represent data movements.   In the second phase the graph is scheduled to run on a set of resources.  This is illustrated below.  We will return to the details in this picture later.

Figure 1.  Basic Dask operations: compile graph and then schedule on cluster

There are three different sets of “resources” that can be used.   One is a set of threads on the host machine.   Another is a set of process and the third is a cluster of machines.   In the case of threads and local processes the scheduling is done by the “Single machine scheduler”.   In the case of a cluster it called the distributed cluster.  Each scheduler consumes a task graph and executes it on the corresponding host or cluster.   In our experiments we used a 48 core VM on AWS for the single machine scheduler. In the cluster case the preferred host is a set of containers managed by Kubernetes.   We deployed two Kubernetes clusters:  a three node cluster on Azure and a 6 node cluster on AWS.

Python programmers are used to numpy arrays, so Dask takes the approach to distributing arrays by maintaining as much of the semantics of numpy as possible.  To illustrate this idea consider the following numpy computation that creates a random 4 by 4 array, then zeros out all elements lest than 0.5 and computes the sum of the array with it’s transpose.

x = np.random.random((4,4))
x[x<0.5] = 0
y = x+x.T


We can use Dask to make a distributed version of the same matrix and perform the same computations in parallel.

Import dask.array as da
x = da.random.random(size = (4,4), chunks =(4,1))
x[x<0.5] = 0
y = x+x.T


The important new detail here is that we give explicit instructions on how we want the array to be distributed by specifying the shape of the chunks on each node.   In this case we have said we want each “chunk” to be a 4×1 slice of the 4×4 array.   We could have partitioned it into square blocks of size 2×2.   Dask takes care of managing each chunk and the needed communication between the processes that handle each chunk.   The individual chunks are managed on each thread/process/worker as numpy arrays.

As stated above, there are two parts to a dask computation.   The first phase is the construction of a graph representing the computation involving each chunk. We can actually take a look at the graph.   For example, in the computation above we can use the “visualize()” method as follows.

y = x+x.T
y.visualize()


Figure 2.   Sample Dask Graph for x+x.T

The nodes represent data or operations and the lines are data movements from one node to another.  As can be seen this is a rather communication intensive graph.   This is becase the transpose operation requires element on the rows (which are distributed) must be moved to columns on the appropriate node to do the addition.  The way we chunck the array can have a huge impact on the complexity of the distributed computation.  For example, 2×2 chuncking makes this one very easy.   There are 4 chunks and doing the transpose involves only a simple swap of the “off diagonal” chunks.   In this case the graph is much simpler (and easier to read!)

Figure 3.  Task graph for x+x.T with 2×2 chunking of data

The second step for Dask is to send the graph to the scheduler to schedule the subtasks and execute them on the available resources. That step is accomplished with a call to the compute method.

y.compute()


Dask arrays support almost all the standard numpy array operations except those that involve complex communications such as sorting.

In addition to numpy-style arrays, Dask also has a feature called Dask dataframes that are distributed versions of Pandas dataframes.   In this case each Dask dataframe is partitioned by blocks of rows where each block is an actual Pandas dataframe.  In other words, Dask dataframes operators are wrappers around the corresponding Pandas wrappers in the same way that Dask array operators are wrappers around the corresponding numpy array operators.    The parallel work is done primarily by the local Pandas and Numpy operators working simultaneously on the local blocks and this is followed by the necessary data movement and computation required to knit the partial results together.  For example, suppose we have a dataframe, df, where each row is a record consisting of a name and a value and we would like to compute the sum of the values associated with each name.   We assume that names are repeated so we need to group all records with the same name and then apply a sum operator.  We set this up on a system with three workers.  To see this computational graph we write the following.

df.groupby(['names']).sum().visualize()


Figure 4.  Dataframe groupby reduction

Figure 5.  Processing Yellow Cab data for New York City

The persist method moves the dataframe into memory as a persistent object that can be reused without being recomputed.  (Note:  the read_cvs method did not work on our kubernetes clusters because of a missing module s3fs in the dask container, but it did work on our massive shared memory VM which has 200 GB of memory.)

Having loaded the data we can now follow the dask demo example and compute the best hour to be a taxi driver based on the fraction of tip received for the ride.

Figure 6.  New York City cab data analysis.
As you can see, it is best to be a taxi driver about 4 in the morning.

A more general distributed data structure is the Dask Bag that can hold items of less structured type than array and dataframes.   A nice example http://dask.pydata.org/en/latest/examples/bag-word-count-hdfs.html illustrates using Dask bags to explore the Enron public email archive.

One of the more interesting Dask operators is one that implements a version of the old programming language concept of a future   A related concept is that of lazy evaluation and this is implemented with the dask.delayed function.   If you invoke a function with the delayed operator it simply builds the graph but does not execute it.  Futures are different.    A future is a promise to deliver the result of a computation later.  The future computation begins executing but the calling thread is handed a future object which can be passed around as a proxy for the result before the computation is finished.

The following example is a slightly modified version of one of the demo programs.   Suppose you have four functions

def foo(x):
return result
def bar(x):
return result
def linear(x, y):
return result
def three(x, y, z):
return result


We will use the distributed scheduler to illustrate this example. We first must create a client for the scheduler. Running this on our Azure Kubernetes cluster we get the following.


c = Client()
c


To illustrate the delayed interface, let us build a graph that composes our example functions

from dask import visualize, delayed
i = 3
x = delayed(foo)( I )
y = delayed(bar)( x )
z = delayed(linear)(x, y)
q = delayed(three)( x, y, z)
q.visualize(rankdir='LR')


In this example q is now a placeholder for the graph of a delated computation.   As with the dask array examples, we can visualize the graph (plotting it from Left to Right).

Figure 7.  Graph of a delayed computation.

A call to compute will evaluate our graph.   Note that we have implemented the  four functions each with about 1 second of useless computational math (computing the sum of a geometric series) so that we can measure some execution times.   Invoking compute on our delayed computation gives us

which shows us that there is no parallelism exploited here because the graph has serial dependences.

To create a future, we “submit” the function and its argument to the scheduler client.  This immediately returns a reference to future value and starts the computation.  When you need the result of the computation the future has a method “result()” that can be invoked and cause the calling thread to wait until the computation is done.

Now let us consider the case where the we need to evaluate this graph on 200 different values and then sum the results.   We can use futures to kick off a computation for each instance and wait for them to finish and sum the results.   Again, following the example in the Dask demos, we ran the following on our Azure Kubernetes cluster:

Ignore the result of the computation (it is correct). The important result is the time. Calculating the time to run this sequentially (200*4.19 = 838 seconds) and dividing by the parallel execution time we get a parallel speed-up of about 2, which is not very impressive. Running the same computation on the AWS Kubernetes cluster we get a speed-up of 4. The Azure cluster has 6 cores and the AWS cluster has 12, so it is not surprising that it is twice as fast. The disappointment is that the speed-ups are not closer to 6 and 12 respectively.

Results with AWS Kubernetes Cluster

However, the results are much more impressive on our 48 core AWS virtual machine.

Results with AWS 48-core VM

In this case we see a speed-up of 24.   The difference is the fact that the scheduling is using shared memory and threads.

Dask futures are a very powerful tool when used correctly.   In the example above, we spawned off 200 computations in less than a second.   If the work in the individual tasks is large, that execution time can mask much of the overhead of scheduler communication and the speed-ups can be much greater.

Dask has a module called streamz that implements a basic streaming interface that allows you to compose graphs for stream processing.   We will just give the basic concepts here.   For a full tour look at https://streamz.readthedocs.io.   Streamz graphs have sources,  operators and sinks.   We can start by defining some simple functions as we did for the futures case:

def inc(x):
return x+13
def double(x):
return 2*x
def fxy(x): #expects a tuple
return x[0]+ x[1]
return x+y
from streamz import Stream
source = Stream()


The next step will be to create a stream object and compose our graph.   We will describe the input to the stream later.   We use four special stream operators here.    Map is how we can attach a function to the stream.   We can also merge two streams with a zip operator.   Zip waits until there is an available object on each stream and then creates a tuple that combines both into one object.   Our function fxy(x) above takes a tuple and adds them.   We can direct the output of a stream to a file, database, console output or another stream with the sink operator.  Shown below our graph has two sink operators.

Figure 8.  Streamz stream processing pipeline.

Visualizing the graph makes this clear.   Notice there is also an accumulate operator.   This allows state flowing through the stream to be captured and retained.   In this case we use it to create a running total.  To push  something into the stream we can use the emit() operator as shown below.

The emit() operator is not the only way to send data into a stream. You can create the stream so that it takes events from kafka, or reads lines from a file or it can monitor a file system directory looking for new items. To illustrate that we created another stream to look at the home director of our kubernetes cluster on Azure. Then we started this file monitor. The names of the that are there are printed. Next, we added another file “xx” and it picked it up. Next, we invoked the stream from above and then added another file “xxx”.

# Handling Streams of Big Tasks

Of the five types of parallel programming Dask covers 2 and a half:  many task parallelism, map-reduce and bulk synchronous parallelism and part of graph dataflow.   Persistent microservices  are not part of the picture.   However, Dask and Streamz can be used together to handle one of the use cases for microservices.  For example, suppose you have a stream of tasks and you need to do some processing on each task but the arrival rate of tasks exceed the rate at which you can process them.   We treated this case with Microservices while processing image recognition with MxNet and the resnet-152 deep learning model (see this article.)  One can  use the Streams sink operation to invoke a future to spawn the task on the Kubernetes  cluster.   As the tasks finish the results can be pushed to other processes for further work or to a table or other storage as illustrated below.

Figure 9 Extracting parallelism from a stream.

In the picture we have a stream called Source which gathers the events from external sources.  We then map it to a function f() for initial processing. The result of that step is sent to a function called spawn_work which creates a future around a function that does some deep processing and sends a final result to an AWS DynamoDB table.   (The function putintable(n) below shows an example.  It works by invoking a slow computation then create the appropriate DynamoDB metadata and put the item in the table “dasktale”.)

def putintable(n):
import boto3
e = doexp(n*1000000)
dyndb = boto3.resource('dynamodb', … , region_name='us-west-2' )
table.put_item(Item= item )
return e

def spawn_work(n):
x = cl.submit(putintable, n)


This example worked very well. Using futures allowed the input stream to work at full speed by exploiting the parallelism. (The only problem is that boto3 needs to be installed on all the kubernetes cluster processes. Using the 48 core shared memory machine worked perfectly.)
Dask also has a queue mechanism so that results from futures can be pushed to a queue and another thread can pull these results out. We tried as well, but the results were somewhat unreliable.

# Conclusion

There are many more stream, futures, dataframe and bag operators that are described in the documents.   While it is not clear if this stream processing tool will be robust enough to replace any of the other systems current available, it is certainly a great, easy-to-use teaching tool.   In fact, this statement can be made about the entire collection of Dask related tools.   I would not hesitate to use it in an undergraduate course on parallel programming.   And I believe that Dask Dataframes technology is very well suited to the challenge of big data analytics as is Spark.

The example above that uses futures to extract parallelism from a stream challenge is interesting because it is completely adaptive. However, it is essential to be able to launch arbitrary application containers from futures to make the system more widely applicable.   Some interesting initial work has been done on this at the San Diego Supercomputer center using singularity to launch jobs on their resources using Dask.   In addition the UK Met Office is doing interesting things with autoscaling dask clusters.   Dask and StreamZ are still young.   I expect them to continue to evolve and mature in the year ahead.

# Cloud-Native Applications – call for papers

The IEEE Cloud Computing Journal is going to publish a special issue on the topic of Cloud-Native applications. This is an extremely interesting topic and it cuts to the heart of what makes the cloud a platform that is, in many ways, fundamentally different from what we have seen before.

What is “cloud-native”? That is what we want you to tell us. Roger Barga from Amazon, Neel Sundaresan from Microsoft and this blogger have been invited to be guest editors. But we want you to tell us soon. The deadline is March 1, 2017. The papers do not need to be long (3,000 to 5,000 words) and some of the topics possible include:

• Frameworks to make it easier for industry to build cloud-native applications;
• Educational approaches and community based organizations that can promote cloud-native design concepts;
• The role of open source for building cloud-native applications;
• VM and container orchestration systems for managing cloud-native designs;
• Efficient mechanisms to make legacy applications cloud-native;
• Comparing applications – one cloud-native and the other not – in terms of performance, security, reliability, maintainability, scalability, etc.;

# CNTK Revisited. A New Deep Learning Toolkit Release from Microsoft

In a pair of articles from last winter (first article, second article) we looked at Microsoft’s “Computational Network Toolkit” and compared it to Google’s Tensorflow.   Microsoft has now released a major upgrade of the software and rebranded it as part of the Microsoft Cognitive Toolkit.  This release is a major improvement over the initial release.  Because these older articles still get a fair amount of web traffic we wanted to provide a proper update.

There are two major changes from the first release that you will see when you begin to look at the new release.   First is that CNTK now has a very nice Python API and, second, the documentation and examples are excellent.   The core concepts are the same as in the initial release.   The original programming model was based on configuration scripts and that is still there, but it has been improved and renamed as “Brain Script”.  Brain Script is still an excellent way to build custom networks, but we will focus on the Python API which is very well documented.

Installing the software from the binary builds is very easy on both Ubuntu Linux and Windows.   The process is described in the CNTK github site.    On a Linux machine, simply download the gziped tared binary and execute the installer.

$wget https://cntk.ai/'BinaryDrop/CNTK-2-0-beta2-0-Linux-64bit-CPU-Only.tar.gz’$tar -xf CNTK-2-0-beta2-0-Linux-64bit-CPU-Only.tar.gz
$cd cntk/Scripts/linux$./install-cntk.sh


This will install everything including a new version of Continuum’s Anaconda Python distribution.  It will also create a directory called “repos’’.   To start Jupyter in the correct conda environment do the following.

$source “your-path-to-cntk/activate-cntk"$cd ~/repos/cntk/bindings/python/tutorials
$Jupyter notebook  A very similar set of commands will install CNTK on your Windows 10 box. (If you are running Jupyter on a virtual machine or in the cloud you will need additional arguments to the Jupyter notebook command such as “-ip 0.0.0.0 –no browser” and then then you can navigate you host browser to the VM ip address and port 8888. Of course, if it is a remote VM you should add a password. ) What you will see is an excellent set of tutorials as shown in Figure 1. Figure 1. CNTK tutorial Jupyter notebooks. ## CNTK Python API CNTK is a tool for building networks and the Python and Brain Script bindings are very similar in this regard. You use the Python program to construct a network of tensors and then train and test that network through special operations which take advantage of underlying parallelism in the hardware such as multiple cores or multiple GPUs. You can load data into the network through Python Numpy arrays or files. The concept of constructing a computation graph for later execution is not new. In fact, it is an established programming paradigm used in Spark, Tensorflow, and Python Dask. To illustrate this in CNTK consider the following code fragment that creates two variables and a constructs a trivial graph that does matrix vector multiplication and vector addition. We begin by creating three tensors that will hold the input values to the graph and then tie them to the matrix multiply operator and vector addition. import numpy as np import cntk X = cntk.input_variable((1,2)) M = cntk.input_variable((2,3)) B = cntk.input_variable((1,3)) Y = cntk.times(X,M)+B  In this X is a 1×2 dimensional tensor, i.e. a vector of length 2 and M is a matrix that is 2×3 and B is a vector of length 3. The expression Y=X*M+B yields a vector of length 3. However, no computation has taken place. We have only constructed a graph of the computation. To invoke the graph we input values for X, B and M and then apply the “eval’’ operator on Y. We use Numpy arrays to initialize the tensors and supply a dictionary of bindings to the eval operator as follows x = [[ np.asarray([[40,50]]) ]] m = [[ np.asarray([[1, 2, 3], [4, 5, 6]]) ]] b = [[ np.asarray([1., 1., 1.])]] print(Y.eval({X:x, M: m, B: b})) array([[[[ 241., 331., 421.]]]], dtype=float32)  CNTK has several other important tensor containers but two important ones are • Constant(value=None, shape=None, dtype=None, device=None, name=”): a scalar, vector or other multi-dimensional tensor. • Parameter(shape=None, init=None, dtype=None, device=None, name=”): a variable whose value is modified during network training. There are many more tensor operators and we are not going to go into them here. However, one very important class is the set of operators that can be used to build multilevel neural networks. Called the “Layers Library’’ they form a critical part of CNTK. One of the most basic is the Dense(dim) layer which creates a fully connected layer of output dimension dim. As shown in Figure 2. Figure 2. A fully connected layer created by the Dense operator with an implicit 3×6 matrix and a 1×6 vector of parameters labeled here M and B. The input dimension is taken from the input vector V. The activation here is the default (none), but it could be set to ReLu or Sigmod or another function. There are many standard layer types including Convolutional, MaxPooling, AveragePooling and LSTM. Layers can also be stacked with a very simple operator called “sequential’’. Two examples taken directly from the documentation is a standard 4 level image recognition network based on convolutional layers. with default_options(activation=relu): conv_net = Sequential ([ # 3 layers of convolution and dimension reduction by pooling Convolution((5,5), 32, pad=True), MaxPooling((3,3), strides=(2,2)), Convolution((5,5), 32, pad=True), MaxPooling((3,3), strides=(2,2)), Convolution((5,5), 64, pad=True), MaxPooling((3,3), strides=(2,2)), # 2 dense layers for classification Dense(64), Dense(10, activation=None) ])  The other fun example is a slot tagger based on a recurrent LSTM network. tagging_model = Sequential ([ Embedding(150), # embed into a 150-dimensional vector Recurrence(LSTM(300)), # forward LSTM Dense(labelDim) # word-wise classification ])  The Sequential operator can be thought of as a concatenation of the layers that in the given sequence. In the case of the slot tagger network, we see two additional important operators: Embedding and Recurrence. Embedding is used for word embeddings where the inputs are sparse vectors of size equal to the word vocabulary (item i = 1 if the word is the i-th element of the vocabulary and 0 otherwise) and the embedding matrix is of size vocabulary-dimension by, in this case, 150. The embedding matrix may be passed as a parameter or learned as part of training. The Recurrence operator is used to wrap the correct LSTM output back to the input for the next input to the network. ## A Closer Look at One of Tutorials. The paragraphs above are intended to give you the basic feel of what CNTK looks like with its new Python interface. The best way to learn more is to study the excellent example tutorials. ### CNTK 203: Reinforcement Learning Basics CNTK version 1 had several excellent tutorials, but version 2 has the Python notebook versions of these plus a few new ones. One of the newest demos is an example of reinforcement learning. This application of Neural Nets was first described in the paper Human-level control through deep reinforcement learning, by the Google DeepMind group. This idea has proven to be very successful in systems that learn to play games. This topic has received a lot of attention, so we were happy to see this tutorial included in CNTK. The example is a very simple game that involves balancing a stick. More specifically they use the cart-pole configuration from OpenAI. As shown in figure 3, the system state can be described by a 4-tuple: position of the cart, its velocity, the angle of the pole and the angular velocity. The idea of the game is simple. You either push the cart to the left or the right and see if you can keep the stick vertical. If you drift too far off course or the pole angle goes beyond an angle of 15 degrees, the game is over. Your score is the total number of steps you take before failure. The full example is in the github repository and we are not going to go through all the code here. The Jupyter notebook for this example is excellent, but if you are new to this topic you may find some additional explanation of value in case you decide to dig into it. Figure 3. Cart-Pole game configuration. The part of reinforcement learning used here is called a Deep Q-Network. It uses a neural network to predict the best move when the cart is in a given state. This is done by implicitly modeling a function Q(s,a) which is the optimal future reward given state s and the action is a and where the initial reward is r. They approximate Q(s,a) using the “Bellmann equation” which describes how to choose action a in a given state s to maximize the accumulated reward over time based inductively on the same function applied to the following states s’. The parameter gamma is a damping factor that guarantees the recurrence converges. (Another excellent reference for this topic is the blog by Jaromír Janisch.) The CNTQ team approached this problem as follows. There are three classes. • Class Brain. This hold our neural net and trainer. There are three methods • Create() which is called at initialization. It creates the network. There are two tensor parameters: observation, which is used to hold the input state and q_target which is a tensor used for training. The network is nice and simple: l1 = Dense(64, activation=relu) l2 = Dense(2) unbound_model = Sequential([l1, l2]) model = unbound_model(observation)  The training is by the usual stochastic gradient descent based on a loss measure. loss = reduce_mean(square(model - q_target), axis=0) meas = reduce_mean(square(model - q_target), axis=0) learner = sgd(model.parameters, lr, gradient_clipping_threshold_per_sample=10) trainer = Trainer(model, loss, meas, learner)  • Train(x, y) which calls the trainer for batches of states x and predicted outcomes y which we will describe below • Predict(s) which invokes the model for state ‘s’ and returns a pair of optimal rewards given a left or right move. • Class Memory. This hold a record of recent moves. This is used by the system to create training batches. There are two methods • Add(sample configuration) – adds a four tuple consisting of a starting state, an action and a result and a resulting state tuple to a memory. • Sample(n) returns a random sample of n configurations samples from the memory. • Class Agent which is the actor that picks the moves and uses the memory to train the network. There are three methods here. • Act(state) returns a 0 or 1 (left move or right move) that will give the best reward for the given state. At first it just makes random guesses, but as time passes it uses the Predict method of the Brain class to select the best move for the given state. • Observe(sample configuration) records a configuration in the memory and keeps track of the time step and another parameter used by act. • Replay() is the main function for doing the learning. This is the hardest part to understand in this tutorial. It works by grabbing a random batch of memorized configurations from memory. What we will do is use the current model to predict an optimal outcome and use that as the next step in training the model. More specifically for each tuple in the batch we want to turn it into a training sample so that the network behaves like the Bellmann equation. A tuple consists of the start state, the action, the reward and the following state. We can apply our current model to predict the award for the start state and also for the result state. We can use this information to create a new reward tuple for the given action and start state that models the Bellmann recurrence. Our training example is the pair consisting of the start state and this newly predicted reward. At first this is a pretty poor approximation, but amazingly over time it begins to converge. The pseudo code is shown below. x = numpy.zeros((batchLen, 4)).astype(np.float32) y = numpy.zeros((batchLen, 2)).astype(np.float32) for i in range(batchLen): s, a, r, s_ = batch[i] # s = the original state (4 tuple) # a is the action that was taken # r is the reward that was given # s_ is the resulting state. # let t = the reward computed from current network for s # and let r_ be the reward computed for state s_. # now modify t[a] = r + gamma* numpy.amax(r_) # this last step emulated the bellmann equation x[i] = s y[i] = t self.brain.train(x,y)  The final part of the program is now very simple. We have an environment object that returns a new state and a done flag for each action the agent takes. We simply run our agent until it falls out of bounds (the environment object returns done=True). If the step succeeded, we increment our score. The function to run the agent and to keep score is shown below. def run(agent): s = env.reset() R = 0 while True: a = agent.act(s) s_, r, done, info = env.step(a) if done: # terminal state s_ = None agent.observe((s, a, r, s_)) agent.replay() #learn from the past s = s_ R += r if done: return R  Each time we run “run” it learns a bit more. After about 7000 runs it will take over 600 steps without failure. The text above is no substitute for a careful study of the actual code in the notebook. Also, as it is a notebook, you can have some fun experimenting with it. We did. ## Final Thoughts CNTK is now as easy to use as any of the other deep learning toolkits. While we have not benchmarked its performance they claim it is extremely fast and it make good use of multiple GPUs and even a cluster of servers. We are certain that the user community will enjoy using and contributing to its success. ## Citation. The team that first created CNTK should be cited. I know there are likely many others that have contributed to the open source release in one way or another, but the following is the master citation. Amit Agarwal, Eldar Akchurin, Chris Basoglu, Guoguo Chen, Scott Cyphers, Jasha Droppo, Adam Eversole, Brian Guenter, Mark Hillebrand, T. Ryan Hoens, Xuedong Huang, Zhiheng Huang, Vladimir Ivanov, Alexey Kamenev, Philipp Kranen, Oleksii Kuchaiev, Wolfgang Manousek, Avner May, Bhaskar Mitra, Olivier Nano, Gaizka Navarro, Alexey Orlov, Hari Parthasarathi, Baolin Peng, Marko Radmilac, Alexey Reznichenko, Frank Seide, Michael L. Seltzer, Malcolm Slaney, Andreas Stolcke, Huaming Wang, Yongqiang Wang, Kaisheng Yao, Dong Yu, Yu Zhang, Geoffrey Zweig (in alphabetical order), “An Introduction to Computational Networks and the Computational Network Toolkit“, Microsoft Technical Report MSR-TR-2014-112, 2014. # Kubernetes and the Google Cloud Container Service: Fun with Pods of Celery. In a previous post I talked about using Mesosphere on Azure for scaling up many-tasks parallel jobs and I promised to return to Kubernetes when I figured out how to bring it up. Google just made it all very simple with their new Google Cloud container services. And, thanks to their good tutorials, I learned about a very elegant way to do remote procedure calls using another open source tool called Celery. So let me set the stage with a variation on an example I have used in the past. Suppose we have 10000 scientific documents that are stored in the cloud. I would like to use a simple machine learning method to classify each of these by topic. I would like to do this quickly as possible and, because the analysis of each document is independent of the others, I can try to process as many as possible in parallel. This is the basic “many task” parallel model and one of the most common uses of the cloud for scientific computing purposes. To do this we will use the Celery distributed task queue mechanism to take a list of our documents and send each one to a work queue where the tasks will be parceled out to workers who will do the analysis and respond. ## The Google Cloud Container Service and a few words about Kubernetes. Before getting into the use of Celery and the analysis program, let’s describe the Google Cloud Container Service and a bit about Kubernetes. Getting started is incredibly easy. Google has a small free trial account which is sufficient to do the experiments described. Go to http://cloud.google.com and sign in or create an account. This will take you to the “console” portal. The first thing you need to do is to create a project. In doing so it will be assigned an id which is a string of the form “silicon-works-136723”. There is a drop down menu on the left end of the blue banner at the top of the page. (Look for three horizontal bars.) This allows you to select the type of service you want to work on. Select the “Container Engine”. On the “container clusters” page there is a link that will allow you to create a cluster. With the free account you cannot make a very big cluster. You are limited to about 4 dual core servers. If you fill in the form and submit it, you will soon have a new cluster. There is a special icon of the form “>_” in the blue banner. Clicking on that icon will create an instance of a “Cloud Shell” that will be automatically authenticated to your account. The page you will see should resemble Figure 1 below. The next thing you need to do is to authenticate your cloud shell with your new cluster. By selecting your container and clicking on the “connect” button to the right you will get the code to paste into the cloud shell. The result should now look exactly like Figure 1. Figure 1. Creating a Google cloud cluster and connecting the cloud shell to it. Interacting with Kubernetes, which is now running on our small cluster, is through command lines which can be entered into the cloud shell. Kubernetes has a different, and somewhat more interesting architectures than other container management tools. The basic unit of scheduling in Kubernetes is launching pods. A pod consists of a set of one or more Docker-style containers together with a set of resources that are shared by the containers in that pod. When launched a pod resides on a single server or VM. This has several advantages for the containers in that pod. For example, because the containers in a pod are all running on the same VM, the all share the same IP and port space so the containers can find each other through conventional means like “localhost”. They can also share storage volumes that are local to the pod. To start let’s consider a simple single container pod to run the Jupiter notebook. There is a standard Docker container that contains Jupyter and the scipy software stack. Using the Kubernetes control command kubectl we can launch Jupyter and expose its port 8888 with the following statement. $ kubectl run jupyter --image=jupyter/scipy-notebook --port=8888


To see that it is up and running we can issue the command “kubectl get pods” which will return the status of all of our running pods.   Though we have launched jupyter it is still not truly visible.   To do that we will associate a load balancer with the pod.   This will expose the port 888 to the open Internet.

$kubectl expose deployment jupyter --type=LoadBalancer  Once that has been run you can get the IP address for jupyter from the “LoadBalancer Ingress:” field of the service description when you run the following. If it doesn’t appear, try again. $ kubectl describe services jupyter


One you have verified that it is working at that address on port 8888, you should shut it down immediately because, as you can see, there is no security with this deployment.  Deleting a deployment is easy.

$kubectl delete deployment jupyter  There is another point that one must be aware of when building containers that need to directly interact with the google cloud APIs. To make this work you will need to get application default credentials to run in your container. For example if you container is going to interact with the storage services you will need this. To get the default application credentials follow the instructions here. We will say a few more words about this below. ## The Analysis Example in detail. Now to describe Celery and how to use Celery and Kubernetes in the many-task scenario described above. To use Celery we start with our analysis program. We have previously described the analysis algorithms in detail in another post, so we won’t duplicate that here. Let’s start by assuming we have a function predict(doc) that takes a document as a string as an argument and returns a string containing the result from our trained machine learnging classifiers. Our categories are “Physics”, “Math”, “Bio”, “Computer Science” and “Finance” and the result from each classifier is simply the category that that classifier determines to be the most likely correct answer. Celery is a distributed remote procedure call system for Python programs. The Celery view of the world is you have a set of worker processes running on remote machines and a client process that is invoking functions that are executed on the remote machines. The workers and the clients all coordinate through a message broker running somewhere else on the network. Here we use a RabbitMQ service that is running on a Linux VM on the NSF JetStream cloud as illustrated in Figure 2. Figure 2. Experimental Configuration with Celery workers running on Kubernetes in the Google Cloud Container Service, the RabbitMQ broker running in a VM on the NSF Jetstream Cloud and a client program running as a notebook on a laptop. The code block below illustrates the basic Celery worker template. Celery is initialized with a constructor that takes the name of the project and a link to the broker service which can be something like a Redis cache or MongoDB. The main Celery magic is invoked with a special Python “decorator” associated with the Celery object as shown in the predictor.py file below. from celery import Celery app = Celery('predictor', backend='amqp') #Now initialize and load all the data structure that will be constant #and recused for each analysis. In our case this will include #all the machine learning models that were trained on the data #previously. And create a main worker function to invoke the models. def invokeMLModels(statement): .... return analysis #define the functions we will call remotely here @app.task def predict(statement): prediction = invokeMLModels(statement) return [prediction]  What this decorator accomplishes is to wrap the function in a manner that it can be invoked by a remote client. To make this work we need create a Celery worker from our predictor.py file with the command below which registers a worker instance as a listener on the RabbitMQ queue. >celery worker -A predictor -b 'amqp://guest@brokerIPaddr'  Creating a client program for our worker is very simple. It is similar to the worker template except that our version of the predict( ) function does nothing because we are going to invoke it with the special Celery apply_async( ) method that will push the argument to the broker queue and return control immediately to the client. The object that is returned from this call is similar to what is sometimes called a “future” or a “promise” in the programming language literature. What it is a placeholder for the returned value. Once we attempt to evaluate the get() method on this object our client will wait until a reply is returned from the remote worker that picked up the task. from celery import Celery app = Celery('predictor', broker='amqp://guest@brokerIPaddr', backend='amqp') @app.task def predict(statement): return ["stub call"] res = predict.apply_async(["this is a science document ..."]) print res.get()  Now if we have 10000 documents to analyze we can send them in sequence to the queue as follows. #load all the science abstracts into a list documents = load_all_science_abstracts() res = [] for doc in documents: res.append(predict.apply_async([doc]) #now wait for them all to be done predictions = [result.get() for result in res] #now do an analysis of the predictions  Here we push each analysis task into the queue and save the async returned objects in a list. Then we create a new list by waiting for each prediction value to be returned. Our client can run anywhere there is Internet access. For example this one was debugged on a Jupyter instances running on a laptop. All you need to do is “pip install celery” and run Juypter. There is much more to say about Celery and the interested reader should look at the Celery Project site for the definitive guide. Let us now turn to using this with Kubernetes. We must first create a container to hold the analysis code and all the model data. For that we will need a Docker file and a shell script to correctly launch celery one the container is deployed. For those actually interested in trying this, all the files and data are in OneDrive here. The Docker file shown below has more than we need for this experiment. # Version 0.1.0 FROM ipython/scipystack MAINTAINER yourdockername "youremail" RUN easy_install celery RUN pip install -U Sphinx RUN pip install Gcloud RUN easy_install pattern RUN easy_install nltk RUN easy_install gensim COPY bookproject-key.json / COPY models / COPY config / COPY sciml_data_arxiv.p / COPY predictor.py / COPY script.sh / ENTRYPOINT ["bash", "/script.sh"]  To build the image we first put all the machine learning configuration files in a directory called config and all the learned model files in a directory called models. At same level we have the predictor.py source. For reasons we will explain later we will also include the full test data set: sciml_data_arxiv.p. The Docker build starts with the ipython/scipystack container. We then use easy_install to install Celery as well as four packages used by the ML analyzers: pattern, nltk and gensim. Though we are not going to use the Gcloud APIs here, we include them with a pip install. But to make that work we need an updated copy of Sphinx. To make the APIs work we would need our default client authentication keys. They are stored in a json file called bookproject-key.json that was obtained from the Gcloud portal as described previously. Finally we copy all of the files and directory to the root path ‘/’. Note that the copy from a directory is a copy of the all contained files to the path ‘/’ and not to a new directory. The ENTRYPOINT runs our script which is shown below. cp /predictor.py . export C_FORCE_ROOT='true' export GOOGLE_APPLICATION_CREDENTIALS='/bookproject-key.json' echo$C_FORCE_ROOT
celery worker -A predictor -b $1  Bash will run our script in a temp directory, so we need to copy our predictor.py file to that directory. Because our bash is running as root, we need to convince Celery that it is o.k. to do that. Hence we export C_FORCE_ROOT as true. Next, if we were using the Gcloud APIs we need to export the application credentials. Finally we invoke celery but this time we use the -b flag to indicate that we are going to provide the IP address of the RabbitMQ amqp broker as a parameter and we remove it from the explicit reference in the predictor.py file. When run the predictor file will look for all the model and configuration data in ‘/’. We can now build the docker image with the command >docker build -t “yourdockername/predictor” .  And we can test the container on our laptop with >docker run -i -t “yourdockername/predictor ‘amqp://guest@rabbitserverIP’  Using “-i -t” allows you to see any error output from the container. Once it seems to be working we can now push the image to the docker hub. (to use our version directly, just pull dbgannon/predictor) We can now return to our Google cloud shell and pull a version of the container there. If we want to launch the predictor container on the cluster, we can do it one at a time with the “kubectl run” command. However Kubernetes has a better way to do this using a pod configuration file where we can specify the number of pod instances we want to create. In the file below, which we will call predict-job.json we specify a job name, the container image in the docker hub, and the parameter to pass to the container to pass to the shell script. We also specify the number of pods to create. In this case that is 6 as identified in the “parallelism” parameter. apiVersion: batch/v1 kind: Job metadata: name:predict-job spec: parallelism: 6 template: metadata: name: job-wq spec: containers: - name: c image: dbgannon/predictor args: ["amqp://guest@ipaddress_of_rabbitmq_server"] restartPolicy: OnFailure  One command in the cloud shell will now launch six pods each running our predictor container. $kubectl create -f predict-job.json


## Some Basic Performance Observations.

When using many tasks system based on a distributed worker model there are always three primary questions about the performance of the system.

1. What is the impact of wide-area distribution on the performance?
2. How does performance scale with the number of worker containers that are deployed? More specifically, if we N workers, how does the system speed up as N increases?    Is there a point of diminishing return?
3. Is there a significant per/task overhead that the system imposes?  In other words, If the total workload is T and if it is possible to divide that workload into k tasks each  of size T/k , then what is the best value of k that will maximize performance?

Measuring the behavior of a Celery application as a function of the number workers is complicated by a number of factors.   The first concern we had was the impact of widely distributing the computing resources on the overall performance.   Our message broker (RabbitMQ) was running in a virtual machine in Indiana on the JetStream cloud.   Our client was running a Jupyter notebook on a laptop and the workers were primarily on the Midwest Google datacenter and on a few on other machines in the lab.   We compared this to a deployment where all the workers, the message broker and the client notebook were all running together on the Google datacenter.   Much to our surprise there was little difference in performance between the two deployments.   There are two ways to view this result.   One way is to say that the overhead of wide area distribution was not significant.   The other way to say this is that the overhead of wide area distribution was negligible compared to other performance problems.

A second factor that has an impact on performance as a function of the number of workers is the fact that a single Celery worker may have multiple threads that are responding to asynchronous function calls.   While we monitored the execution we noticed that the number of active threads in one worker could change over time. This made performance somewhat erratic.  Celery’s policy is that it will never have more threads than the number of available cores, so to limit the thread variability we ran workers in container pods on VMs with only one core.

Concerning the question of the granularity of the work partitioning we configured the program so that a number of documents could be processed in one invocation and this number could be set remotely.    By taking a set of 1000 documents and a fixed set of workers, we divided the document set into blocks of size K where K ranged from 1 to 100.   In general, larger blocks were better because the number of Celery invocations was smaller, but the difference was not great.   Another factor involved Celery’s scheduling for deciding which worker get the next invocation.   For large blocks this was not the most efficient because this left holes in the execution schedule when workers were occasionally idle while another was over scheduled.   For very small blocks these holes tended to be small.   We found that a value of K=2 gave reasonably consistent performance.

Finally to test scalability we used three different programs.

1. The document topic predictor described above where each invocation classified two documents.
2. A simple worker program that does no computation but just sleeps for 10 seconds before returning a “hello world” string.
3. A worker that computes part of the Euler sequence sum(1/i**2, i=1..n) where n = 109 .   Each worker computes a block of 107 terms of the sequence and the 100 partial results are added together to get the final result  (which approximates pi2/6  to about 7 decimal places).

The document predictor is very computational intensive and uses some rather large data matrices for the trained machine learning models.   The size of these arrays are about 150 megabytes total.  While this does fit in memory, the computation is going to involve a great deal of processor cache flushing and there may be memory paging effects.   The example that computes the Euler sum requires no data other than the starting point index and the size of the block to sum.   It is pure computation and it will have no cache flushing or memory paging effects, but it will keep the CPU very busy.   The “sleep” example leaves the memory and the CPU completely idle.

We ran all three with one to seven workers.   (6 workers using 6 cores from the small Google demo account and one on another other remote machine).    To compare the results, we computed the time for each program on one worker and plotted the speed-up ratio for 2, 3, 4, 5, 6 and 7 workers.   The results are shown in the graphs below.

Figure 3.  Performance as speed-up for each of the three applications with up to 7 workers.

As can be seen, the sleeper scales linearly in the number of workers.   In fact, when executed on multi-core machines it is almost super-linear because of the extra threads that can be used.  (It is very easy for a large number of threads to sleep in parallel.)   On the other hand, the predictor and the Euler examples reached a maximum speed up with around five workers.  Adding more worker pods to the servers did not show improvement because these applications are already very compute intensive.    This was a surprise as we expect all three experiments to scale well beyond seven workers.  Adding more worker pods to the servers did not show improvement because these applications are already very compute intensive.   When looking for the cause of this limited performance, we considered the possibility that the RabbitMQ broker was a bottleneck, but our previous experience with it has allowed us to scale applications to dozens of concurrent reader and writers.   We are also convinced that the Google Container Engine performed extremely well and it was not the source of any of these performance limitations. We suspect (but could not prove) that the Celery work distribution and result gathering mechanisms have overheads that limit scalability as the number of available workers grows.

## Conclusion

Google has made it very easy to deploy containerized applications using Kubernetes on their cloud container service.  Kubernetes has some excellent architectural features that allow multiple containers to be co-located on a single server within a pod.   We did not have time here to demonstrate this, but their documentation gives some excellent examples.

Celery is an extremely elegant way to do remote procedure calls in Python.  One only needs to define the function and annotate it with a Celery object.   It can then be remotely invoked with an asynchronous call that returns control to the caller.  A future like object is returned.  By calling a special method on the returned object the caller will pause until the remote call completes and the value is provided to the caller.

Our experiments demonstrated that Celery has limited scalability if it is used without modification and with the RabbitMQ message broker.   However, celery has many parameters and it may be possible that the right combinations will improve our results.  We will report any improved results we discover in a later version of this document.

# The State of the Cloud: Evolving to Support Deep Learning and Streaming Data Analytics and Some Research Challenges

(Note:  This is an updated version on 7/21/2016.   The change relates to containers and HPC and it is discussed in the  research topics at the end.)

I was recently invited to serve on a panel for the 2016 IEEE Cloud Conference.  As part of that panel I was asked to put together 15 minutes on the state of cloud technology and pose a few research challenges.   Several people asked me if I had published any of what I said so I decided to post my annotated notes from that mini-talk here. The slide deck that goes along with this can be found here.  There were three others on the panel who each made some excellent points and this document does not necessarily reflect their views.

Cloud computing has been with us for fifteen years now and Amazon’s Web Services have been around for ten.   The cloud was originally created to support on-line services such as email, search and e-commerce.  Those activities generated vast amounts of data and the task of turning this data into value for the user has stimulated a revolution in data analytics and machine learning.  The result of this revolution has been powerful and accurate spoken language recognition, near real-time natural language translation, image and scene recognition and the emergence of a first generation of cloud-based digital assistants and “smart” services.  I want to touch on several aspects of cloud evolution related to these exciting changes.

## Cloud Architecture

Cloud architectures have been rapidly evolving to support these computational and data intensive tasks.   The cloud data centers of 2005 were built with racks of off-the-shelf server and standard networking gear, but the demands of the new workloads described above are pushing the cloud architects to consider some radically different approaches.   The first changes were the introduction of software defined networks that greatly improved bisection bandwidth.   This also allowed the data center to be rapidly reconfigured and repartitioned to support customer needs as well as higher throughput for parallel computing loads.   Amazon was the first large public cloud vendor to introduce GPUs to better support high-end computation in the cloud and the other providers have followed suit. To accelerate the web search ranking process, Microsoft introduced FPGA accelerators and an overlay mesh-like network which adds an extra dimension of parallelism to large cloud applications.

The advent of truly large scale data collections made it possible to train very deep neural networks and all of the architectural advances described above have been essential for making progress in this area.   Training deep neural nets requires vast amounts of liner algebra and highly parallel clusters with multiple GPUs per node have become critical enablers.  Azure now support on-demand clusters of nodes with multiple GPUs and dedicated InfiniBand networks. The FPGAs introduced for accelerating search in the Microsoft data centers have also proved to be great accelerators for training convolutional neural networks.   GPUs are great for training deep networks but Nirvana has designed a custom ASIC that they claim to be a better accelerator.   Even Cray is now testing the waters of deep learning.   To me, all of these advances in the architecture of cloud data centers points to a convergence with the trends in supercomputer design.  The future exascale machines that are being designed for scientific computing may have a lot in common with the future cloud data centers.   Who knows?  They may be the same.

## Cloud System Software

The software architecture of the cloud has gone through a related evolution.  Along with software defined networking we are seeing the emergence of software defined storage.   We have seen dramatic diversification in the types of storage systems available for the application developer.  Storage models have evolved from simple blob stores like Amazon’s S3 to sophisticated distributed, replicated NoSQL stores designed for big data analytics such as Google’s BigTable and Amazon’s DynamoDB.

Processor virtualization has been synonymous with cloud computing.   While this is largely still true, container technology like Docker has taken on a significant role because of its advantages in terms of management and speed of deployment.  (It is worth noting that Google never used traditional virtualization in their data centers until their recent introduction of IaaS in GCloud.)   Containers are used as a foundation for microservices; a style of building large distributed cloud applications from small, independently deployable components.   Microservices provide a way to partition an application along deployment and language boundaries and they are well suited to Dev-Ops style application development.

Many of the largest applications running on the cloud by Microsoft, Amazon and Google are composed of hundreds to thousands of microservices.   The major challenges presented by these applications are management and scalability.    Data center operating systems tools have evolved to coordinate, monitor and attend to the life-cycle management of many concurrently executing applications, each of which is composed of vast swarms of containerized microservice.  One such systems is Mesos from Mesosphere.

## Cloud Machine Learning Tools

The data analytics needed to create the smart services of the future depend upon a combination of statistical and machine learning tools.  Bayesian methods, random forests and others have been growing in popularity and are widely available in open source tools.  For a long time, neural networks were limited to three levels of depth because the training methods failed to show improvements for deeper networks.  But very large data collections and some interesting advances in training algorithms have made it possible to build very accurate networks with hundreds of layers.  However, the computation involved in training a deep network can be massive.   The kernels of the computation involve the dense linear algebra that GPUs are ideally suited and the type of parallelism in the emerging cloud architecture is well suited to this task.   We now have a growing list of open source machine learning toolkits that have been recently released from the cloud computing research community.   These include Amazon’s Tensorflow, AzureML, Microsoft Research Computational Network Tool Kit (CNTK),  Amazon’s Deep Scalable Sparse Tensor Network Engine (DSSTNE), and Nervana’s NEON.    Of course the academic research community has also been extremely productive in this area.  Theano is an important Python toolkit that has been built with contributions from over a dozen universities and institutes.

Figure 1. cloud ML tools and services stack

Not every customer of cloud-based data analytics wants to build and train ML models from scratch.   Often the use cases for commercial customers are similar, hence another layer of services has emerged based on pre-trained models.   The use cases include image and language recognition, specialized search,  and voice-driven intelligent assistants.   As illustrated in Figure 1, these new services include Cortana (and MSR project Oxford components), Google ML, Amazon Alexa Skills Kit, IBM Watson Services and (using a different style cloud stack) Sentient Aware.

## Streaming Data Analytics Services

There are several “exponentials” that are driving the growth of cloud platforms and services.   These include Big Data, mobile apps, and the Internet of things.   The ability to analyze and act on data in motion is extremely important for application area including urban informatics, environmental and ecological monitoring and recovery, analysis of data from scientific experiments and web and data center log analysis.   The Cloud providers and open source research community has developed a host of new infrastructure tools that can be used to manage massive streams of data from remote sources.  These tools can be used to filter data streams, do on-line analysis and use the backend cloud machine learning services.  The tools include Spark Streaming, Amazon Kinesis, Twitter Heron, Apache Flink, Google Dataflow/Apache Beam and the Azure Event hub and data lake.   A more detailed analysis of these tools can be found here.

## A Few Research Challenges

As was evident at the IEEE cloud conference, there is no shortage of excellent research going on, but as promised here are a few topics I find interesting.

1. Cloud Data Center Architecture.  If you are interested in architecture research the Open Compute Project has a number of challenging projects that are being undertaken by groups of researchers.  They were founded by people from companies including Facebook, Intel, Google, Apple, Microsoft, Rackspace, Ericsson, Cisco, Juniper Networks and more and they have contributed open data center designs.   And it is open, so anybody can participate.
2. Cloud & Supercomputer convergence.   As the sophistication of the cloud data centers approach that of the new and proposed supercomputers it is interesting to look at what architectural convergence might look like.  For example, which modes of cloud application design will translate to supercomputers?   Is it possible that the current microservice based approach to interactive cloud services could be of value to supercomputer centers?   Can we engineer nanosecond inter-container messaging? Can we do a decent job of massive batch scheduling on the cloud with the same parallel efficiency as current supercomputers?
Update:  It seems that there is already some great progress on this topic.    The San Diego Supercomputer Center has just announced deployment of Singularity on two of their big machines.   Singularity is a special container platform from Gregory M. Kurtzer of LBNL.  There is a great article by Jeff Layton that gives a nice overview of Singularity.
3. Porting Deep Learning to Supercomputers. There is currently serious interest in doing large scale data analytics on large supercomputers such as those at the national centers.  Some believe that the better algorithms will be available with these advance parallel machines.   Can we compile tensorflow/CNTK/ DSSTNE using MPI for exascale class machines?  In general, are there better ways to parallelize NN training algorithms for HPC platforms?
4. The current open source stream analytics platforms describe above are designed to handle massive streams of events that are each relative small. However, many scientific event streams are more narrow and have event object that may be massive blobs.   What would it take to modify the open source streaming tools to be broadly applicable to these “big science” use cases.

I welcome feedback on any of the items discussed here.   Many of you know more about these topics than I, so let me know where you think I have incorrectly or overstated any point.

# A Quick Dive into Cloud Data Streaming Technology

This is the second part of a two part series about data streaming technology.  The first part is about streaming data in science and this part describes the programming models for several open source cloud based data streaming tools including Spark Streaming, Storm and Heron, Googles Dataflow and Apache Flink.

## Introduction

Cloud computing evolved from the massive data centers that were built to handle the “big data” challenges that confronted the designers of on-line services like search and e-mail.    For the most part, data from these services accrued into large collections in the cloud where they could be analyzed by massively parallel, batch computing jobs.   The types of knowledge derived from this analysis is used to improve the services that generated the data in the first place.   For example, data analysis of cloud system log files can yield valuable information about how to improve performance of the cloud system.   Analysis of user search terms can improve the search index.  Analysis of vast collections of text can be used to create new machine learning based services such as natural language translation services.

While batch analysis of big collections is extremely important, it is often the case that the results of the analysis must be available as soon as the data is available.   For example, analyzing data from instruments that control complex systems, such as the sensors onboard an autonomous vehicle or an energy power grid.  In these instances, the data analysis is critical to driving the system.  In some cases, the value of the results diminishes rapidly as it gets older.  For example, trending topics in a twitter stream is not very interesting if it is no longer trending.   In other cases, the volume of data that arrives each second is so large that it cannot be retained and real-time analysis or data reduction is the only way to handle it.   This is true of some extremely large science experiments.

We refer to the activity of analyzing data coming from unbounded streams as data stream analytics.  While many people think this is a brand new topic, there is a longer history that goes back to some basic research on complex event processing in the 1990s at places like Stanford, Caltech and Cambridge.  These projects created some of the intellectual foundation for today’s systems.

In the paragraphs that follow we will describe some of the recent approaches to stream analytics that have been developed by the open source community and the public cloud providers.    As we shall see there are many factors that determine when a particular technology is appropriate for a particular problem.   While it is tempting to think that one open source solutions can cover all the bases, this may not be the case.  In fact there is an entire zoo of interesting solutions including Spark Streaming which has been derived from the Spark parallel data analysis system,  Twitter’s  Storm system which has been redesigned by Twitter as Heron, Apache Flink from the German Stratosphere project, Googles Dataflow which is becoming Apache Beam which will run on top of Flink, Spark and Google’s cloud.  Other university projects include Borealis from Brandeis, Brown and MIT,  Neptune and the Granules project at Colorado State.   In addition to Google Cloud dataflow other commercial cloud providers have contributed to the available toolkit: Amazon Kinesis,  Azure Streaming and IBM Stream Analytics are a few examples.   In some cases, the analysis of instrument data streams needs to move closer to the source and tools are emerging to do “pre-analysis” to decide what data should go back to the cloud for deeper analysis.   For example, the Apache Quark edge-analytics tools are designed to run in very small systems such as the Raspberry Pi.   A  good survey of many of these stream processing technologies is by Kamburugamuve and Fox.   They cover many issues not discussed here.

## Basic Design Challenges of Streaming Systems

Before continuing it is useful to address several basic problems that confront the designers of these system.   A major problem is the question of correctness and consistency.   Here is the issue.  Data in an unbounded stream is unbounded in time.   But if you want to present results from the analytics, you can’t wait until the end of time.   So instead you present results at the end of a reasonable window of time.  For example, a daily summary based on a complete checkpoint of events for that day. But what if you want results more frequently?   Every second? The problem is that if the processing is distributed and the window of time is short you may not have a way to know about the global state of the system and some events may be missed or counted twice.  In this case the reports may not be consistent.  Strongly consistent event systems will guarantee that each event is processed once and only once.    A weakly consistent system may give you approximate results that you can “back up” by a daily batch run on the daily checkpoint file.  This gives you some ground-truth to fall back on if you suspect your on-line rapid analysis reporting is less reliable.   Designs based on combining a streaming engine with a separate batch system is called the Lambda Architecture.  The goal of many of the systems described below is to combine the batch computing capability with the streaming semantics so having a separate batch system is not necessary.

The other issue is the design of the semantics of time and windows.   Many event sources provide a time stamp when an event is created and pushed into the stream.  However, the time at which an events is processed will be later.   So we have event time and processing time.  To further complicate things events may be processed out of event-time order.   This raises the question of how we reason about event time in windows defined by processing time.

There are at least four types of windows.   Fixed Time windows divide the income stream into logical segments that correspond to a specified interval of processing time.  The intervals do not overlap. Sliding windows allow for the windows to overlap.  For example, windows of size 10 seconds that start every 5 seconds.   Per-session windows divide the stream by sessions of activity related to some key in the data.  For example, mouse clicks from a particular user may be bundled into a sequence of sessions of clicks nearby in time. Finally, there is the global window that can encapsulate an entire bounded stream.   Associated with windows there must be a mechanism to trigger an analysis of the content of the window and publish the summary.   Each of the systems below support some windowing mechanisms and we will discuss some of them and provide some concluding remarks at the end.  A great discussion of this and many related issues is found in a pair of articles by Tyler Akidau.

Another design involves the way the system distributes the work over processors or containers in the cloud and the way parallelism is achieved.   As we shall see the approaches to parallelism of the systems described here are very similar.   This paper will not discuss performance or scalability issues.  That is another topic we will return to later.

Finally, we note that operations on streams often resemble SQL-like relational operators.   However, there are difficulties with this comparison.  How do you do a join operation on two streams that are unbounded?  The natural solution involves dividing streams by windows in time and doing the join over each window.  Vijayakumar and Plale have looked at this topic extensively.  The CEDR system from MSR illustrated how SQL-like temporal queries can have a well-defined semantics.

## Cloud Providers and the Open Source Streaming Tools.

One way to distinguish the streaming engines is look at the approach to the programming model.  In one camp is an approach based on batch processing as derived from Hadoop or Spark, and the other is based on the pipelined execution of a directed acyclic graph.

### Spark Streaming

Spark streaming is a good example that illustrates how one can adapt a batch style processing analytics tool to a streaming case.   The core idea is very simple.  You break the stream into a bunch of little batches.

To illustrate this and a few of the other technologies discussed here we will frame the discussion in terms of a hypothetical science application.   Assume we have a large set of environmental sensor distributed over some area.  Each sensor is connected by WiFi to the internet and each sends a sequence of messages to a cloud address for analysis.  The sensors may be weather, sound, co2, light, motion, vibration or image capture.   The size of the messages may only be a few bytes (time stamp + geo-location + temperature) or a few megabytes of sound or images.    The goal of the project may be environmental restoration where you are interested in the health and development of the flora and fauna in some devastated forest.   Or it may be something like the NSF ocean observatories project which has a large number of wired as well as untethered instruments along the U.S. coastal waters.

Spark streaming works by taking the input from a stream data source aggregator such as

1. A high throughput publish-subscribe system like RabbitMQ or a more highly scalable system like Apache Kafka or
2. The Microsoft Azure Event Hub (which we described in another post) or
3. Amazon Kinesis.

Kinesis is a robust data aggregator in that it can take from many sources at high rates of speed and it will retain the stream records for up to seven days.   A Kinesis producer is a source of a stream of data records.  Each Producer uses a partition key, such as “co2 sensor” that is attached to each data record as it is sent to Kinesis.   Internally Kinesis partitions data into “shards” and each shard can handle up to 2 MB/sec or 1000 records per second of input data.   Kinesis uses the partition key to map your data record to a shard.   The total capacity of your stream is the sum of the capacity of the shards that it contains.

A Kinesis client is the program that pulls the data records from the Kinesis shards and processes it.   You can roll your own client or you can use spark streaming or one of the other systems described here to do the processing.   Spark streaming is just a version of Spark that processes data in batches where each batch is defined by a time interval.   The Spark name for a stream is a DStream which is a sequence or Spark RDDs (Resilient Distributed Dataset).  We covered Spark in a previous article here.  Spark Streaming provides a nice adaptor which will automatically read the data from Kinesis shards and repackage them into DStreams so that they can be consumed by the Spark Engine as shown in Figure 1.

Figure 1.   Environmental sensor analysis stream example.

Each RDD in the DStream is represents the data in a window of time from the shard associated with the instrument stream.   This RDD is processed in parallel by the spark engine.   Another level of parallelism is exploited by the fact that we have DStreams associated with each shard and we may have many of them.  There is one processing thread for each shard. This is nicely illustrated in Figure 2 from the Spark Streaming Guide.

Figure 2.   Spark Streaming with Kinesis  (image from Spark streaming kinesis integration guide)

DStreams can be transformed into new DStreams using the Spark Streaming library.  For example there are the map() and filter() functions that allows us to apply an analysis or filter on a DStream to produce a new one.   DStreams can be merged together by the union() operator or, if there is a common key, such as a timestamp, one can apply a join() operator to create a new DStream with events with the same key tied together.  Because each RDD in the DStream is process completely by the Spark engine, the results are strongly consistent.   There is a very good technical paper from the Berkeley team that created spark streaming and it is well worth a read.

To illustrate spark streaming let’s assume that every second our sensors from figure 1 each transfer a byte array that encodes a json string representing its output every second. Suppose we are interested in receiving a report of the average temperature for each 10 second window at each location where we have a temperature sensor.   We can write a Python Spark Streaming program to do this as follows.   First we need to create a streaming context and Kinesis connector to grab the stream of instrument data.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext("....", "sensortest")
ssc = StreamingContext(sc, 10)

ks = KinesisUtils.createStream(
sc, [Kinesis app name], [Kinesis stream name], [endpoint URL],
[region name], [initial position], [checkpoint interval],[StorageLevel])


Ks should now be a DStream where each RDD element is the set of events collected by Kinesis in the last 10 seconds.  (Note: I have not yet actually tried this, so some details may be wrong.  This example is adapted from a Kafka version from Jon Haddad and the Kenisis integration guide).

Next we will need to convert byte array for each sensor record to a json Python dictionary.  From there we will filter out all but the temperature sensors, then using a simple map-reduce compute the average temperature for each sensor (which we identify by its location).   To do this we can use the reduceByKey() method which will give us a sum and count for each sensor.  We can then map that into a new DStream taking the form of a dictionary of sensor locations and average temperature for that interval as follows.

temps = ks.filter(lambda x: x["sensortype"] == "tempsensor")   \
.map(lambda x: (x["location"], (x["value"], 1))      \
.reduceByKey(lambda (x1,y1),(x2,y2): (x1+x2,y1+y2))  \
.map(lambda z: {"location": z[0], "average temp": z[1][0]/z[1][1]])


We may now dump our result DStream temps to storage at the end of the processing of this RDD.   Alternatively,  we can join this DStream with a static DStream to compute a running average temperature.

## Storm and Heron: Streaming with a DAG dataflow style.

There are several significant systems based on executing a directed graph of tasks in a “dataflow” style. We will give a brief overview of three of these.  One of the earliest was Storm which was created by Nathan Marz and released as open source by Twitter in late 2011.   Storm was written in a dialect of Lisp called Clojure that works on the Java VM.    In 2015 Twitter rewrote Storm and it is has deployed it under the name Heron which is being released as an Apache project.  The Heron architecture was described in an article in the ACM SIGMOD 2015 conference. Heron implements the same programming model and API as Storm, so we will discuss Storm first and then say a few words about the Heron design.

Storm (and Heron) run “topologies” which are directed acyclic graphs whose nodes are Spouts (data sources) and Bolts (data transformation and processing).   Actually Storm has two programming models. One of these we can call classic and the other is called Trident which is built on top of the classic model.  In both cases Storm (and Heron) topologies are directed acyclic graphs as shown in Figure 3.

Figure 3.   Storm/Heron topology. On the left is the abstract topology as defined by the program and on the right is the unrolled parallel topology for runtime.

The programming model is based on extending the basic spout and bolt classes and then using a topology builder to tie it all together.   A basic template for a Bolt is shown below.   There are three required methods.  The prepare() method is a special constructor that is called when the actual instance is deployed on the remote JVM.  It is supplied with context about the configuration and topology as well as a special object called the OuputCollector which is used to connect the Bolts output to the output stream defined by the topology.   The prepare() method is also where you instantiate your own data structures.

The basic data model for Storm/Heron is a stream of Tuples.  A tuple is just that: a tuple of items where each item need only be serializable.  Some of the fields in a tuple have names that are used for communicating a bit of semantics between bolts.   The method declareOutputFields() is used to declare the name of the fields in a stream.   More on this point later.   The heart of the bolt is the method execute(). This is invoked for each new tuple that is sent to the bolt and it contains the computational core of the bolt.   It is also where results from the Bolts process is sent to its output streams.

The main programming API for Storm is Java, so we will touch briefly on that here. There are several base classes and styles of bolts, but this is the basic template.  One of the specialized Bolt classes is for sliding and tumbling windows.  Spouts are very similar classes, but the most interesting ones are the Spouts that connect to event providers like Kafka or EventHub.

public class MyBolt extends BaseRichBolt{
private OutputCollector collector;
public void prepare(Map config, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
/*
*execute is called when a new tuple has been delivered.
*do your real work here.  for example,
*create a list of words from the tuple and then emit them
*to the default output stream.
*/
for(String word : words){
this.collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
/*
* the declarer is how we declare out output fields in the default
* output stream.  you can have more than one output stream
* using declarestream. the emit() in execute needs to identify
* the stream for each output value.
*/
declarer.declare(new Fields("word"));
}
}


The topology builder class is to build the abstract topology and provide instructions for how the parallelism should be deployed.   The key methods of the build are setBolt() and setSpout().  These each take three arguments: the name of the spout or bolt instance, an instance of your spout or bolt class and an integer that tells the topology how many tasks will be assigned to execute this instance.  A task is a single thread that is assigned to a spout or bolt instance.   This is the parallelism number.   The code below shows how to create the topology of Figure 3.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("Spout", new MySpout(), 2);
builder.setBolt("BoltA", new MyBoltA(), 4).shuffleGrouping("spout");
builder.setBolt("BoltB", new MyBoltB(), 3)
.fieldsGrouping("BoltA", new Fields("word"));
builder.setBolt("BoltC", new MyBoltC(), 2).shuffelGrouping("spout")

Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(“mytopology”, config, builder.createTopology());


As you can see, there are 2 tasks for the spout, 4 for bolt A, 3 for bolt B and 2 for bolt C.   Note that the 2 tasks for the spout are sent to 4 for Bolt B.   How do we partition the 2 output streams over the 4 tasks?  To do this we use a stream grouping function.   In this case we have used Shuffle grouping which randomly distributed them.   In the second case we map the 4 outputs streams from Bolt A to the 3 tasks of bolt B using a field grouping based on a field name.   This makes sure that all tuples with the same field name are mapped to the same task.

As mentioned above the Twitter team has redesigned storm as Heron.   The way a topology is executed is that a set of container instances are deployed to manage it as shown in Figure 4.

Figure 4.   Heron architecture detail.

The topology master coordinates the execution of the topology on a set of other containers that each contain a stream manager and heron instance processes which execute the tasks for the bolts and spouts.  The communication between the bolts and spouts are mediated by the stream manager and all the stream managers are connected together in an overlay network.  (The topology master makes sure they are all in communication.)  Heron provides great performance improvements over Storm.  One improvement of the architecture is better flow control of data from spouts when the bolts are falling behind.  Please look at the full paper for more detail. Some of the best Storm tutorial material comes from Michael Noll’s blog (here is a good example).

## Trident

As mentioned Storm has another programming model that is implemented on top of the basic spout bolt library.   This is called Trident.     The classic Storm programming model is based on the topology instance.  You construct the flow graph by adding spouts and bolts.   It is building a graph by adding the nodes. Trident is somewhat of a dual concept: it is about the edges.   The central figure in Trident is the stream. The first thing to note is that trident processes all events in a stream in batches and Trident works very hard to make sure that each tuple is processed once and only once.  However, in real life failure happens and retries may be required.  Since tuples originate from spouts defining the retry semantics must be closely tied to the spout.  Trident has several configurations for spout depending on the semantics required.  Some are transactional, meaning every batch has a transaction identifier (txid) and a tuple does not appear in any other batch.  Using the txid we can make sure we never process a tuple more than once.  If the tuple caused the processing of the batch to fail, we can re-issue the entire batch.   Regular Storm spouts are non-transactional.   Another type of spout is “opaque transactional”  the third category which guarantees that each tuple is processed exactly once but, if not, it may appear in another batch.

Let’s begin by declaring a trivial artificial (non-transactional) spout that has a single word in each tuple called “name”.   I want the batch size to be 50 tuples.   The code will look something like this.

TridentTopology topology = new TridentTopology();
FixedBatchSpout spout = new FixedBatchSpout(new Fields("name"), 50,
... the word list here ... )
Stream str1 = topology.newStream("spout", spout)


Now that we have a stream we can start making transformations to it.   For example, we can expand the tuple so each tuple contains the word and also the number of characters in the word.   We can do this by creating a function object that takes the string from the tuple and emits its length.

public static class Getlength extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
collector.emit(new Values(tuple.getString(0).length()));
}
}


We apply this function to the stream to create a new stream.

Stream str2 = str1.each(new Fields("name"), new Getlength, new Fields("length"));


Notice that the function only emitted the length.   The each() function has the strange property that it appends new field to the end of the tuple, so now each tuple has labels [“name”, “length”].    Next suppose we only want names from a particular list mynames and we want to drop the others.   We will write a filter function to do that and then create a new filtered stream.

public static class NameFilter extends BaseFilter {
List nameslist

public NameFilter(List names) {
this.namelist = names;
}
@Override
public boolean isKeep(TridentTuple tuple) {
return namelist.contains(tuple.getString(0));
}
}
Stream str3 = str2.each(new Fields("name","length"), new NameFilter(mynames));


Now let’s partition the stream by the name field and compute the counts of each. The result is of type TridentState.

TridentState counts =
str3.groupBy(new Fields("name"))
.persistentAggregate(new MemcachedState.opaque(serverLocations),
new Count(), new Fields("count"))


The details about how the data is sent to the databases behind the memcash are not important here by the idea is we can now keep track of the aggregate state of the stream.

The final thing we should look at is how parallelism is expressed.   This is actually fairly simple annotations to the stream.   Putting all the steps above into one expression we can show how this is done.

TridentState counts =
topology.newStream("spout", spout)
.parallelismHint(2)
.shuffle()
.each(new Fields("name"), new Getlength, new Fields("length"))
.parallelismHint(5)
.each(new Fields("name","length"), new NameFilter(mynames))
.groupBy(new Fields("name"))
.persistentAggregate(new MemcachedState.opaque(serverLocations),
new Count(), new Fields("count"));


This version creates two instances of the spout and five instances of the Getlength() function and uses the random shuffle to distribute the tuple batches to the instances.   There is much more to classic Storm and Trident and there are several good books on the subject.

## Google’s Dataflow and Apache Beam

The most recent entry to the zoo of solutions we will discuss is Apache Beam (now in Apache’s incubation phase.) Beam is the open source release of the Google Cloud Dataflow system.  Much of what is said below is a summary of their document. An important motivation for Beam (from now on I will use that name because it is shorter than writing “Google Cloud Dataflow”) is to treat the batch and streaming cases in a completely uniform way.   The important concepts are

1. Pipelines – which encapsulates the computation in the model.
2. PCollections – the data as it moves through a Pipeline.
3. Transforms – the computational transformations that operate on PCollections and produce PCollections
4. Sources and Sinks.

### PCollections

The idea is that a PCollection can be either a very large but fixed size set of element or a potentially unbounded stream.   The elements in any PCollection are all of the same type, but that type maybe any serializable Java type.   The creator of a PCollection often appends a timestamp to each element at creation time.   This is particularly true of unbounded collections. One very important type of PCollection that is used often is the Key-Value PCollection KV<K, V> where K and V are the Key and Value types.   Another important thing to understand about PCollections is that they are immutable.  You can’t change them but you can use transforms to translate them into new PCollections.

Without going into the details of how you initialize a pipeline, here is how we can create a PCollection of type PCollection<String> of strings from a file.

Pipeline p = Pipeline.create(options);
PCollection pc =


We have used the pipeline operator apply() which allows us to invoke the special transform TextIO to read the file.   There are other pipeline operators, but we will not discuss many of them.  Now, in a manner similar to the way Trident uses the each() operator to create new Trident streams, we will create a sequence of PCollections using the apply() method of the PCollection class.

There are five basic transform types in the library.   Most takes a built-in or user defined function object as an argument and applies the function object to each element of the PCollection to create a new PCollection.

1. Pardo –  apply the function argument to each element of the of the input PCollection. This is done in parallel by workers tasks that are allocated to this activity.   This is basic embarrassingly parallel map parallelism
2. GroupByKey – apply this to a KV<K,V> type of PCollection with group all the elements with the same key into the a single list, so the resulting PCollection is of type KV<K, Iterable<V>>.    In other words, this is the shuffle phase of a map-reduce.
3. Combine – apply an operation that reduces a PCollection to a PCollection with a single element. If the PCollection is windowed the result is a Pcollection with the combined result for each window.   Another type of combining is for key-grouped PCollections.
4. Flatten – combine PCollections of the same type into a single PCollection.
5. Windowing and Triggers – These are not transformations in the usual sense, but defining mechanisms for the window operations.

To illustrate some of these features let’s redo the environmental sensor example again but we will compute the average temperature for each location using a sliding window.    For the sake of illustration, we will use an imaginary pub-sub system to get the events from the instrument steam and let’s suppose the events are delivered to our system in the form of a Java object from the class InstEvnt.  That would be declared as follows.

@DefaultCoder(AvroCoder.class)
static class InstEvent{
@Nullable String instType;
@Nullable String location;
public InstEvent( ....)
public String getInstType(){ ...}
public String getLocation(){ ...}
}


This class definition illustrates how a custom serializable type looks like in Beam. We can now create our stream from our fictitious pub-sub system with this line.

PCollection input =
.timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
.subscription(options.getPubsubSubscription()));


We next must filter out all but the “tempsensor” events. While we are at it, let’s convert the stream so that the output is a stream of key-value pairs corresponding to (location, reading). To do that we need a special function to feed to the ParDo operator.

static class FilterAndConvert extends DoFn<InstEvent, KV<String, Double>> {
@Override
public void processElement(ProcessContext c) {
InstEvent ev = c.element();
if (ev.getInstType() == "tempsensor")
}
}


We Now we can apply the Filter and Convert operator to our input stream. Let us also create a sliding window of events of duration five minutes that is created every two minutes. We note that the window is measured in terms of the timestamps on the events and not on the processing time.

PCCollection<KV<String, Float>> reslt = input
.apply(Pardo.of(new FilterAndConvert())
.apply(Window.<KV<String, Double>> into(SlidingWindows.of(
Duration.standardMinutes(5))
.every(Duration.standardMinutes(2))))


Our stream reslt is now a KV<String,Double> type and we can apply a GroupByKey and Combine operation to reduce this to a  KV<String,Double> where each location key maps to the average temperature.   To make life easy Beam has a number of variations of this simple map-reduce operation and one exists that is perfect for this case:  Mean.perKey() which combines both steps in one transformation.

PCollection<KV<String, Double>> avetemps
= reslt.apply(Mean.<String, Double>perKey());


Finally we can now take the set of average temperatures for each window and send them to an output file.

PCollection outstrings = avetemps
.apply(Pardo.of(new KVToString())
.apply(TextIO.Write.named("WritingToText")
.to("/my/path/to/temps")
.withSuffix(".txt"));


The function class KVToString()  is one we define in a manner similar to the FilterAndConvert class above. There are two things to notice in what happened above.   First, we have used an implicit trigger that generates the means and output at the end of the window.   Second, note that because the windows overlap, events will end up in more than one window.

Beam has several other types of triggers.   For example, you can have a data driven trigger looks at the data as it is coming and fires when some condition you have set is met.   The other type is based on a concept introduce by Google Dataflow called the watermark.  The idea of the watermark is based on event time.    It is used to emit results when the system estimates that it has seen all the data in a given window. There are actually several very sophisticated ways to define triggers based on different ways to specify the watermark.  We won’t go into them here and we refer you to the Google Dataflow documents.

Flink is now one of the “runners” for Beam because it is possible to implement the Beam semantics on top of Flink.   Many of the same core concepts exist in Flink and Beam.  As with the other systems, Flink takes input streams from one or more sources, which are connected by a directed graph to a set of sinks.

Like the others, the system is based on a Java virtual machine and the API is rendered in Java and Scala.  There is also an (incomplete) Python API where there is also a similarity to Spark Streaming.   To illustrate this, we can compare the Flink implementation of our instrument filter for figure 1 to the Spark Streaming example above.

The Flink Kinesis Producer is still a “work in progress”, so this code was tested by reading a stream from a CSV file.  The Flink data types do not include the Python dictionary/Json types so we use here a simple tuple format.   Each line of the input stream looks like

instrument-type string, location string, the word "value", floating value

For example,

tempsensor, pike street and second ave, value, 72.3


After reading from the file (or Kinesis shard) the records in the stream data are now 4-tuples of type (STRING, STRING, STRING, FLOAT). The core of the Flink version of the temperature sensor averager is shown below.

class MeanReducer(ReduceFunction):
def reduce(self, x, y):
return (x[0], x[1], x[2], x[3] + y[3], x[4] + y[4])

env = get_environment()

resuts = data \
.filter(lambda x: x[0]=='tempsensor') \
.map(lambda x: (x[0], x[1], x[2], x[3], 1.0)) \
.group_by(1) \
.reduce(MeanReducer()) \
.map(lambda x: 'location: '+x[1]+' average temp %f' % (x[3]/x[4]))


The filter operation is identical to the Spark Streaming case.   After filtering the data we turn each record into a 5-tuple by appending 1.0 to the end of the 4-tuple.  The group_by(1) and reduce using the MeanReducer function.  The group_by(1) is a signal to shuffle these so that they are keyed by field in position 1 which corresponds to the  location string and then we apply the reduction to each of the grouped tuple sets. This operation is the same as the reduceByKey function in the Spark Streaming example.   The final map converts each element to a string that gives the average temperature for each location.

This example does not illustrate is Flink’s windowing operators, which are very similar to Beam’s, nor does it illustrate the underlying execution architecture.    In a manner similar to the other systems described here, Flink parallelizes the stream and tasks during execution.   For example, our temperature sensor example has a logical view as tasks which may be executed in parallel as shown in Figure 5.