
Manifold Learning and Deep Autoencoders in Science

One way to think about machine learning is to view it as building a model of a system based on samples of data that are artifacts of that system.  This view does not resonate very well when the problem at hand is identifying photos containing sail boats, but it is apt when ML is applied to scientific data.

The data from scientific experiments often takes the form of vectors in a very high dimensional space, and we are looking for an underlying organization of that data that reflects properties of our system. Manifold learning is based on the assumption that the system you are trying to model generates data that lies on or near a lower dimensional surface in the higher dimensional coordinate space of the data. Picture the surface of a sphere or a curve in 3-D. If this manifold assumption about the data is true, it may be possible to “unfold” the surface so that a projection or other linear analysis makes the data easier to understand.

Autoencoders are deep neural networks that can be used to discover properties of the manifold of data that characterizes the system you are trying to model.  Illustrated below, autoencoders have an encoder phase and a decoder phase with a space of much lower dimension than the input in the middle.  By studying the properties of this lower dimensional space, we have a better view of the data manifold. You train an autoencoder to be the identity function.  
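To make the idea of training a network to be the identity function concrete, here is a minimal sketch. It is not a deep autoencoder: it is a purely linear encoder and decoder with a one-dimensional middle layer, trained with plain gradient descent on toy data that lies on a straight line in 3-D. The data, the initial weights and the learning rate are all assumptions chosen for illustration.

```python
import numpy as np

# Toy data: 200 points on a straight line (a 1-D manifold) embedded in 3-D.
direction = np.array([1.0, 2.0, 2.0]) / 3.0   # unit vector along the line
t = np.linspace(-1.0, 1.0, 200)
X = np.outer(t, direction)                     # shape (200, 3)

# Linear autoencoder 3 -> 1 -> 3 with hypothetical fixed initial weights.
W_enc = np.full((3, 1), 0.1)
W_dec = np.full((1, 3), 0.1)

def reconstruction_error(X, W_enc, W_dec):
    """Mean squared distance between each input and its reconstruction."""
    return float(np.mean(np.sum((X @ W_enc @ W_dec - X) ** 2, axis=1)))

initial_error = reconstruction_error(X, W_enc, W_dec)
learning_rate = 0.1
for step in range(2000):
    code = X @ W_enc                  # encoder: project to the 1-D middle layer
    residual = code @ W_dec - X       # decoder output minus the input
    grad_dec = 2.0 * code.T @ residual / len(X)
    grad_enc = 2.0 * X.T @ (residual @ W_dec.T) / len(X)
    W_dec -= learning_rate * grad_dec
    W_enc -= learning_rate * grad_enc

final_error = reconstruction_error(X, W_enc, W_dec)
print(initial_error, final_error)
```

Because the data really is one-dimensional, the single number in the middle layer is enough to reconstruct each point, and that number plays the role of a coordinate along the manifold. A real autoencoder replaces the two matrices with nonlinear layers so that curved surfaces can be handled as well.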


Recently, a lovely blog article, “A look at deep learning for science” by Prabhat, gave us an excellent overview of some uses of deep learning technology in science applications, and several of these were applications of autoencoders. Partly inspired by that article and by our own experiments with some very interesting images of neuron cells collected by Maryana Alegro at UCSF, we put together a little tutorial on autoencoders in science. In the article we discuss two types of autoencoders: denoising and variational. The variational autoencoder we examine is applied to the cell images in an effort to create a model that can be used both to classify samples and to generate new samples by following a path along the surface of the manifold.

We have published this little study as a supplement to our chapter on machine learning in the book “Cloud Computing for Science and Engineering”.   The link to the article is here.

Cloud Computing for Science and Engineering

Ian Foster and I have just completed a final draft of a book that is designed to introduce the working scientist, engineer or student to cloud computing.  It surveys the technology that underpins the cloud, new approaches to technical problems enabled by the cloud, and the concepts required to integrate cloud services into scientific work.  Many of the blog posts that have appeared here have been reworked and, we hope, greatly improved and integrated into the text.  In addition the book contains introductions to the Globus data management and services infrastructure that has become widely used by the research community.

We have a website for the book that contains draft chapters, Jupyter notebooks that illustrate most of the concepts, and a collection of lecture slides for the tutorial at the IEEE International Conference on Cloud Engineering based on the material in the book. This collection will grow over time. The book website will also contain updates to the book material as the current cloud technology evolves.

The table of contents for the book is below. We look forward to your feedback.

Table of Contents

1 Orienting in the cloud universe

Part I. Managing data in the cloud

2 Storage as a service
3 Using cloud storage services

Part II. Computing in the cloud

4 Computing as a service
5 Using and managing virtual machines
6 Using and managing containers
7 Scaling deployments

Part III. The cloud as platform

8 Data analytics in the cloud
9 Streaming data to the cloud
10 Machine learning in the cloud
11 The Globus research data management platform

Part IV. Building your own cloud

12 Building your own cloud with Eucalyptus (with Rich Wolski)
13 Building your own cloud with OpenStack (with Stig Telfer)
14 Building your own SaaS

Part V. Security and other topics

15 Security and privacy
16 History, critiques, futures
17 Afterword: A discovery cloud?

Big Data Congress 2017 - CFP

I wanted to help get the word out concerning the final call for papers for the 2017 IEEE Big Data Congress.   The deadline is February 28, 2017.  This meeting is part of an annual gang of meetings including the IEEE Cloud conference, the IEEE International Conference on Web Services and others.   The conferences are going to be in Honolulu, Hawaii June 25 – June 30, 2017.

The Big Data meeting will have four tracks: research, applications, short papers and special topics. The special topics include vision papers that point out emerging challenges, papers that describe new data sets and benchmarks, experience reports and surveys.

The full call for papers is here:

Should be a good meeting.





Cloud-Native Applications – call for papers

The IEEE Cloud Computing Journal is going to publish a special issue on the topic of Cloud-Native applications. This is an extremely interesting topic and it cuts to the heart of what makes the cloud a platform that is, in many ways, fundamentally different from what we have seen before.

What is “cloud-native”? That is what we want you to tell us. Roger Barga from Amazon, Neel Sundaresan from Microsoft and this blogger have been invited to be guest editors. But we want you to tell us soon. The deadline is March 1, 2017. The papers do not need to be long (3,000 to 5,000 words) and some of the topics possible include:

  • Frameworks to make it easier for industry to build cloud-native applications;
  • Educational approaches and community based organizations that can promote cloud-native design concepts;
  • The role of open source for building cloud-native applications;
  • VM and container orchestration systems for managing cloud-native designs;
  • Efficient mechanisms to make legacy applications cloud-native;
  • Comparing applications – one cloud-native and the other not – in terms of performance, security, reliability, maintainability, scalability, etc.;

And more. Please go to  to read more.


Kubernetes and the Google Cloud Container Service: Fun with Pods of Celery.

In a previous post I talked about using Mesosphere on Azure for scaling up many-task parallel jobs and I promised to return to Kubernetes when I figured out how to bring it up. Google just made it all very simple with their new Google Cloud container services. And, thanks to their good tutorials, I learned about a very elegant way to do remote procedure calls using another open source tool called Celery.

So let me set the stage with a variation on an example I have used in the past. Suppose we have 10000 scientific documents that are stored in the cloud. I would like to use a simple machine learning method to classify each of these by topic. I would like to do this as quickly as possible and, because the analysis of each document is independent of the others, I can try to process as many as possible in parallel. This is the basic “many task” parallel model and one of the most common uses of the cloud for scientific computing purposes. To do this we will use the Celery distributed task queue mechanism to take a list of our documents and send each one to a work queue where the tasks will be parceled out to workers who will do the analysis and respond.

The Google Cloud Container Service and a few words about Kubernetes.

Before getting into the use of Celery and the analysis program, let’s describe the Google Cloud Container Service and a bit about Kubernetes.   Getting started is incredibly easy.   Google has a small free trial account which is sufficient to do the experiments described.  Go to and sign in or create an account.  This will take you to the “console” portal. The first thing you need to do is to create a project. In doing so it will be assigned an id which is a string of the form “silicon-works-136723”.    There is a drop down menu on the left end of the blue banner at the top of the page.  (Look for three horizontal bars.) This allows you to select the type of service you want to work on.     Select the “Container Engine”.  On the “container clusters” page there is a link that will allow you to create a cluster.   With the free account you cannot make a very big cluster.   You are limited to about 4 dual core servers.   If you fill in the form and submit it, you will soon have a new cluster.  There is a special icon of the form “>_” in the blue banner.  Clicking on that icon will create an instance of a “Cloud Shell” that will be automatically authenticated to your account.   The page you will see should resemble Figure 1 below.   The next thing you need to do is to authenticate your cloud shell with your new cluster.   By selecting your container and clicking on the “connect” button to the right you will get the code to paste into the cloud shell.  The result should now look exactly like Figure 1.


Figure 1.   Creating a Google cloud cluster and connecting the cloud shell to it.

Interacting with Kubernetes, which is now running on our small cluster, is through command lines which can be entered into the cloud shell. Kubernetes has a different, and somewhat more interesting, architecture than other container management tools. The basic unit of scheduling in Kubernetes is the pod. A pod consists of a set of one or more Docker-style containers together with a set of resources that are shared by the containers in that pod. When launched, a pod resides on a single server or VM. This has several advantages for the containers in that pod. For example, because the containers in a pod are all running on the same VM, they all share the same IP and port space, so the containers can find each other through conventional means like “localhost”. They can also share storage volumes that are local to the pod.

To start, let’s consider a simple single container pod to run the Jupyter notebook. There is a standard Docker container that contains Jupyter and the scipy software stack. Using the Kubernetes control command kubectl we can launch Jupyter and expose its port 8888 with the following statement.

$ kubectl run jupyter --image=jupyter/scipy-notebook --port=8888

To see that it is up and running we can issue the command “kubectl get pods” which will return the status of all of our running pods. Though we have launched Jupyter it is still not truly visible. To make it visible we will associate a load balancer with the pod. This will expose the port 8888 to the open Internet.

$ kubectl expose deployment jupyter --type=LoadBalancer

Once that has been run you can get the IP address for jupyter from the “LoadBalancer Ingress:” field of the service description when you run the following.   If it doesn’t appear, try again.

$ kubectl describe services jupyter

Once you have verified that it is working at that address on port 8888, you should shut it down immediately because, as you can see, there is no security with this deployment. Deleting a deployment is easy.

$ kubectl delete deployment jupyter

There is another point that one must be aware of when building containers that need to directly interact with the Google Cloud APIs. To make this work you will need to get application default credentials to run in your container. For example, if your container is going to interact with the storage services you will need this. To get the default application credentials follow the instructions here. We will say a few more words about this below.

The Analysis Example in detail.

Now to describe  Celery and how to use Celery and Kubernetes in the many-task scenario described above.

To use Celery we start with our analysis program. We have previously described the analysis algorithms in detail in another post, so we won’t duplicate that here. Let’s start by assuming we have a function predict(doc) that takes a document as a string argument and returns a string containing the result from our trained machine learning classifiers. Our categories are “Physics”, “Math”, “Bio”, “Computer Science” and “Finance” and the result from each classifier is simply the category that the classifier determines to be the most likely correct answer.

Celery is a distributed remote procedure call system for Python programs.   The Celery view of the world is you have a set of worker processes running on remote machines and a client process that is invoking functions that are executed on the remote machines.   The workers and the clients all coordinate through a message broker running somewhere else on the network.

Here we use a RabbitMQ service that is running on a Linux VM on the NSF JetStream cloud as illustrated in Figure 2.


Figure 2.  Experimental Configuration with Celery workers running on Kubernetes in the Google Cloud Container Service,  the RabbitMQ broker running in a VM on the NSF Jetstream Cloud and a client program running as a notebook on a laptop.

The code block below illustrates the basic Celery worker template.    Celery is initialized with a constructor that takes the name of the project and a link to the broker service which can be something like a Redis cache or MongoDB.   The main Celery magic is invoked with a special Python “decorator” associated with the Celery object as shown in the file below.

from celery import Celery
app = Celery('predictor', backend='amqp')

# Now initialize and load all the data structures that will be constant
# and reused for each analysis.  In our case this will include
# all the machine learning models that were trained on the data
# previously.  Then create a main worker function to invoke the models.
def invokeMLModels(statement):
    # template only: 'analysis' stands for the result computed by the models
    return analysis

# define the functions we will call remotely here;
# the @app.task decorator registers predict as a Celery task
@app.task
def predict(statement):
    prediction = invokeMLModels(statement)
    return [prediction]

What this decorator accomplishes is to wrap the function in a manner that it can be invoked by a remote client. To make this work we need to create a Celery worker from our file with the command below, which registers a worker instance as a listener on the RabbitMQ queue.

>celery worker -A predictor -b 'amqp://guest@brokerIPaddr'

Creating a client program for our worker is very simple. It is similar to the worker template except that our version of the predict( ) function does nothing because we are going to invoke it with the special Celery apply_async( ) method that will push the argument to the broker queue and return control immediately to the client. The object that is returned from this call is similar to what is sometimes called a “future” or a “promise” in the programming language literature. It is a placeholder for the returned value. Once we attempt to evaluate the get() method on this object our client will wait until a reply is returned from the remote worker that picked up the task.

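from celery import Celery
app = Celery('predictor', broker='amqp://guest@brokerIPaddr', backend='amqp')

# stub with the same name as the remote task; the body is never run locally
@app.task
def predict(statement):
    return ["stub call"]

# push the argument to the broker queue and return immediately
res = predict.apply_async(["this is a science document ..."])

# block until the remote worker replies
print(res.get())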

Now if we have 10000 documents to analyze we can send them in sequence to the queue as follows.

#load all the science abstracts into a list
documents = load_all_science_abstracts()

#push each document onto the queue and save the async result objects
res = []
for doc in documents:
    res.append(predict.apply_async([doc]))

#now wait for them all to be done
predictions = [result.get() for result in res]
#now do an analysis of the predictions

Here we push each analysis task into the queue and save the async returned objects in a list. Then we create a new list by waiting for each prediction value to be returned. Our client can run anywhere there is Internet access. For example, this one was debugged on a Jupyter instance running on a laptop. All you need to do is “pip install celery” and run Jupyter.
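For readers who want to try the submit-then-wait pattern without setting up a broker, the following sketch uses Python's standard concurrent.futures module as a local stand-in for Celery. It is only an analogy: the predict function here is a hypothetical keyword test, not our classifier, and the work runs in local threads rather than on remote workers.

```python
from concurrent.futures import ThreadPoolExecutor

def predict(doc):
    # Hypothetical stand-in for the remote classifier: tags a document
    # "Physics" if it mentions "quantum", otherwise "Math".
    return ["Physics" if "quantum" in doc else "Math"]

documents = [
    "quantum fields on a lattice",
    "a proof by induction",
    "quantum error correction",
]

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately with a future, much as apply_async() does
    res = [pool.submit(predict, doc) for doc in documents]
    # result() blocks until the value is ready, much as get() does
    predictions = [future.result() for future in res]

print(predictions)
```

The structure is the same as the Celery client: one list of placeholder objects is built without waiting, and a second list is built by waiting on each placeholder in order.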

There is much more to say about Celery and the interested reader should look at the Celery Project site for the definitive guide. Let us now turn to using this with Kubernetes. We must first create a container to hold the analysis code and all the model data. For that we will need a Docker file and a shell script to correctly launch Celery once the container is deployed. For those actually interested in trying this, all the files and data are in OneDrive here. The Docker file shown below has more than we need for this experiment.

# Version 0.1.0
FROM ipython/scipystack
MAINTAINER yourdockername "youremail"
RUN easy_install celery
RUN pip install -U Sphinx
RUN pip install Gcloud
RUN easy_install pattern
RUN easy_install nltk
RUN easy_install gensim
COPY bookproject-key.json /
COPY models /
COPY config /
COPY sciml_data_arxiv.p /
ENTRYPOINT ["bash", "/"]

To build the image we first put all the machine learning configuration files in a directory called config and all the learned model files in a directory called models. At the same level we have the source. For reasons we will explain later we will also include the full test data set: sciml_data_arxiv.p. The Docker build starts with the ipython/scipystack container. We then use easy_install to install Celery as well as three packages used by the ML analyzers: pattern, nltk and gensim. Though we are not going to use the Gcloud APIs here, we include them with a pip install. But to make that work we need an updated copy of Sphinx. To make the APIs work we would need our default client authentication keys. They are stored in a json file called bookproject-key.json that was obtained from the Gcloud portal as described previously. Finally we copy all of the files and directories to the root path ‘/’. Note that the copy from a directory is a copy of all the contained files to the path ‘/’ and not to a new directory. The ENTRYPOINT runs our script which is shown below.

cp / .
export C_FORCE_ROOT='true'
export GOOGLE_APPLICATION_CREDENTIALS='/bookproject-key.json'
celery worker -A predictor -b $1

Bash will run our script in a temp directory, so we need to copy our file to that directory. Because our bash is running as root, we need to convince Celery that it is o.k. to do that. Hence we export C_FORCE_ROOT as true. Next, if we were using the Gcloud APIs we would need to export the application credentials. Finally we invoke Celery, but this time we use the -b flag to indicate that we are going to provide the IP address of the RabbitMQ amqp broker as a parameter and we remove it from the explicit reference in the file. When run, the predictor file will look for all the model and configuration data in ‘/’. We can now build the docker image with the command

>docker build -t "yourdockername/predictor" .

And we can test the container on our laptop with

>docker run -i -t "yourdockername/predictor" 'amqp://guest@rabbitserverIP'

Using “-i -t” allows you to see any error output from the container.   Once it seems to be working we can now push the image to the docker hub.   (to use our version directly, just pull dbgannon/predictor)

We can now return to our Google cloud shell and pull a version of the container there. If we want to launch the predictor container on the cluster, we can do it one at a time with the “kubectl run” command. However, Kubernetes has a better way to do this using a pod configuration file where we can specify the number of pod instances we want to create. In the file below, which we will call predict-job.json, we specify a job name, the container image in the docker hub, and the parameter that the container hands on to the shell script. We also specify the number of pods to create. In this case that is 6, as identified in the “parallelism” parameter.

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq
spec:
  parallelism: 6
  template:
    metadata:
      name: job-wq
    spec:
      containers:
      - name: c
        image: dbgannon/predictor
        args: ["amqp://guest@ipaddress_of_rabbitmq_server"]
      restartPolicy: OnFailure

One command in the cloud shell will now launch six pods each running our predictor container.

$kubectl create -f predict-job.json 

Some Basic Performance Observations.

When using a many-task system based on a distributed worker model there are always three primary questions about the performance of the system.

  1. What is the impact of wide-area distribution on the performance?
  2. How does performance scale with the number of worker containers that are deployed? More specifically, if we have N workers, how does the system speed up as N increases? Is there a point of diminishing return?
  3. Is there a significant per-task overhead that the system imposes? In other words, if the total workload is T and if it is possible to divide that workload into k tasks each of size T/k, then what is the best value of k that will maximize performance?

Measuring the behavior of a Celery application as a function of the number of workers is complicated by a number of factors. The first concern we had was the impact of widely distributing the computing resources on the overall performance. Our message broker (RabbitMQ) was running in a virtual machine in Indiana on the JetStream cloud. Our client was running a Jupyter notebook on a laptop and the workers were primarily in the Midwest Google datacenter, with a few on other machines in the lab. We compared this to a deployment where all the workers, the message broker and the client notebook were running together in the Google datacenter. Much to our surprise there was little difference in performance between the two deployments. There are two ways to view this result. One way is to say that the overhead of wide area distribution was not significant. The other way to say this is that the overhead of wide area distribution was negligible compared to other performance problems.

A second factor that has an impact on performance as a function of the number of workers is the fact that a single Celery worker may have multiple threads that are responding to asynchronous function calls.   While we monitored the execution we noticed that the number of active threads in one worker could change over time. This made performance somewhat erratic.  Celery’s policy is that it will never have more threads than the number of available cores, so to limit the thread variability we ran workers in container pods on VMs with only one core.

Concerning the question of the granularity of the work partitioning, we configured the program so that a number of documents could be processed in one invocation and this number could be set remotely. By taking a set of 1000 documents and a fixed set of workers, we divided the document set into blocks of size K where K ranged from 1 to 100. In general, larger blocks were better because the number of Celery invocations was smaller, but the difference was not great. Another factor involved Celery’s scheduling for deciding which worker gets the next invocation. For large blocks this was not the most efficient because it left holes in the execution schedule when workers were occasionally idle while another was over scheduled. For very small blocks these holes tended to be small. We found that a value of K=2 gave reasonably consistent performance.
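The block partitioning itself is simple to express. The sketch below shows one way to cut a document list into blocks of size K; the helper name make_blocks and the placeholder document names are assumptions for illustration, and in the experiment each block would become the argument of a single remote invocation.

```python
def make_blocks(documents, k):
    """Split a list into consecutive blocks of at most k items."""
    return [documents[i:i + k] for i in range(0, len(documents), k)]

# 1000 placeholder documents cut into blocks of size K = 2
documents = ["doc%d" % i for i in range(1000)]
blocks = make_blocks(documents, 2)
print(len(blocks), blocks[0])
```

With K=2 the 1000 documents become 500 invocations, and with K=100 only 10, which is the trade-off between per-invocation overhead and scheduling holes described above.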

Finally to test scalability we used three different programs.

  1. The document topic predictor described above where each invocation classified two documents.
  2. A simple worker program that does no computation but just sleeps for 10 seconds before returning a “hello world” string.
  3. A worker that computes part of the Euler sequence sum(1/i**2, i=1..n) where n = 10^9. Each worker computes a block of 10^7 terms of the sequence and the 100 partial results are added together to get the final result (which approximates pi^2/6 to about 7 decimal places).

The document predictor is very computationally intensive and uses some rather large data matrices for the trained machine learning models. The size of these arrays is about 150 megabytes total. While this does fit in memory, the computation is going to involve a great deal of processor cache flushing and there may be memory paging effects. The example that computes the Euler sum requires no data other than the starting point index and the size of the block to sum. It is pure computation and it will have no cache flushing or memory paging effects, but it will keep the CPU very busy. The “sleep” example leaves the memory and the CPU completely idle.
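The arithmetic of the Euler sum example can be sketched in a few lines. This version runs the blocks one after another in a single process and uses a much smaller n than the experiment so that it finishes quickly; the function name partial_sum and the sizes chosen here are assumptions, and in the experiment each call would be a remote Celery invocation.

```python
import math

def partial_sum(start, size):
    """Sum 1/i**2 for i = start, ..., start + size - 1."""
    return sum(1.0 / (i * i) for i in range(start, start + size))

n = 10**6        # scaled down so the sketch runs in well under a second
block = 10**4    # each task needs only a start index and a block size
partials = [partial_sum(start, block) for start in range(1, n + 1, block)]
total = sum(partials)

print(len(partials), total, math.pi**2 / 6)
```

The only data a task needs is the pair (start, size), which is why this workload isolates the cost of computation and task dispatch from any data movement.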

We ran all three with one to seven workers (6 workers using 6 cores from the small Google demo account and one on another remote machine). To compare the results, we computed the time for each program on one worker and plotted the speed-up ratio for 2, 3, 4, 5, 6 and 7 workers. The results are shown in the graphs below.


Figure 3.  Performance as speed-up for each of the three applications with up to 7 workers.

As can be seen, the sleeper scales linearly in the number of workers. In fact, when executed on multi-core machines it is almost super-linear because of the extra threads that can be used. (It is very easy for a large number of threads to sleep in parallel.) On the other hand, the predictor and the Euler examples reached a maximum speed-up with around five workers. This was a surprise, as we expected all three experiments to scale well beyond seven workers. Adding more worker pods to the servers did not show improvement because these applications are already very compute intensive. When looking for the cause of this limited performance, we considered the possibility that the RabbitMQ broker was a bottleneck, but our previous experience with it has allowed us to scale applications to dozens of concurrent readers and writers. We are also convinced that the Google Container Engine performed extremely well and it was not the source of any of these performance limitations. We suspect (but could not prove) that the Celery work distribution and result gathering mechanisms have overheads that limit scalability as the number of available workers grows.


Google has made it very easy to deploy containerized applications using Kubernetes on their cloud container service.  Kubernetes has some excellent architectural features that allow multiple containers to be co-located on a single server within a pod.   We did not have time here to demonstrate this, but their documentation gives some excellent examples.

Celery is an extremely elegant way to do remote procedure calls in Python. One only needs to define the function and annotate it with a Celery object. It can then be remotely invoked with an asynchronous call that returns control to the caller. A future-like object is returned. By calling a special method on the returned object the caller will pause until the remote call completes and the value is provided to the caller.

Our experiments demonstrated that Celery has limited scalability if it is used without modification and with the RabbitMQ message broker. However, Celery has many parameters and it is possible that the right combination will improve our results. We will report any improved results we discover in a later version of this document.

A Quick Dive into Cloud Data Streaming Technology

This is the second part of a two part series about data streaming technology. The first part is about streaming data in science and this part describes the programming models for several open source cloud based data streaming tools including Spark Streaming, Storm and Heron, Google’s Dataflow and Apache Flink.


Cloud computing evolved from the massive data centers that were built to handle the “big data” challenges that confronted the designers of on-line services like search and e-mail. For the most part, data from these services accrued into large collections in the cloud where they could be analyzed by massively parallel, batch computing jobs. The knowledge derived from this analysis is used to improve the services that generated the data in the first place. For example, data analysis of cloud system log files can yield valuable information about how to improve performance of the cloud system. Analysis of user search terms can improve the search index. Analysis of vast collections of text can be used to create new machine learning based services such as natural language translation services.

While batch analysis of big collections is extremely important, it is often the case that the results of the analysis must be available as soon as the data is available. One example is analyzing data from instruments that control complex systems, such as the sensors onboard an autonomous vehicle or an energy power grid. In these instances, the data analysis is critical to driving the system. In some cases, the value of the results diminishes rapidly as they get older. For example, a trending topic in a Twitter stream is not very interesting if it is no longer trending. In other cases, the volume of data that arrives each second is so large that it cannot be retained and real-time analysis or data reduction is the only way to handle it. This is true of some extremely large science experiments.

We refer to the activity of analyzing data coming from unbounded streams as data stream analytics.  While many people think this is a brand new topic, there is a longer history that goes back to some basic research on complex event processing in the 1990s at places like Stanford, Caltech and Cambridge.  These projects created some of the intellectual foundation for today’s systems.

In the paragraphs that follow we will describe some of the recent approaches to stream analytics that have been developed by the open source community and the public cloud providers. As we shall see, there are many factors that determine when a particular technology is appropriate for a particular problem. While it is tempting to think that one open source solution can cover all the bases, this may not be the case. In fact there is an entire zoo of interesting solutions including Spark Streaming, which has been derived from the Spark parallel data analysis system, Twitter’s Storm system, which has been redesigned by Twitter as Heron, Apache Flink from the German Stratosphere project, and Google’s Dataflow, which is becoming Apache Beam and will run on top of Flink, Spark and Google’s cloud. Other university projects include Borealis from Brandeis, Brown and MIT, and Neptune and the Granules project at Colorado State. In addition to Google Cloud Dataflow, other commercial cloud providers have contributed to the available toolkit: Amazon Kinesis, Azure Streaming and IBM Stream Analytics are a few examples. In some cases, the analysis of instrument data streams needs to move closer to the source and tools are emerging to do “pre-analysis” to decide what data should go back to the cloud for deeper analysis. For example, the Apache Quarks edge-analytics tools are designed to run in very small systems such as the Raspberry Pi. A good survey of many of these stream processing technologies is by Kamburugamuve and Fox. They cover many issues not discussed here.

Basic Design Challenges of Streaming Systems

Before continuing it is useful to address several basic problems that confront the designers of these systems. A major problem is the question of correctness and consistency. Here is the issue. Data in an unbounded stream is unbounded in time. But if you want to present results from the analytics, you can’t wait until the end of time. So instead you present results at the end of a reasonable window of time. For example, a daily summary based on a complete checkpoint of events for that day. But what if you want results more frequently? Every second? The problem is that if the processing is distributed and the window of time is short you may not have a way to know about the global state of the system and some events may be missed or counted twice. In this case the reports may not be consistent. Strongly consistent event systems will guarantee that each event is processed once and only once. A weakly consistent system may give you approximate results that you can “back up” by a daily batch run on the daily checkpoint file. This gives you some ground-truth to fall back on if you suspect your on-line rapid analysis reporting is less reliable. A design that combines a streaming engine with a separate batch system is called the Lambda Architecture. The goal of many of the systems described below is to combine the batch computing capability with the streaming semantics so that having a separate batch system is not necessary.
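To see why double counting matters and what “once and only once” buys you, consider the toy sketch below. It assumes every event carries a unique id and simply remembers the ids it has already seen, so a redelivered event does not change the counts. The class name and the event data are hypothetical, and this single-process version says nothing about how real distributed systems achieve the same guarantee.

```python
class ExactlyOnceCounter:
    """Counts events per key, ignoring redelivered event ids."""

    def __init__(self):
        self.seen = set()     # ids of events already processed
        self.counts = {}      # running count for each key

    def process(self, event_id, key):
        if event_id in self.seen:
            return False      # duplicate delivery: leave the counts alone
        self.seen.add(event_id)
        self.counts[key] = self.counts.get(key, 0) + 1
        return True

counter = ExactlyOnceCounter()
# event 1 is delivered twice, as can happen after a retry
deliveries = [(1, "click"), (2, "view"), (1, "click"), (3, "click")]
for event_id, key in deliveries:
    counter.process(event_id, key)

print(counter.counts)
```

Without the check on the event id the report would show three clicks instead of two, which is exactly the kind of inconsistency a daily batch run is used to correct in a Lambda Architecture.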

The other issue is the design of the semantics of time and windows. Many event sources provide a time stamp when an event is created and pushed into the stream. However, the time at which an event is processed will be later. So we have event time and processing time. To further complicate things, events may be processed out of event-time order. This raises the question of how we reason about event time in windows defined by processing time.

There are at least four types of windows. Fixed time windows divide the incoming stream into logical segments that correspond to a specified interval of processing time. The intervals do not overlap. Sliding windows allow the windows to overlap, for example, windows of size 10 seconds that start every 5 seconds. Per-session windows divide the stream by sessions of activity related to some key in the data. For example, mouse clicks from a particular user may be bundled into a sequence of sessions of clicks nearby in time. Finally, there is the global window that can encapsulate an entire bounded stream. Associated with windows there must be a mechanism to trigger an analysis of the content of the window and publish the summary. Each of the systems below supports some windowing mechanisms and we will discuss some of them and provide some concluding remarks at the end. A great discussion of this and many related issues is found in a pair of articles by Tyler Akidau.
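To make the window types concrete, here is a minimal Python sketch of how a timestamp might be assigned to fixed, sliding and session windows. The function names and the half-open [start, end) convention are our own assumptions for illustration; none of the systems discussed below expose exactly this interface.

```python
# Sketch of window assignment. All names here are illustrative only.

def fixed_windows(ts, size):
    """Return the single [start, end) fixed window containing ts."""
    start = (ts // size) * size
    return [(start, start + size)]

def sliding_windows(ts, size, period):
    """Return every [start, end) sliding window containing ts."""
    start = (ts // period) * period      # latest window start at or before ts
    windows = []
    while start > ts - size:             # walk back while ts is still inside
        windows.append((start, start + size))
        start -= period
    return sorted(windows)

def session_windows(timestamps, gap):
    """Group timestamps into sessions separated by more than gap."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)      # close enough: same session
        else:
            sessions.append([ts])        # too far: start a new session
    return sessions
```

With windows of size 10 that start every 5 seconds, sliding_windows(12, 10, 5) places the event at time 12 in both (5, 15) and (10, 20), which is the overlap described above.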

Another design issue involves the way the system distributes the work over processors or containers in the cloud and the way parallelism is achieved. As we shall see, the approaches to parallelism of the systems described here are very similar. This paper will not discuss performance or scalability issues. That is another topic we will return to later.

Finally, we note that operations on streams often resemble SQL-like relational operators.   However, there are difficulties with this comparison.  How do you do a join operation on two streams that are unbounded?  The natural solution involves dividing streams by windows in time and doing the join over each window.  Vijayakumar and Plale have looked at this topic extensively.  The CEDR system from MSR illustrated how SQL-like temporal queries can have a well-defined semantics.
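As a rough sketch of the window-based join idea, the Python below pairs events from two streams only when they share a key and fall in the same fixed window. The function name, the (timestamp, key, value) event layout and the sample data are assumptions made for illustration, and finite lists stand in for the streams; a real engine would emit the join for each window as it closes.

```python
from collections import defaultdict

def windowed_join(left, right, size):
    """Join two streams of (timestamp, key, value) within fixed windows.

    Events are paired only when they share a key and fall into the same
    window of length `size`.
    """
    buckets = defaultdict(list)
    for ts, key, value in right:
        buckets[(ts // size, key)].append(value)
    joined = []
    for ts, key, value in left:
        for other in buckets.get((ts // size, key), []):
            joined.append((ts // size, key, value, other))
    return joined

# Hypothetical sample data: temperature and co2 readings from sensor "a".
temps = [(1, "a", 70.0), (12, "a", 71.0)]
co2 = [(3, "a", 400), (25, "a", 410)]
pairs = windowed_join(temps, co2, 10)
```

Only the readings at times 1 and 3 share a window, so the join produces a single pair even though both streams contain more events for the same key.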

Cloud Providers and the Open Source Streaming Tools.

One way to distinguish the streaming engines is to look at the approach to the programming model. In one camp is an approach based on batch processing as derived from Hadoop or Spark, and the other is based on the pipelined execution of a directed acyclic graph.

Spark Streaming

Spark streaming is a good example that illustrates how one can adapt a batch style processing analytics tool to a streaming case.   The core idea is very simple.  You break the stream into a bunch of little batches.

To illustrate this and a few of the other technologies discussed here we will frame the discussion in terms of a hypothetical science application. Assume we have a large set of environmental sensors distributed over some area. Each sensor is connected by WiFi to the internet and each sends a sequence of messages to a cloud address for analysis. The sensors may be weather, sound, co2, light, motion, vibration or image capture. The size of the messages may only be a few bytes (time stamp + geo-location + temperature) or a few megabytes of sound or images. The goal of the project may be environmental restoration where you are interested in the health and development of the flora and fauna in some devastated forest. Or it may be something like the NSF ocean observatories project which has a large number of wired as well as untethered instruments along the U.S. coastal waters.

Spark streaming works by taking the input from a stream data source aggregator such as

  1. A high throughput publish-subscribe system like RabbitMQ or a more highly scalable system like Apache Kafka or
  2. The Microsoft Azure Event Hub (which we described in another post) or
  3. Amazon Kinesis.

Kinesis is a robust data aggregator in that it can take data from many sources at high rates and it will retain the stream records for up to seven days. A Kinesis producer is a source of a stream of data records. Each producer uses a partition key, such as “co2 sensor”, that is attached to each data record as it is sent to Kinesis. Internally Kinesis partitions data into “shards” and each shard can ingest up to 1 MB/sec or 1000 records per second of input data. Kinesis uses the partition key to map your data record to a shard. The total capacity of your stream is the sum of the capacity of the shards that it contains.
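Kinesis maps a partition key to a shard by taking the MD5 hash of the key, which gives a 128-bit integer, and finding the shard whose hash key range contains that integer. The Python sketch below imitates this under the simplifying assumption that the shards split the hash space into equal contiguous ranges; the function names are ours and this is not the Kinesis client API.

```python
import hashlib

def shard_for_key(partition_key, num_shards):
    """Map a partition key to a shard index, assuming the shards split
    the 128-bit MD5 hash space into equal contiguous ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    range_size = 2**128 // num_shards
    # Clamp so hashes in the leftover top sliver land in the last shard.
    return min(hash_value // range_size, num_shards - 1)

def stream_capacity(num_shards, per_shard):
    """Total stream capacity is the sum of the per-shard capacities."""
    return num_shards * per_shard
```

Because the mapping depends only on the key, every record tagged “co2 sensor” lands on the same shard, and a stream of four shards can accept stream_capacity(4, 1000) = 4000 records per second in total.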

A Kinesis client is the program that pulls the data records from the Kinesis shards and processes them. You can roll your own client or you can use Spark Streaming or one of the other systems described here to do the processing. Spark Streaming is just a version of Spark that processes data in batches where each batch is defined by a time interval. The Spark name for a stream is a DStream, which is a sequence of Spark RDDs (Resilient Distributed Datasets). We covered Spark in a previous article here. Spark Streaming provides a nice adaptor which will automatically read the data from Kinesis shards and repackage them into DStreams so that they can be consumed by the Spark engine as shown in Figure 1.


Figure 1.   Environmental sensor analysis stream example.

Each RDD in the DStream represents the data in a window of time from the shard associated with the instrument stream. This RDD is processed in parallel by the Spark engine. Another level of parallelism is exploited by the fact that we have DStreams associated with each shard and we may have many of them. There is one processing thread for each shard. This is nicely illustrated in Figure 2 from the Spark Streaming Guide.


Figure 2.   Spark Streaming with Kinesis  (image from Spark streaming kinesis integration guide)

DStreams can be transformed into new DStreams using the Spark Streaming library. For example, there are the map() and filter() functions that allow us to apply an analysis or filter on a DStream to produce a new one. DStreams can be merged together by the union() operator or, if there is a common key, such as a timestamp, one can apply a join() operator to create a new DStream with events with the same key tied together. Because each RDD in the DStream is processed completely by the Spark engine, the results are strongly consistent. There is a very good technical paper from the Berkeley team that created Spark Streaming and it is well worth a read.

To illustrate Spark Streaming let’s assume that every second each of our sensors from Figure 1 transfers a byte array that encodes a JSON string representing its output. Suppose we are interested in receiving a report of the average temperature for each 10 second window at each location where we have a temperature sensor. We can write a Python Spark Streaming program to do this as follows. First we need to create a streaming context and Kinesis connector to grab the stream of instrument data.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext("....", "sensortest")
ssc = StreamingContext(sc, 10)

# createStream takes the StreamingContext (ssc), not the SparkContext
ks = KinesisUtils.createStream(
     ssc, [Kinesis app name], [Kinesis stream name], [endpoint URL],
     [region name], [initial position], [checkpoint interval],[StorageLevel])

The variable ks should now be a DStream where each RDD element is the set of events collected by Kinesis in the last 10 seconds. (Note: I have not yet actually tried this, so some details may be wrong. This example is adapted from a Kafka version from Jon Haddad and the Kinesis integration guide.)

Next we will need to convert the byte array for each sensor record to a JSON Python dictionary. From there we will filter out all but the temperature sensors, then use a simple map-reduce to compute the average temperature for each sensor (which we identify by its location). To do this we can use the reduceByKey() method, which will give us a sum and count for each sensor. We can then map that into a new DStream taking the form of a dictionary of sensor locations and average temperature for that interval as follows.

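import json

# parse each record, keep temperature readings, then sum and count by location
temps = ks.map(lambda rec: json.loads(rec))   \
   .filter(lambda x: x["sensortype"] == "tempsensor")   \
   .map(lambda x: (x["location"], (x["value"], 1)))      \
   .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))  \
   .map(lambda z: {"location": z[0], "average temp": z[1][0]/z[1][1]})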

We may now dump our result DStream temps to storage at the end of the processing of this RDD.   Alternatively,  we can join this DStream with a static DStream to compute a running average temperature.
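Since the Spark program above was not run against a live stream, it can help to check the averaging logic on its own. The sketch below replays the same parse, filter, sum-and-count and divide steps in plain Python on one hypothetical batch of records. The field names follow the example above; the sample locations and values are made up.

```python
import json
from collections import defaultdict

def average_temps(raw_records):
    """Plain-Python version of the parse/filter/reduceByKey/map steps."""
    events = [json.loads(r) for r in raw_records]
    temps = [e for e in events if e["sensortype"] == "tempsensor"]
    sums = defaultdict(lambda: (0.0, 0))          # location -> (sum, count)
    for e in temps:
        total, count = sums[e["location"]]
        sums[e["location"]] = (total + e["value"], count + 1)
    return [{"location": loc, "average temp": total / count}
            for loc, (total, count) in sorted(sums.items())]

# One hypothetical 10 second batch of byte-array records.
batch = [
    b'{"sensortype": "tempsensor", "location": "site-1", "value": 70.0}',
    b'{"sensortype": "tempsensor", "location": "site-1", "value": 74.0}',
    b'{"sensortype": "co2", "location": "site-1", "value": 410.0}',
    b'{"sensortype": "tempsensor", "location": "site-2", "value": 60.0}',
]
result = average_temps(batch)
```

The co2 record is dropped by the filter, and the two readings from site-1 are combined into a single average of 72.0.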

Storm and Heron: Streaming with a DAG dataflow style.

There are several significant systems based on executing a directed graph of tasks in a “dataflow” style. We will give a brief overview of three of these. One of the earliest was Storm, which was created by Nathan Marz and released as open source by Twitter in late 2011. Storm was written in a dialect of Lisp called Clojure that works on the Java VM. In 2015 Twitter rewrote Storm and has deployed it under the name Heron, which is being released as an Apache project. The Heron architecture was described in an article in the ACM SIGMOD 2015 conference. Heron implements the same programming model and API as Storm, so we will discuss Storm first and then say a few words about the Heron design.

Storm (and Heron) run “topologies” which are directed acyclic graphs whose nodes are Spouts (data sources) and Bolts (data transformation and processing).   Actually Storm has two programming models. One of these we can call classic and the other is called Trident which is built on top of the classic model.  In both cases Storm (and Heron) topologies are directed acyclic graphs as shown in Figure 3.


Figure 3.   Storm/Heron topology. On the left is the abstract topology as defined by the program and on the right is the unrolled parallel topology for runtime.

The programming model is based on extending the basic spout and bolt classes and then using a topology builder to tie it all together. A basic template for a Bolt is shown below. There are three required methods. The prepare() method is a special constructor that is called when the actual instance is deployed on the remote JVM. It is supplied with context about the configuration and topology as well as a special object called the OutputCollector, which is used to connect the Bolt’s output to the output stream defined by the topology. The prepare() method is also where you instantiate your own data structures.

The basic data model for Storm/Heron is a stream of Tuples. A tuple is just that: a tuple of items where each item need only be serializable. Some of the fields in a tuple have names that are used for communicating a bit of semantics between bolts. The method declareOutputFields() is used to declare the names of the fields in a stream. More on this point later. The heart of the bolt is the method execute(). This is invoked for each new tuple that is sent to the bolt and it contains the computational core of the bolt. It is also where the results of the Bolt’s processing are sent to its output streams.

The main programming API for Storm is Java, so we will touch briefly on that here. There are several base classes and styles of bolts, but this is the basic template.  One of the specialized Bolt classes is for sliding and tumbling windows.  Spouts are very similar classes, but the most interesting ones are the Spouts that connect to event providers like Kafka or EventHub.

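// Package names are those of Storm 1.0 and later; earlier releases use backtype.storm.
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class MyBolt extends BaseRichBolt {
	private OutputCollector collector;

	public void prepare(Map config, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	public void execute(Tuple tuple) {
		// execute is called when a new tuple has been delivered.
		// do your real work here.  for example,
		// create a list of words from the tuple and then emit them
		// to the default output stream.
		String[] words = tuple.getString(0).split(" ");
		for (String word : words) {
			this.collector.emit(new Values(word));
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// the declarer is how we declare our output fields in the default
		// output stream.  you can have more than one output stream
		// using declareStream. the emit() in execute needs to identify
		// the stream for each output value.
		declarer.declare(new Fields("word"));
	}
}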

The topology builder class is used to build the abstract topology and provide instructions for how the parallelism should be deployed. The key methods of the builder are setBolt() and setSpout(). These each take three arguments: the name of the spout or bolt instance, an instance of your spout or bolt class and an integer that tells the topology how many tasks will be assigned to execute this instance. A task is a single thread that is assigned to a spout or bolt instance. This is the parallelism number. The code below shows how to create the topology of Figure 3.

TopologyBuilder builder = new TopologyBuilder();
// the id given to setSpout must match the id used in the groupings below
builder.setSpout("spout", new MySpout(), 2);
builder.setBolt("BoltA", new MyBoltA(), 4).shuffleGrouping("spout");
builder.setBolt("BoltB", new MyBoltB(), 3)
                      .fieldsGrouping("BoltA", new Fields("word"));
builder.setBolt("BoltC", new MyBoltC(), 2).shuffleGrouping("spout");

Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", config, builder.createTopology());

As you can see, there are 2 tasks for the spout, 4 for bolt A, 3 for bolt B and 2 for bolt C. Note that the output of the 2 tasks for the spout is sent to the 4 tasks for bolt A. How do we partition the 2 output streams over the 4 tasks? To do this we use a stream grouping function. In this case we have used shuffle grouping, which distributes them randomly. In the second case we map the 4 output streams from bolt A to the 3 tasks of bolt B using a field grouping based on a field name. This makes sure that all tuples with the same value in the named field are mapped to the same task.
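The difference between the two groupings can be sketched in a few lines of Python. This is only an illustration of the idea: the function names are ours, shuffle grouping is modeled as a uniformly random choice, and crc32 is a stand-in for whatever hash the real system applies to the field value.

```python
import random
import zlib

def shuffle_grouping(tuples, num_tasks, seed=0):
    """Assign each tuple to a randomly chosen task."""
    rng = random.Random(seed)
    tasks = [[] for _ in range(num_tasks)]
    for t in tuples:
        tasks[rng.randrange(num_tasks)].append(t)
    return tasks

def fields_grouping(tuples, num_tasks, field):
    """Assign tuples so equal values of `field` always share a task."""
    tasks = [[] for _ in range(num_tasks)]
    for t in tuples:
        # crc32 is an assumed stand-in hash, chosen because it is stable
        index = zlib.crc32(t[field].encode("utf-8")) % num_tasks
        tasks[index].append(t)
    return tasks

words = [{"word": w} for w in ["a", "b", "a", "c", "b", "a"]]
by_field = fields_grouping(words, 3, "word")
```

With the field grouping, all three tuples carrying the word “a” end up in one task's list, which is what lets a downstream bolt keep a correct per-word count without coordinating with the other tasks.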

As mentioned above, the Twitter team has redesigned Storm as Heron. The way a topology is executed is that a set of container instances are deployed to manage it as shown in Figure 4.


Figure 4.   Heron architecture detail.

The topology master coordinates the execution of the topology on a set of other containers that each contain a stream manager and Heron instance processes which execute the tasks for the bolts and spouts. The communication between the bolts and spouts is mediated by the stream manager and all the stream managers are connected together in an overlay network. (The topology master makes sure they are all in communication.) Heron provides great performance improvements over Storm. One improvement of the architecture is better flow control of data from spouts when the bolts are falling behind. Please look at the full paper for more detail. Some of the best Storm tutorial material comes from Michael Noll’s blog (here is a good example).


As mentioned, Storm has another programming model that is implemented on top of the basic spout bolt library. This is called Trident. The classic Storm programming model is based on the topology instance. You construct the flow graph by adding spouts and bolts. It is building a graph by adding the nodes. Trident is somewhat of a dual concept: it is about the edges. The central figure in Trident is the stream. The first thing to note is that Trident processes all events in a stream in batches and Trident works very hard to make sure that each tuple is processed once and only once. However, in real life failure happens and retries may be required. Since tuples originate from spouts, defining the retry semantics must be closely tied to the spout. Trident has several configurations for spouts depending on the semantics required. Some are transactional, meaning every batch has a transaction identifier (txid) and a tuple does not appear in any other batch. Using the txid we can make sure we never process a tuple more than once. If the tuple caused the processing of the batch to fail, we can re-issue the entire batch. Regular Storm spouts are non-transactional. A third category of spout is “opaque transactional”, which guarantees that each tuple is successfully processed in exactly one batch, but a tuple that fails in one batch may reappear in a different batch when it is retried.
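To see how a txid prevents double counting, consider the following small Python sketch of a counter that remembers the txid of the last batch it applied. The class and method names are hypothetical, and the sketch assumes the transactional case described above: batches arrive in txid order and a replayed batch has the same txid and the same contents as the original.

```python
class TransactionalCount:
    """Keeps a count plus the txid of the last batch applied to it."""

    def __init__(self):
        self.count = 0
        self.last_txid = None

    def apply_batch(self, txid, batch):
        """Add len(batch) to the count unless this txid was already applied."""
        if txid == self.last_txid:
            return False          # replay of a batch we already counted
        self.count += len(batch)
        self.last_txid = txid
        return True

state = TransactionalCount()
state.apply_batch(1, ["a", "b"])
state.apply_batch(2, ["c"])
state.apply_batch(2, ["c"])       # retry of batch 2 is ignored
```

After the retry the count is still 3. The opaque transactional case needs more bookkeeping than this, because a retried batch may contain different tuples.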

Let’s begin by declaring a trivial artificial (non-transactional) spout that has a single word in each tuple called “name”.   I want the batch size to be 50 tuples.   The code will look something like this.

TridentTopology topology = new TridentTopology();
FixedBatchSpout spout = new FixedBatchSpout(new Fields("name"), 50,
                                 ... the word list here ... );
Stream str1 = topology.newStream("spout", spout);

Now that we have a stream we can start making transformations to it.   For example, we can expand the tuple so each tuple contains the word and also the number of characters in the word.   We can do this by creating a function object that takes the string from the tuple and emits its length.

public static class Getlength extends BaseFunction {
  public void execute(TridentTuple tuple, TridentCollector collector) {
    // emit only the new field; each() appends it to the input tuple
    collector.emit(new Values(tuple.getString(0).length()));
  }
}

We apply this function to the stream to create a new stream.

Stream str2 = str1.each(new Fields("name"), new Getlength(), new Fields("length"));

Notice that the function only emitted the length. The each() function has the strange property that it appends the new field to the end of the tuple, so now each tuple has labels [“name”, “length”]. Next suppose we only want names from a particular list mynames and we want to drop the others. We will write a filter function to do that and then create a new filtered stream.

public static class NameFilter extends BaseFilter {
  List<String> namelist;

  public NameFilter(List<String> names) {
    this.namelist = names;
  }

  public boolean isKeep(TridentTuple tuple) {
    // keep the tuple only if its first field is in the list
    return namelist.contains(tuple.getString(0));
  }
}

Stream str3 = str2.each(new Fields("name","length"), new NameFilter(mynames));

Now let’s partition the stream by the name field and compute the counts of each. The result is of type TridentState.

TridentState counts =
   str3.groupBy(new Fields("name"))
       .persistentAggregate(MemcachedState.opaque(serverLocations),
	                     new Count(), new Fields("count"));

The details about how the data is sent to the databases behind memcached are not important here, but the idea is that we can now keep track of the aggregate state of the stream.

The final thing we should look at is how parallelism is expressed. This is done with fairly simple annotations to the stream. Putting all the steps above into one expression we can show how this is done.

TridentState counts =
topology.newStream("spout", spout)
        .parallelismHint(2)
        .shuffle()
        .each(new Fields("name"), new Getlength(), new Fields("length"))
        .parallelismHint(5)
        .each(new Fields("name","length"), new NameFilter(mynames))
        .groupBy(new Fields("name"))
        .persistentAggregate(MemcachedState.opaque(serverLocations),
	                      new Count(), new Fields("count"));

This version creates two instances of the spout and five instances of the Getlength() function and uses the random shuffle to distribute the tuple batches to the instances.   There is much more to classic Storm and Trident and there are several good books on the subject.

Google’s Dataflow and Apache Beam

The most recent entry to the zoo of solutions we will discuss is Apache Beam (now in Apache’s incubation phase.) Beam is the open source release of the Google Cloud Dataflow system.  Much of what is said below is a summary of their document. An important motivation for Beam (from now on I will use that name because it is shorter than writing “Google Cloud Dataflow”) is to treat the batch and streaming cases in a completely uniform way.   The important concepts are

  1. Pipelines – which encapsulate the computation in the model.
  2. PCollections – the data as it moves through a Pipeline.
  3. Transforms – the computational transformations that operate on PCollections and produce PCollections
  4. Sources and Sinks.


The idea is that a PCollection can be either a very large but fixed size set of elements or a potentially unbounded stream. The elements in any PCollection are all of the same type, but that type may be any serializable Java type. The creator of a PCollection often appends a timestamp to each element at creation time. This is particularly true of unbounded collections. One very important type of PCollection that is used often is the Key-Value PCollection KV<K, V> where K and V are the Key and Value types. Another important thing to understand about PCollections is that they are immutable. You can’t change them but you can use transforms to translate them into new PCollections.

Without going into the details of how you initialize a pipeline, here is how we can create a PCollection of type PCollection<String> of strings from a file.

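Pipeline p = Pipeline.create(options);
// the right-hand side was truncated in the original; the file path is left elided
PCollection<String> pc = p.apply(TextIO.Read.from( ... ));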

We have used the pipeline operator apply() which allows us to invoke the special transform TextIO to read the file.   There are other pipeline operators, but we will not discuss many of them.  Now, in a manner similar to the way Trident uses the each() operator to create new Trident streams, we will create a sequence of PCollections using the apply() method of the PCollection class.

There are five basic transform types in the library. Most take a built-in or user defined function object as an argument and apply the function object to each element of the PCollection to create a new PCollection.

  1. ParDo – apply the function argument to each element of the input PCollection. This is done in parallel by worker tasks that are allocated to this activity. This is basic embarrassingly parallel map parallelism.
  2. GroupByKey – apply this to a KV<K,V> type of PCollection to group all the elements with the same key into a single list, so the resulting PCollection is of type KV<K, Iterable<V>>. In other words, this is the shuffle phase of a map-reduce.
  3. Combine – apply an operation that reduces a PCollection to a PCollection with a single element. If the PCollection is windowed the result is a PCollection with the combined result for each window. Another type of combining is for key-grouped PCollections.
  4. Flatten – combine PCollections of the same type into a single PCollection.
  5. Windowing and Triggers – These are not transformations in the usual sense, but defining mechanisms for the window operations.
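The first four transform types can be mimicked on ordinary Python lists, which may help fix the ideas before looking at the Java API. The function names below are our own and only imitate the semantics described in the list; they are not the Beam API, and the sample readings are made up.

```python
from collections import defaultdict

def par_do(fn, pcoll):
    """Apply fn to each element; fn returns a list of zero or more outputs."""
    return [out for element in pcoll for out in fn(element)]

def group_by_key(pcoll):
    """Turn (key, value) pairs into (key, [values]) pairs."""
    groups = defaultdict(list)
    for key, value in pcoll:
        groups[key].append(value)
    return sorted(groups.items())

def combine_per_key(fn, grouped):
    """Reduce each key's list of values to a single value."""
    return [(key, fn(values)) for key, values in grouped]

def flatten(*pcolls):
    """Merge several collections of the same type into one."""
    return [element for pcoll in pcolls for element in pcoll]

readings = flatten([("site-1", 70.0), ("site-2", 60.0)], [("site-1", 74.0)])
kept = par_do(lambda kv: [kv] if kv[1] > 0 else [], readings)
means = combine_per_key(lambda v: sum(v) / len(v), group_by_key(kept))
```

Note that every function returns a new list rather than modifying its input, which mirrors the immutability of PCollections.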

To illustrate some of these features let’s redo the environmental sensor example, but this time we will compute the average temperature for each location using a sliding window. For the sake of illustration, we will use an imaginary pub-sub system to get the events from the instrument stream and let’s suppose the events are delivered to our system in the form of a Java object from the class InstEvent. That would be declared as follows.

static class InstEvent{
	@Nullable String instType;
	@Nullable String location;
	@Nullable Double reading;
	public InstEvent( ....)
	public String getInstType(){ ...}
	public String getLocation(){ ...}
	public Double getReading(){ ...}
}

This class definition illustrates what a custom serializable type looks like in Beam. We can now create our stream from our fictitious pub-sub system with this line.

PCollection input = 

We next must filter out all but the “tempsensor” events. While we are at it, let’s convert the stream so that the output is a stream of key-value pairs corresponding to (location, reading). To do that we need a special function to feed to the ParDo operator.

static class FilterAndConvert extends DoFn<InstEvent, KV<String, Double>> {
    public void processElement(ProcessContext c) {
         InstEvent ev = c.element();
         // compare string contents, not object references
         if ("tempsensor".equals(ev.getInstType())) {
             c.output(KV.of(ev.getLocation(), ev.getReading()));
         }
    }
}

Now we can apply the FilterAndConvert operator to our input stream. Let us also create a sliding window of events of duration five minutes that is created every two minutes. We note that the window is measured in terms of the timestamps on the events and not on the processing time.

PCollection<KV<String, Double>> reslt = input
.apply(ParDo.of(new FilterAndConvert()))
.apply(Window.<KV<String, Double>>into(SlidingWindows.of(
        Duration.standardMinutes(5)).every(Duration.standardMinutes(2))));

Our stream reslt is now a KV<String,Double> type and we can apply a GroupByKey and Combine operation to reduce this to a  KV<String,Double> where each location key maps to the average temperature.   To make life easy Beam has a number of variations of this simple map-reduce operation and one exists that is perfect for this case:  Mean.perKey() which combines both steps in one transformation.

PCollection<KV<String, Double>> avetemps
	= reslt.apply(Mean.<String, Double>perKey());

Finally we can now take the set of average temperatures for each window and send them to an output file.

PCollection<String> outstrings = avetemps
	.apply(ParDo.of(new KVToString()));

The function class KVToString()  is one we define in a manner similar to the FilterAndConvert class above. There are two things to notice in what happened above.   First, we have used an implicit trigger that generates the means and output at the end of the window.   Second, note that because the windows overlap, events will end up in more than one window.

Beam has several other types of triggers. For example, you can have a data driven trigger that looks at the data as it is coming in and fires when some condition you have set is met. The other type is based on a concept introduced by Google Dataflow called the watermark. The idea of the watermark is based on event time. It is used to emit results when the system estimates that it has seen all the data in a given window. There are actually several very sophisticated ways to define triggers based on different ways to specify the watermark. We won’t go into them here and we refer you to the Google Dataflow documents.
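A toy version of a watermark trigger can be written in a few lines of Python. The heuristic used here (the watermark is the largest event time seen so far minus a fixed allowed lag) is our own simplification and much cruder than what Dataflow does, and the function name and event layout are assumptions for illustration.

```python
def run_with_watermark(events, size, allowed_lag):
    """Emit per-window sums once the watermark passes the window end.

    events: (event_time, value) pairs in processing order.
    The watermark is a simple heuristic: the largest event time
    seen so far minus allowed_lag.
    """
    open_windows = {}       # window start -> running sum
    emitted = []
    max_seen = None
    for event_time, value in events:
        start = (event_time // size) * size
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
        watermark = max_seen - allowed_lag
        if start + size <= watermark:
            continue        # window already closed: this event is too late
        open_windows[start] = open_windows.get(start, 0) + value
        for s in sorted(open_windows):
            if s + size <= watermark:
                emitted.append((s, open_windows.pop(s)))
    return emitted, open_windows

# The event at time 3 arrives out of order, after the event at time 12.
events = [(1, 10), (4, 5), (12, 1), (3, 7), (25, 2)]
emitted, still_open = run_with_watermark(events, 10, 5)
```

The out-of-order event at time 3 is still counted in the first window because the watermark has not yet passed that window's end. An event that arrives after its window has been emitted is simply dropped in this sketch; deciding what to do with such late data is one of the things the more sophisticated triggers address.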

Apache Flink

Flink is now one of the “runners” for Beam because it is possible to implement the Beam semantics on top of Flink.   Many of the same core concepts exist in Flink and Beam.  As with the other systems, Flink takes input streams from one or more sources, which are connected by a directed graph to a set of sinks.

Like the others, the system is based on a Java virtual machine and the API is rendered in Java and Scala. There is also an (incomplete) Python API which bears some similarity to Spark Streaming. To illustrate this, we can compare the Flink implementation of our instrument filter for Figure 1 to the Spark Streaming example above.

The Flink Kinesis Producer is still a “work in progress”, so this code was tested by reading a stream from a CSV file.  The Flink data types do not include the Python dictionary/Json types so we use here a simple tuple format.   Each line of the input stream looks like

instrument-type string, location string, the word "value", floating value

For example,

tempsensor, pike street and second ave, value, 72.3

After reading from the file (or Kinesis shard) the records in the stream data are now 4-tuples of type (STRING, STRING, STRING, FLOAT). The core of the Flink version of the temperature sensor averager is shown below.

class MeanReducer(ReduceFunction):
    def reduce(self, x, y):
        return (x[0], x[1], x[2], x[3] + y[3], x[4] + y[4])

env = get_environment()
data = env.add_source(FlinkKinesisProducer( … ) … )

results = data \
    .filter(lambda x: x[0]=='tempsensor') \
    .map(lambda x: (x[0], x[1], x[2], x[3], 1.0)) \
    .group_by(1) \
    .reduce(MeanReducer()) \
    .map(lambda x: 'location: '+x[1]+' average temp %f' % (x[3]/x[4]))

The filter operation is identical to the Spark Streaming case. After filtering the data we turn each record into a 5-tuple by appending 1.0 to the end of the 4-tuple. We then apply group_by(1) and reduce using the MeanReducer function. The group_by(1) is a signal to shuffle the tuples so that they are keyed by the field in position 1, which corresponds to the location string, and then we apply the reduction to each of the grouped tuple sets. This operation is the same as the reduceByKey function in the Spark Streaming example. The final map converts each element to a string that gives the average temperature for each location.
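The same chain of steps can be traced on a few CSV lines without Flink, using functools.reduce and itertools.groupby from the Python standard library in place of the Flink operators. This is only a stand-in for the Flink program: the helper names and the extra sample lines are ours.

```python
from functools import reduce
from itertools import groupby

def mean_reduce(x, y):
    """Same combining rule as MeanReducer: add the value and count fields."""
    return (x[0], x[1], x[2], x[3] + y[3], x[4] + y[4])

def average_by_location(lines):
    records = []
    for line in lines:
        kind, location, label, value = [f.strip() for f in line.split(",")]
        records.append((kind, location, label, float(value)))
    # filter, then append the count field to make 5-tuples
    temps = [r + (1.0,) for r in records if r[0] == "tempsensor"]
    temps.sort(key=lambda r: r[1])          # groupby needs sorted input
    reduced = [reduce(mean_reduce, group)
               for _, group in groupby(temps, key=lambda r: r[1])]
    return ["location: " + r[1] + " average temp %f" % (r[3] / r[4])
            for r in reduced]

lines = [
    "tempsensor, pike street and second ave, value, 72.3",
    "tempsensor, pike street and second ave, value, 73.7",
    "co2, pike street and second ave, value, 410.0",
]
out = average_by_location(lines)
```

The reducer never divides; it only accumulates the sum in position 3 and the count in position 4, and the division happens once in the final map.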

This example does not illustrate Flink’s windowing operators, which are very similar to Beam’s, nor does it illustrate the underlying execution architecture. In a manner similar to the other systems described here, Flink parallelizes the stream and tasks during execution. For example, our temperature sensor example has a logical view as tasks which may be executed in parallel as shown in Figure 5.


Figure 5.   Flink logical task view and parallel execution view.

The Flink distributed execution engine is based on a standard master worker model.   The Flink source program is compiled into an execution data flow graph and sent to a job manager node by a client system.   The job manager executes the stream and transformations on remote Java VMs which run a task manager.  The task manager partitions its available resources into task slots where the individual tasks defined by the graph execution nodes are assigned.  The job manager and task managers manage the data communication streams between the graph nodes.   This is all very nicely illustrated by a figure from the Apache Flink documentation.   This documentation also describes the Flink windowing and other details of the implementation and programming model.

Summary and Conclusions

We have looked at four different systems, Spark Streaming, Storm/Heron, Google Dataflow/Beam and Flink.  Each of these has been used in critical production deployments and proven successful for their intended applications.  While we have only illustrated each with a trivial example we have seen that they all share some of the same concepts and create pipelines in very similar ways.   One obvious difference is in the way Storm/Heron explicitly constructs graphs from nodes and edges and the others use a very functional style of pipeline composition.    (Storm does have the Trident layer that allows a functional pipeline composition but it is not clear if this will be supported in the Heron version.)

Conceptually the greatest difference arises when comparing Spark Streaming to the others and, in particular, Beam. Akidau and Perry make a very compelling argument for the superiority of the Beam model in comparison to Spark Streaming. They make a number of important points. One obvious one is that Spark is a batch system to which a streaming mode has been attached, while Beam was designed from the ground up to be streaming with obvious batch capabilities. The implication is that the windowing for Spark is based on the RDD in the DStream and this is clearly not as flexible as Beam windows. A more significant point revolves around Beam’s recognition that event time and processing time are not the same. Where this becomes critical is in dealing with out of order events, which are clearly possible in widely distributed situations. Beam’s introduction of event-time windows, triggers and watermarks is a major contribution and clarifies a number of important correctness issues when events are out of order while still allowing you to get approximate results in a timely manner.

In terms of performance of these systems, we will leave it to another time to address this issue.    In fact, it would be a very interesting exercise to create a set of meaningful benchmarks that each system can be measured against.   It would be a non-trivial exercise to design the experiments, but well worth the effort.

Fun with Recurrent Neural Nets: One More Dive into CNTK and TensorFlow

In a previous article I set about comparing Microsoft’s Computational Network Toolkit (CNTK) for deep neural nets to Google’s TensorFlow. I concluded that piece with a deep dive into how recurrent neural nets (RNNs) were represented in each system. I specifically went after the type of RNNs known by the strange name of Long Short-Term Memory (LSTM) networks. I wanted to learn a bit more about how these systems worked, so I decided to treat them like laboratory specimens that I could poke and prod to see what I could learn and what I could get them to do. This article is essentially my lab notebook. Warning: With the exception of a bit toward the end, this is not technically very deep. In fact, I did not discover anything that has not been extensively reported on elsewhere. But I learned a lot and had some fun. Perhaps it will be of interest to students just starting to learn about this subject. Before I get too far into this, I would like to mention that I recently discovered an excellent series of tutorials on RNNs by Denny Britz that are definitely worth reading.

CNTK’s LSTM and Hallucinating Bloomberg Financial News

One of the many good examples in CNTK is the language modeling exercise in Examples/Text/PennTreebank. The documentation for this one is a bit sparse and the example is really just a demo of how easy it is to use their “Simple Network Builder” to define an LSTM network and train it with stochastic gradient descent on data from the Penn Treebank Project. One command starts the learning:

cntk configFile=../Config/rnn.cntk

Doing so trains the network, tests it and saves the model. However, to see the model data in an easily readable form you need a trivial addition to the config file: add the following dumpnode command to put a dump file in a directory of your choosing.

    action = "dumpnode"
    modelPath = "$ModelDir$/rnn.dnn"
    outputFile = "$OutputDir$/modeltext/dump"

This creates a big text file with all the trained data.   To experiment with the trained model, I decided to load it into a python notebook and rebuild the LSTM network from the defining equations.  From the CNTK book those equations are
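Written out to match the Python code that follows, with σ the sigmoid function and ⊙ element-wise multiplication. The subscripted names mirror the WXI, WHI, WCI, bI style of the tensor names in the code; this is my transcription from that code and is not a copy of the CNTK book's typesetting.

```latex
\begin{aligned}
i_t &= \sigma\left(W_{xi}\, x_t + W_{hi}\, h_{t-1} + W_{ci} \odot c_{t-1} + b_i\right) \\
f_t &= \sigma\left(W_{xf}\, x_t + W_{hf}\, h_{t-1} + W_{cf} \odot c_{t-1} + b_f\right) \\
c_t &= f_t \odot c_{t-1} + i_t \odot \tanh\left(W_{xc}\, x_t + W_{hc}\, h_{t-1} + b_c\right) \\
o_t &= \sigma\left(W_{xo}\, x_t + W_{ho}\, h_{t-1} + W_{co} \odot c_t + b_o\right) \\
h_t &= o_t \odot \tanh\left(c_t\right)
\end{aligned}
```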


I was pleased to see that the dumped model text had the same W and b tensor names as in the equations, so my job was relatively easy. I extracted each of the tensors and saved them into a file (I will make these available in GitHub). The Python code for the LSTM based on the equations above is below.

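import numpy as np

# Sigmoid, getvec and getwordsfromoutput are helper functions defined elsewhere
# in the notebook; E, W2 and the W.. and b.. tensors come from the dumped model.
def rnn(word, old_h, old_c):
    # look up the embedding vector for the input word
    Xvec = getvec(word, E)

    # input gate
    i = Sigmoid(np.matmul(WXI, Xvec) +
                np.matmul(WHI, old_h) + WCI*old_c + bI)
    # forget gate
    f = Sigmoid(np.matmul(WXF, Xvec) +
                np.matmul(WHF, old_h) + WCF*old_c + bF)
    # new cell state
    c = f*old_c + i*(np.tanh(np.matmul(WXC, Xvec) +
                             np.matmul(WHC, old_h) + bC))
    # output gate (uses the new cell state c)
    o = Sigmoid(np.matmul(WXO, Xvec) +
                np.matmul(WHO, old_h) + (WCO * c) + bO)
    # new hidden state
    h = o * np.tanh(c)
    # extract ordered list of five best possible next words
    q = h.copy()
    q.shape = (1, 200)
    output = np.matmul(q, W2)
    outlist = getwordsfromoutput(output)
    return h, c, outlist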

As you can see, this is almost a literal translation of the equations. The only difference is that this function takes a text string as the input word, whereas the input to the equations is a vector encoding of the word. The model generates the encoding matrix E, which has the nice property that the ith column of the matrix corresponds to the word in the ith position in the vocabulary list. The function getvec(word, E) takes the embedding tensor E, looks up the position of the word in the vocabulary list and returns the column vector of E that corresponds to that word. The output of one pass through the LSTM cell is the vector h. This is a compact representation of the words likely to follow the input text to this point. To convert this back into “vocabulary” space we multiply it by another trained matrix, W2. The size of our vocabulary is 10000 and the vector output is that length. The ith element of output represents the relative likelihood that the ith word is the next word to follow the input so far. The function getwordsfromoutput simply returns the top 5 candidate words in order of likelihood.
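For readers who want something to run, here is a minimal sketch of what the three helpers might look like. This is my guess at their behavior from the description above, not the notebook's actual code. In particular, the explicit vocab argument is an assumption: the notebook versions take only (word, E) and (output), so they presumably read the vocabulary list from a global. The toy vocabulary and matrix are made up.

```python
import numpy as np


def Sigmoid(x):
    """Element-wise logistic function."""
    return 1.0 / (1.0 + np.exp(-x))


def getvec(word, E, vocab):
    """Return the column of E for `word` as a column vector.

    `vocab` is an assumed explicit vocabulary list; column i of E
    holds the embedding of vocab[i].
    """
    idx = vocab.index(word)
    return E[:, idx].reshape(-1, 1)


def getwordsfromoutput(output, vocab, k=5):
    """Return the k words with the highest scores, best first."""
    scores = np.asarray(output).ravel()
    best = np.argsort(-scores)[:k]
    return [vocab[i] for i in best]


# Toy data: a 4-word vocabulary with 3-dimensional embeddings.
vocab = ["the", "market", "rose", "fell"]
E = np.arange(12.0).reshape(3, 4)

vec = getvec("rose", E, vocab)
top2 = getwordsfromoutput(np.array([[0.1, 0.7, 0.05, 0.4]]), vocab, k=2)
print(vec.ravel(), top2)
```

With the real model, E would be the 150 by 10000 embedding tensor and output the 1 by 10000 result of multiplying h by W2.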

Before going further, it is worth looking closer at the properties of the word embedding matrices E and W2. There is a fascinating paper by Mikolov, Yih and Zweig entitled “Linguistic Regularities in Continuous Space Word Representations” where they suggest that the embedding space for words has several interesting properties. Their point is that words that are similar in a linguistic sense will be nearby in the embedding space. For example, present tense verbs should be near other present tense verbs and singular nouns should be near each other, etc. I decided to investigate that. However, there are two embedding mappings. One is based on the tensor E and the other is based on the W2 tensor. E has dimension 150 by 10000 and W2 is 200 by 10000. The difference in dimensionality is because of arbitrary decisions made in defining the hidden layers in the network. But both represent word embeddings. I experimented with both. I wrote a function getnear(word, M) which takes a word and looks for the 5 nearest words in the space, where M is the transpose of either E or W2. (I used cosine distance as the metric.) Verb tense locality and noun plurals worked best in the W2 space as illustrated below.


These are only illustrations. For a deeper statistical analysis look at the Mikolov paper. A more interesting conjecture from their study was that there may be some linearity in these embeddings that might allow one to try simple analogies of the form “A is to B, as C is to __”. Their idea is that if a, b and c are the vector embeddings of the words A, B and C, then the embedding of “__” may be computed as d = c + (b-a). So I wrote a little function AistoBasCisto(A, B, C) that does this computation. In the results I had to delete A, B and C from the candidate answers because they often came up as nearby. In this case my results were less encouraging. It worked better with the E space than with W2. For example, for E we have


And for the W2 space the results looked like


As you can see, the “run running walk __” example failed with E and was close, but still incorrect, with W2.
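The two lookups described above, nearest words and analogy by vector arithmetic, can be sketched as follows. This is an illustrative reimplementation and not the notebook code. The explicit vocab argument, the helper cosine_scores, the choice to exclude the query word in getnear, and the toy 2-dimensional embeddings are all my assumptions.

```python
import numpy as np


def cosine_scores(v, M):
    """Cosine similarity between vector v and every row of M."""
    v = v / np.linalg.norm(v)
    rows = M / np.linalg.norm(M, axis=1, keepdims=True)
    return rows @ v


def getnear(word, M, vocab, k=5):
    """Return the k words whose embeddings (rows of M) are closest to `word`."""
    idx = vocab.index(word)
    scores = cosine_scores(M[idx], M)
    scores[idx] = -np.inf  # assumed: do not return the query word itself
    return [vocab[i] for i in np.argsort(-scores)[:k]]


def AistoBasCisto(A, B, C, M, vocab, k=5):
    """Candidates for 'A is to B as C is to __' using d = c + (b - a)."""
    a, b, c = (M[vocab.index(w)] for w in (A, B, C))
    d = c + (b - a)
    scores = cosine_scores(d, M)
    for w in (A, B, C):  # drop A, B and C from the candidate answers
        scores[vocab.index(w)] = -np.inf
    return [vocab[i] for i in np.argsort(-scores)[:k]]


# Toy embeddings, one row per word (hypothetical values).
vocab = ["man", "woman", "king", "queen"]
M = np.array([[1.0, 0.0],
              [1.0, 1.0],
              [3.0, 0.0],
              [3.0, 1.0]])

nearest = getnear("man", M, vocab, k=1)
analogy = AistoBasCisto("man", "woman", "king", M, vocab, k=1)
print(nearest, analogy)
```

With the trained model, M would be the transpose of E or W2 and vocab the 10000-word vocabulary list.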

You may wonder why these particular words came up.  The data we used to train the system came from a small subset of the Penn TreeBank collection as provided in the CNTK package.   It is heavily dominated by financial news items.    This explains why the plural of person could be managers or customers.   A larger vocabulary and data collection would be needed to truly test the analogy by linearity conjecture.

Now on to hallucinating the financial news.

Now to test the LSTM as a truly recurrent network. We provide the network with a starting word and let it suggest the next word. Then we repeat this process, constructing a “sentence”. In the code below we randomly pick one of the top three words suggested by the network as the next word.

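# Start from zero state and cell vectors.
c = np.zeros(shape = (200, 1))
h = np.zeros(shape = (200, 1))
output = np.zeros(shape = (10000, 1))
word = 'my'
sentence = word
for _ in range(40):
    h, c, outlist = rnn(word, h, c)
    # np.random.randint excludes the upper bound, so this picks index 0, 1 or 2
    word = outlist[np.random.randint(0, 3)]
    sentence = sentence + " " + word
print(sentence + ".")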

In this case we start with the word “my” and let it generate a 40 word sentence.  The output is

my new rules which would create an interest position here unless there should prove   signs of such things too quickly although the market could be done better toward paying further volatility where it would pay cash around again if everybody can.

This is a great example of hallucinating financial news. Let’s try it again starting with the word “president”.

president michael de brown wrote himself against democratic union law which represents an emergency relief agreement during a new york state district or early tuesday before july after a federal government agency created early losses without mr. krenz or perhaps.

Now with the word “the”.

the company reported third-quarter results reflecting a number compared between N barrels including pretax operating loss from a month following fiscal month ending july earlier compared slightly higher while six-month cds increased sharply tuesday after an after-tax loss reflecting a strong.

The “sentences” end rather abruptly because of the 40 word limit I set. If you let it go it will run on until the state vector for the sentence seems to break down. Try this yourself. To make it easy to play with this example, I have put the code in GitHub. The trained model text files are in OneDrive as a zipped file of about 50MB.

There are many more excellent and fun examples. Andrej Karpathy has a great blog article showing how RNNs can mimic Shakespeare, LaTeX science articles and many more.

TensorFlow’s seq2seq French Lesson.

One of the most interesting examples in the TensorFlow tutorials is an English to French translator. As with the CNTK example it was trivial to start the translator learning following the instructions in the tutorial. After letting this run for about a week, I wanted to see how well it would do. As with the CNTK example, I created a Jupyter IPython notebook and loaded the trained model. I will explain how that was done in more detail below but, for now, I will show how we can invoke it to test its translation ability. This particular model was not very big and was trained on a relatively small data set, so I didn’t expect much. In fact, as you will see, to a French speaker it is a disaster. On the other hand, it learned more French in a week of training than I did in three semesters of French in college. (For full disclosure, this was my weakest subject in college and my grade was a hard-fought “C” each semester.)

The code below demonstrates how the model is invoked. First you have to tokenize the input sentence. The algorithm uses a system of buckets of fixed sizes to make the training more efficient. You next find the smallest bucket that can contain your sentence and convert this to the input vector list needed by the model. The step function takes a TensorFlow session, the input vector list and a null list of decoder inputs (to be explained later) and generates a list of vectors as outputs. Each vector represents the likelihood that individual vocabulary words are the correct word at that point in the translated sentence. We pick the most likely word at each position and print the sentence.

sentence = " I am not the president of France. "

# Convert the sentence to a list of integer token ids.
token_ids = data_utils.sentence_to_token_ids(sentence, en_vocab)
# Which bucket does it belong to?
bucket_id = min([b for b in range(len(_buckets))
                 if _buckets[b][0] > len(token_ids)])
# Get a 1-element batch to feed the sentence to the model.
encoder_inputs, decoder_inputs, target_weights = model.get_batch(
    {bucket_id: [(token_ids, [])]}, bucket_id)

# Run one forward-only step (the final True) to get the output logits.
_, _, output_logits = model.step(sess, encoder_inputs, decoder_inputs,
                                 target_weights, bucket_id, True)
# Greedy decoding: take the most likely word at each output position.
outputs = [int(np.argmax(logit, axis=1)) for logit in output_logits]
print(" ".join([rev_fr_vocab[output] for output in outputs]))

Je ne suis pas le président de la France .

This example is not too bad.  However, if I ask

“In which city does the president of France live?”

I get

“Dans quelle ville le président de la France ?”.

This is not exactly correct.    If I feed this into Google translate and ask what this means in English I get “In which city the President of France?”.   If I give it this one,

What is the name of a good restaurant?

The system responds with

“Quel est le nom d’une bonne bonne bonne ?”

Which translates back to “What is the name of a good good good?”.  Probably not very helpful on the streets of Paris.   It turns out restaurant is not in the tiny training vocabulary used here.   Finally, given this sentence

“The article stated that the President of the United States is here today.”

The translator returned

“Le paragraphe a indiqué que le président des États-Unis est aujourd ‘ hui aujourd ‘ hui .”

The end of this reply is “is today today”. As I said, this is still much better than I could do with my college French. However, as you can see from the previous two examples, our little translator runs out of gas at the end of sentences and tends to repeat itself. You should try this yourself. I have put the notebook file in GitHub, or you can execute these directly from the TensorFlow Python code. All you need to do is train the model from TensorFlow and run the notebook with the path to the model output directory.

While loading and using the trained model was easy and fun, understanding the seq2seq model used in this example takes a bit of work.   So this part of this article will get a bit more technical.

The TensorFlow translate program is based on a sequence-to-sequence model constructed from more primitive recurrent neural nets.   By sequence-to-sequence we mean a network that takes a sequence as input and produces a sequence as output.   It consists of two parts: an “encoder RNN” and a “decoder RNN” as shown in Figure 1 below.


Figure 1.   A sequence-to-sequence RNN English to French translator with the encoder and decoder unrolled to show the flow of messages.

In this figure the RNNs are “unrolled” to show the flow of messages. The state vector at the end of the encoder is a vector embedding of the input sentence. This state vector is used to start the decoder along with a “GO” token. The diagram shows the network after it has been trained. During training the inputs to the decoder are the French versions of the English sentences. I won’t talk about the training here because it is enough to try to understand how this works. Before I go any further I want to point you to some important papers. Sutskever, Vinyals and Le published an early important paper on sequence to sequence models that is worth reading.

To understand how the network is built we need to dig into the code a bit. The building blocks are a set of classes of base type RNNCell with specializations

  1. BasicRNNCell
  2. GRUCell
  3. BasicLSTMCell
  4. LSTMCell
  5. OutputProjectionWrapper
  6. InputProjectionWrapper
  7. EmbeddingWrapper
  8. MultiRNNCell

The ones we will see used here are GRUCell, MultiRNNCell and EmbeddingWrapper. We discussed LSTMCell in our previous article but we need to look at GRUCell here because that is the one used in the example. The GRUCell is a “Gated Recurrent Unit” invented by Cho et al. in “Learning Phrase Representations using RNN Encoder–Decoder for Statistical Machine Translation”. The “gated” phrase comes from the way the output is defined as coming mostly from the previous state or from a combination with the new input. The diagram below tries to explain this a bit better.


Figure 2.   GRU wiring diagram

It also helps to see it in terms of the defining equations.
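Written out to agree with the GRUCell code shown further down, with σ the sigmoid function, ⊙ element-wise multiplication, x_t the input and h_{t-1} the old state. The names W_r, W_u, W_c and b_r, b_u, b_c are labels I have assumed for the learned weights and biases; each W acts on the stacked column vector of input and state.

```latex
\begin{aligned}
r_t &= \sigma\left(W_r \begin{bmatrix} x_t \\ h_{t-1} \end{bmatrix} + b_r\right) \\
u_t &= \sigma\left(W_u \begin{bmatrix} x_t \\ h_{t-1} \end{bmatrix} + b_u\right) \\
c_t &= \tanh\left(W_c \begin{bmatrix} x_t \\ r_t \odot h_{t-1} \end{bmatrix} + b_c\right) \\
h_t &= u_t \odot h_{t-1} + (1 - u_t) \odot c_t
\end{aligned}
```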


The quantity u_t is a gate vector. Recall that the sigmoid function switches sharply between zero and one. So when u_t is one, h is just a copy of the old h and we are ignoring the input x; when u_t is zero, h is based on the value c_t. The gate r_t determines how much of the old state goes into defining the value of c_t. To understand how this is encoded in TensorFlow you need to understand the function

linear(args, output_size, bias, bias_start=0.0, scope=None)

where args is a list of tensors each of size batch x n. Linear computes sum_i(args[i] * W[i]) + bias where W is a list of matrix variables of size n x outputsize and bias is a variable of size outputsize. In the equations above we have represented linear algebra as a matrix times a column vector. TensorFlow uses the transpose notation: row vector on the left times the transpose of the matrix. So in linear the args are a list of row vectors. Where are the matrix W and the offset b? They are fetched from memory based on the current variable scope, because W and b are variable tensors that are learned values. If you look at the first two equations above, you will see they are almost identical. In fact, we can write them as
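With the two gate matrices stacked on top of each other, the column form and its transpose, the row form, can be written as follows. Here r_t and u_t are the two gates, and W_r, W_u, b_r, b_u are my assumed labels for the gate weights and biases, with each W acting on the stacked input and state.

```latex
\begin{aligned}
\begin{bmatrix} r_t \\ u_t \end{bmatrix}
  &= \sigma\left(
     \begin{bmatrix} W_r \\ W_u \end{bmatrix}
     \begin{bmatrix} x_t \\ h_{t-1} \end{bmatrix}
     + \begin{bmatrix} b_r \\ b_u \end{bmatrix}\right) \\
\begin{bmatrix} r_t^\top & u_t^\top \end{bmatrix}
  &= \sigma\left(
     \begin{bmatrix} x_t^\top & h_{t-1}^\top \end{bmatrix}
     \begin{bmatrix} W_r^\top & W_u^\top \end{bmatrix}
     + \begin{bmatrix} b_r^\top & b_u^\top \end{bmatrix}\right)
\end{aligned}
```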


If you transpose the last one from column form into row form you can now compute both with one invocation of the linear function. The code for the GRUCell is below. As you can see they have encoded one pass through the GRU cell with only two matrix vector multiplies. You can also see the way the variable scope is used to pick out the W’s for the gates and the W for the state/output. Another point to remember is that an invocation of the “__call__” function operator does not cause the tensor to execute the operation; rather, it builds the graph.

class GRUCell(RNNCell):
  def __init__(self, num_units):
    self._num_units = num_units
  # ... stuff deleted ...
  def __call__(self, inputs, state, scope=None):
    with vs.variable_scope(scope or type(self).__name__):  
      with vs.variable_scope("Gates"):  # Reset gate and update gate.
        # We start with bias of 1.0 to not reset and not update.
        r, u = array_ops.split(1, 2, linear([inputs, state],  
                               2 * self._num_units, True, 1.0))
        r, u = sigmoid(r), sigmoid(u)
      with vs.variable_scope("Candidate"):
        c = tanh(linear([inputs, r * state], self._num_units, True))
      new_h = u * state + (1 - u) * c
    return new_h, new_h
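To see the same computation run eagerly, here is a NumPy sketch of one GRU step in the row-vector convention. It is not the TensorFlow implementation: this linear takes the weights as explicit arguments instead of fetching them from a variable scope, and the sizes, random weights and the exaggerated update-gate bias are made up to demonstrate the "copy the old state" behavior.

```python
import numpy as np


def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))


def linear(args, W, b):
    """Row-vector form: concatenate args along axis 1, multiply by W, add b.

    This equals sum_i(args[i] @ W_i) + b when W is the W_i stacked by rows.
    """
    return np.concatenate(args, axis=1) @ W + b


def gru_step(x, h, W_gates, b_gates, W_cand, b_cand):
    """One GRU step on a batch of row vectors; returns the new state."""
    n = h.shape[1]
    # One multiply produces both gates; first half is r, second half is u.
    gates = sigmoid(linear([x, h], W_gates, b_gates))
    r, u = gates[:, :n], gates[:, n:]
    # Second multiply produces the candidate state.
    c = np.tanh(linear([x, r * h], W_cand, b_cand))
    return u * h + (1.0 - u) * c


rng = np.random.default_rng(0)
batch, n_in, n_units = 2, 3, 4
x = rng.normal(size=(batch, n_in))
h = rng.normal(size=(batch, n_units))
W_gates = rng.normal(size=(n_in + n_units, 2 * n_units))
W_cand = rng.normal(size=(n_in + n_units, n_units))
b_cand = np.zeros(n_units)

# A huge bias on the update gate drives u to 1, so the old state is copied.
b_copy = np.concatenate([np.zeros(n_units), np.full(n_units, 50.0)])
h_copy = gru_step(x, h, W_gates, b_copy, W_cand, b_cand)

# With zero gate biases the new state mixes old state and candidate.
h_mixed = gru_step(x, h, W_gates, np.zeros(2 * n_units), W_cand, b_cand)
print(np.allclose(h_copy, h), h_mixed.shape)
```

The sketch makes the two matrix multiplies explicit: one for both gates and one for the candidate state.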

The top level class we invoke for building our model is Seq2SeqModel. When we create an instance of this class it sets in motion a set of flowgraph building steps. I am going to skip over a lot of stuff and try to give you the big picture. The first graph building step in the initialization of an instance of this object is

    # Create the internal multi-layer cell for our RNN.
    single_cell = rnn_cell.GRUCell(size)
    if num_layers > 1:
      cell = rnn_cell.MultiRNNCell([single_cell] * num_layers)

As you can see we are creating a GRU cell graph generator instance, making a list of num_layers of this object and passing that to the constructor for MultiRNNCell. In our case, num_layers has been set to 2. MultiRNNCell is pretty easy to understand. It builds a graph consisting of a stack of (in this case) GRU cells where the output of each level is fed to the input of the level above it. The output of this new compound cell is the output of the top sub-cell, and its state is the concatenation of the states of all the sub-cells.
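The stacking idea can be sketched in a few lines of NumPy. This is not the TensorFlow MultiRNNCell. TanhCell is a hypothetical stand-in that is far simpler than a GRU, StackedCell is my own name, and each layer here is given its own weights explicitly.

```python
import numpy as np


class TanhCell:
    """Hypothetical stand-in for one recurrent cell (much simpler than a GRU)."""

    def __init__(self, n_in, n_units, rng):
        self.n_units = n_units
        self.W = rng.normal(scale=0.1, size=(n_in + n_units, n_units))

    def __call__(self, x, state):
        new_h = np.tanh(np.concatenate([x, state], axis=1) @ self.W)
        return new_h, new_h  # (output, new state)


class StackedCell:
    """Feed each cell's output to the cell above; concatenate all the states."""

    def __init__(self, cells):
        self.cells = cells

    def __call__(self, x, state):
        # Split the concatenated state into one piece per layer.
        sizes = [cell.n_units for cell in self.cells]
        pieces = np.split(state, np.cumsum(sizes)[:-1], axis=1)
        out, new_states = x, []
        for cell, piece in zip(self.cells, pieces):
            out, new_piece = cell(out, piece)
            new_states.append(new_piece)
        return out, np.concatenate(new_states, axis=1)


rng = np.random.default_rng(0)
batch, n_in, n_units = 2, 3, 4
stack = StackedCell([TanhCell(n_in, n_units, rng),
                     TanhCell(n_units, n_units, rng)])

x = rng.normal(size=(batch, n_in))
state = np.zeros((batch, 2 * n_units))
out, new_state = stack(x, state)
print(out.shape, new_state.shape)
```

The output has the width of one layer, while the state is twice as wide because it holds both layers' states side by side.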

The next part is not so easy to follow. We will take our MultiRNNCell graph builder and use it to create an encoder and a special decoder. But first we must make a short digression.

Paying Attention

There is a problem that is encountered in the sequence-to-sequence model.   The encoder encodes the entire sentence into a state vector which is used by the decoder as its input.    That state vector is an abstract representation of our entire sentence as a single point in a very high dimensional space.    The decoder has been trained to use that point as a starting point to unroll a translated version of the sentence.   I find the fact that it works at all to be rather remarkable.   It is as if the decoder takes the English state vector and transforms it into a similar point in “French” space.

Unfortunately, the longer the input sentence, the more difficult it is to decode it. How much information can we pack into one point? The problem is that at each decoding step we need a little bit more information than is provided by the state vector as it passes through the decoder loop. The idea used here is to help the decoder by providing it a bit of focus derived from the input sequence at each stage of the decoder loop. This is generally referred to as “attention”, as in “at this step of decoding please pay attention to what the encoder was doing here”. Bahdanau, Cho and Bengio had an early paper about this that used a bidirectional pass over the input sequence. As they put it, they wanted to “automatically (soft-)search for parts of a source sentence that are relevant to predicting a target word”. (Denny Britz has a lovely blog article about attention and describes several fascinating applications. It is well worth reading.) The mechanism for attention used in the TensorFlow example is based on a paper by Vinyals et al. and we will follow that one here. The key idea is that, rather than taking only the single final state vector from the encoder, we collect the state vectors at each stage of the encoder. Following Vinyals, let the encoder state vectors for each input word be


And let the decoder state vectors be


Then for each decoder time step t compute


Where the Ws are learned matrices and v is a learned vector.    Then as the input to the t+1 state vector of the decoder we use the concatenation


The idea is that this new state vector at time t+1 puts much more focus on the corresponding words in the encoder string. This all happens in a function called seq2seq.attention_decoder that is called in another constructor function, seq2seq.embedding_attention_seq2seq, which wraps an embedding around a graph generated by our MultiRNNCell graph builder to generate the final decoder graph. These graphs are all stitched together in the Seq2SeqModel constructor. It is fair to say that there are many levels of abstraction here that are used to build the decoder and link it to the encoder. I am leaving out many details that are critical for the training, such as the part that implements the bucket handler. The final graph, in its most abstract form, is pictured below in Figure 3.


Figure 3.  The sequence to sequence translator is based on a two level GRU cell encoder and an attention-augmented two level GRU cell decoder.  The input English is entered in reverse order as an optimization
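The attention step described before Figure 3 can be sketched in NumPy. The formulas in the docstring are my reading of the Vinyals et al. formulation and should be checked against the paper. The function name, shapes and random values are assumptions for illustration, and this is not the code in seq2seq.attention_decoder.

```python
import numpy as np


def softmax(z):
    z = z - z.max()  # subtract the max for numerical stability
    e = np.exp(z)
    return e / e.sum()


def attention(enc_states, d_t, W1, W2, v):
    """Attention over encoder states for one decoder step.

    enc_states: (T, n) array, one encoder state h_i per input word
    d_t:        (n,) decoder state at step t
    W1, W2:     (m, n) learned matrices; v: (m,) learned vector

        u_i  = v . tanh(W1 h_i + W2 d_t)
        a    = softmax(u)
        d'_t = sum_i a_i h_i

    Returns the weights a and the concatenation [d_t, d'_t].
    """
    u = np.tanh(enc_states @ W1.T + d_t @ W2.T) @ v
    a = softmax(u)
    context = a @ enc_states
    return a, np.concatenate([d_t, context])


rng = np.random.default_rng(0)
T, n, m = 5, 4, 6  # input length, state size, attention size (made up)
enc_states = rng.normal(size=(T, n))
d_t = rng.normal(size=n)
W1 = rng.normal(size=(m, n))
W2 = rng.normal(size=(m, n))
v = rng.normal(size=m)

weights, combined = attention(enc_states, d_t, W1, W2, v)
print(weights.round(3), combined.shape)
```

The weights sum to one, so the context vector is a weighted average of the encoder states, and the decoder sees it alongside its own state.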

Final Thoughts

As I have said above, I have not included all the details of how the seq2seq translator is put together, but I tried to include the highlights that I found most interesting.   I encourage you to dive into the code and discover the rest.   You will likely find some errors in what I described above.   If so, please let me know.

There are really a lot of exciting results that have come out in the last few years relating to RNNs. For example, Lei Ba, Mnih and Kavukcuoglu demonstrated that RNNs with attention can be applied to interesting image analysis challenges, such as reading the house number from a street scene. In their excellent paper “Teaching Machines to Read and Comprehend”, Hermann et al. demonstrate the use of an attentive RNN built to answer simple questions about text. I personally don’t think any RNN can pass a Turing test yet, so it ain’t A.I. But these little statistical machines are certainly wonderful mimics and they can speak better French than I.