Category Archives: computer science

Observations About Streaming Data Analytics for Science

I recently had the pleasure of attending two excellent workshops on the topic of streaming data analytics and science.  A goal of the workshops was to understand the state of the art of “big data” streaming applications in scientific research and, if possible, identify common themes and challenges.  Called Stream2015 and Stream2016, these meetings were organized by Geoffrey Fox, Lavanya Ramakrishnan and Shantenu Jha.   The talks at the workshop were from an excellent collection of scientists from universities and the national labs and professional software engineers who are building cloud-scale streaming data tools for the Internet industry.

First it is important to understand what we mean by streaming data analytics and why it has become so important. Most scientific data analysis involves “data at rest”: data that was generated by a physical experiment or simulation and saved in files in some storage system. That data is then analyzed, visualized and summarized by various researchers over a period of time. The sizes of scientific data archives are growing and the number of disciplines creating new ones is expanding. New organizations like the Research Data Alliance have been created to help coordinate the development and sharing of scientific data collections. However, not all data is “at rest” in this sense. Sometimes data takes the form of an unbounded stream of information. Examples include the continuous stream of live instrument data from on-line sensors or other “internet of things” (IoT) devices. Even computer system logs can produce large continuous streams. Other examples include data from continuously running experiments or automated observatories such as radio telescopes, or the output of DNA sequencers.

In some cases, the volume and rate of generation are so large that we cannot keep the data at rest for very long. The observed data from the Square Kilometer Array (SKA) will be so large that it is too expensive to contemplate keeping it, and therefore it must be immediately processed into a reduced stream. An important aspect of this large scale streaming scientific data analysis is computational steering: the need for a human or smart processes to analyze the data stream for quality or relevance and then to make rapid adjustments to the source instruments or simulations. The report from the first Streams workshop describes many of these cases. Examples include autonomous vehicles processing radar data streams for oil and gas exploration and modern avionics systems that have to recognize bad data in real-time. Data coming from superconducting tokamak experiments must be managed and analyzed in real-time to adjust the control settings and prevent catastrophic events.

This article has two parts.   In this part we will look at the issue of streaming data in science and then present some of the lessons I gathered from the workshops.  The workshop organizers have not released their final report for Stream2016, so their conclusions may be vastly different from my own.   In the second part we take a deep dive into the cloud centric data analytics tools to try to understand the landscape of ideas and approaches that have evolved in recent years in this community.

There are many factors that determine when a particular technology is appropriate for a particular problem.  Streaming data analytics is an interesting case that illustrates how diverse challenges and requirements have led software designers to build vastly different solutions.   For example, the software built to manage the vast Twitter data streams just can’t handle the analytic problems encountered when steering high-end electron microscopy experiments.  It is worth trying to understand why this is the case.

We can divide the spectrum of streaming data scenarios into three basic categories:

  1. The data streaming challenges that confront large enterprises when dealing with the data from millions of users of Internet-enabled devices. These might be the “click-streams” from browsers to search engines, where it is critical to understand user sentiment or where to place advertisements based on previous user queries. The stream may be the vast logs of the behavior of systems with tens of thousands of active machines that need to be constantly monitored, scaled and serviced. In these cases, the individual events that make up the stream are often very small records of a few bytes in length.
  2. Large scale environmental or urban sensor networks, such as wide-area earthquake sensor networks, the NSF Ocean Observatories Initiative, or urban sensor networks such as those proposed in Chicago’s Array of Things project. These are very heterogeneous collections of data streams that may involve instruments with very different stream characteristics. For example, some small sensors may generate a high rate of small messages while others may generate bursts of large, Mbyte-size messages, such as you would see from a UAV surfacing and uploading many records. They may require intermediate analysis at various stages but final analysis “downstream”. Another good example is the stream of data from a swarm of robots that must be analyzed to avoid collision (see the paper by He, Kamburugamuve and Fox, which describes this real-time challenge in detail).
  3. The streams generated by very large experimental facilities like the Large Hadron Collider, the Square Kilometer Array and the Advanced Photon Source, and by massive supercomputer simulations. These large scale scientific experiments can be extremely complex and involve large numbers of instruments or HPC simulations, multiple data analysis steps and a distributed set of collaborators. Most of the data analysis in these experiments is not like the pure streaming data challenges we see in items 1 and 2. The data streams are often extremely large file objects that must move through complex laboratory networks. The orchestration of the streaming activity more accurately resembles workflow than data flow, and often that workflow must allow a human in the loop.

While it is tempting to think that one solution paradigm can cover all the bases, this may not be the case. Case 1 has led to an explosion of creativity in the open source community and several very significant Apache projects. These include Spark Streaming, which has been derived from the Spark parallel data analysis system; Twitter’s Storm system, which has been redesigned by Twitter as Heron; Apache Flink, from the German Stratosphere project; and Google’s Dataflow (also see this article), which is becoming Apache Beam and will run on top of Flink and Spark. Other university projects include Neptune and the Granules project at Colorado State. In addition to Google Cloud Dataflow, other cloud provider offerings include Amazon Kinesis, Azure Streaming and IBM Stream Analytics. (Are you confused yet? In the second part of this report we will describe many of these in much greater detail.)

It turns out that many of the tools described above for case 1 also apply to case 2 under certain conditions. The challenges arise in two areas. First, if the real-time demands of the application require very low latencies, as is the case for various UAV challenges, some cloud solutions can be lacking. However, Kamburugamuve, Ekanayake, Pathirage and Fox demonstrate that Storm’s basic communication mechanisms can be vastly improved using collective communication that exploits shared memory and optimized routing to meet the demands of the robot swarm example mentioned above. Second, if the individual events in the stream are large (greater than a megabyte), as you may find with many instruments that deal with image or sound objects, the application may not work at all with many of the systems designed with case 1 in mind. Algorithmic methods can be used to reduce the event size, so that approximate methods can identify events for deeper off-line analysis. In many of these instrument streaming cases it is necessary to do more processing of the stream “near the edge”. In other words, many small data sources can be “pre-analyzed” by processors very near the source. For example, the Apache Quarks edge-analytics tools are designed to run in very small systems such as the Raspberry Pi.

Case 3 presents the greatest departure from the emerging open source tools.  The ATLAS experiment on the Large Hadron Collider (LHC) has a large Monte Carlo simulation component and they have converted the processing of the data into a relatively fine-grained event stream, called the Atlas Event Service.  A distributed workload manager, PanDA manages a global queue of analysis tasks that can be executed on a variety of platforms including Amazon and, in a specialized form called Yoda, on HPC systems.

At the other end of the application spectrum, a massively parallel simulation model running on an exascale computer can generate vast amounts of data. Every few simulated time steps the program may generate a very large (50GB or more) data structure distributed over a thousand parallel processing elements. You can save a few of these to a file system, but it is now preferable to create a stream of these big objects and let another analysis system consume them directly. The state-of-the-art HPC I/O library, called ADIOS, provides a very simple, standard-looking API to the application programmer, but the back-end of ADIOS can be adapted to a variety of storage or networking layers while taking full advantage of the parallel I/O capabilities of the host system. One such back-end is facilitated by a networking layer, EVPath, that provides the flow and control needed to handle such a massive stream. Another back-end target for ADIOS is DataSpaces, a system for creating shared data structures between applications across distributed systems. DataSpaces accomplishes this by mapping n-dimensional array objects to one dimension by using a distributed hash table and Hilbert space filling curves. Together these provide a variety of streaming abstractions to allow data to move from one HPC application to a variety of HPC data analysis and visualization tools as illustrated in Figure 1.
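To make the space-filling-curve idea concrete, here is a minimal sketch of a Hilbert index for a two-dimensional grid. It is not the DataSpaces implementation, which handles n-dimensional arrays and distributes the resulting keys over a hash table; the function name `hilbert_index` and the restriction to a square grid whose side is a power of two are assumptions made for illustration.

```python
def hilbert_index(n, x, y):
    """Map cell (x, y) of an n-by-n grid (n a power of two) to its
    position along a Hilbert curve, an integer in [0, n*n)."""
    d = 0
    s = n // 2
    while s > 0:
        rx = 1 if (x & s) else 0
        ry = 1 if (y & s) else 0
        d += s * s * ((3 * rx) ^ ry)
        # Rotate/reflect so the sub-quadrant is in canonical orientation.
        if ry == 0:
            if rx == 1:
                x = n - 1 - x
                y = n - 1 - y
            x, y = y, x
        s //= 2
    return d


# Walk a 4x4 grid in curve order: neighbours on the curve are also
# neighbours in the grid, which keeps nearby array elements close
# together in the one-dimensional key space.
cells = sorted(((x, y) for x in range(4) for y in range(4)),
               key=lambda c: hilbert_index(4, c[0], c[1]))
print(cells)
```

The locality property is the reason a curve like this is attractive for a distributed table: a contiguous range of keys corresponds to a compact region of the original array.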

Figure 1.  From “Stream Processing for Remote Collaborative Data Analysis” by Klasky, Chang, Choi, Churchill, Kurc, Parashar, Sim, Wolf and Wu.  ORNL, PPPL, Rutgers, GT, SBU, UTK, LBNL.  White paper Stream2016 workshop.  

At the Streams 2016 workshop Kerstin Kleese Van Dam made the important observation that the workflow systems managing the stream analytics of time-critical experiments can be complex, and that the success of the experiment depends upon reliable performance of the overall system. The use case she described is “In Operando catalysis experiments”. More specifically, this involves the steering of high end electron microscopy experiments where a beam of electrons is transmitted through an ultra-thin specimen, interacting with the specimen as it passes through. These experiments can generate atomic resolution diffraction patterns, images and spectra under wide ranging environmental conditions. In-situ observations with these instruments are those where physical, chemical or biological processes and phenomena are observed as they evolve. These experiments generate from 10 GB to tens of TB (e.g. at BNL) of data per experiment, at rates ranging from 100 images/sec for basic instruments to 1600 images/sec for state of the art systems. To optimize the scientific outcome of such experiments it is essential to analyze and interpret the results as they are emerging. It is essential that the workflow system reliably deliver optimal performance, especially in situations where time-critical decisions must be made or computing resources are limited.

The current systems in use include the Analysis in Motion framework developed by PNNL, but the challenge that is presented here is to enact the workflow in a way that yields reliable performance. The workflows are frequently composite applications built from loosely coupled parts, running on a loosely connected set of distributed and heterogeneous computational resources. Each workflow task may be designed for a different programming model and implemented in a different language, and most communicate via files sent over general purpose networks.  This research group currently has a DOE project to demonstrate “Integrated End-to-End Performance Prediction and Diagnosis for Extreme Scientific Workflows (IPPD)”.

Concluding Observations. 

The streaming data landscape is very new and evolving fast.  I have come to the conclusion that of the three application domains described above (1: Internet Data Analysis, 2: Array of Things Instruments, 3: Big Science) only 1 and 2 are starting to see convergence of lines of thought and models of computing while 3 will always be driven by very different requirements and needs.   The bleeding edge of science does not have the deep pockets of Google or Amazon when it comes to IT resources.   Their budgets are dominated by the massive experimental facilities and supercomputers and hence the solutions must be custom.  And each experimental domain is unique enough that few common tools beyond MPI exist. On the other hand, one can argue that Twitter, Google and all of the various Apache projects discussed here are also custom built for the problems they each are trying to solve.   This is a world of “bespoke” software systems.

Algorithms and Analysis

An area where there will be great opportunity for sharing is in the algorithmic techniques that will be used to analyze the data. The Streams 2015 report observed that a variety of compelling research topics have emerged, including adaptive sampling, online clustering, and sketching and approximation techniques that trade accuracy for lower space and time complexity. Sketching reduces an element of the stream to a basic form that allows easy generation of approximate answers to important queries. There are many forms of sketching. Szalay described an elegant way to do principal component analysis (PCA) in a streaming context. This provides a way to reduce the spectral complexity of a stream of big events. Machine learning classifiers can be, and are, used as part of stream analytics across application domains as diverse as tweet analysis and medical imaging. With the growing capabilities of deep learning systems, more data, images and sounds can be analyzed and recognized in near real-time. Skype can do near-real time natural language translation and face recognition from video streams. Applying the same technology to sifting through streams of instrument data will lead to new tools to understand earthquakes, hurricanes and tornadoes. We anticipate a lot of great work emerging from this area.
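As one concrete instance of sketching, the following is a minimal Count-Min sketch, a standard structure for approximate frequency counts over a stream in fixed memory. It is offered only as an illustration of trading accuracy for space; it is not taken from the workshop report, and the class name, the table sizes and the use of `hashlib` for the hash functions are assumptions made for this example.

```python
import hashlib


class CountMinSketch:
    """Approximate per-item counts in a fixed depth x width table."""

    def __init__(self, width=256, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _columns(self, item):
        # One column per row, derived from a row-salted hash.
        for row in range(self.depth):
            digest = hashlib.sha256(f"{row}:{item}".encode()).hexdigest()
            yield row, int(digest, 16) % self.width

    def add(self, item, count=1):
        for row, col in self._columns(item):
            self.table[row][col] += count

    def estimate(self, item):
        # Collisions can only inflate a cell, so the minimum over rows
        # is never below the true count.
        return min(self.table[row][col] for row, col in self._columns(item))


sketch = CountMinSketch()
for event in ["quake", "storm", "quake", "tornado", "quake"]:
    sketch.add(event)
print(sketch.estimate("quake"))  # never less than the true count of 3
```

The memory used is `width * depth` counters no matter how many distinct items pass through the stream; widening the table lowers the chance of over-counting.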

Azure Container Services Are Now Live: An Initial Look

The Microsoft Azure container services are now live and, for the most part, they work very well.  There are actually two container services the Azure team is supporting.   One is Mesosphere DC/OS and the other is Docker swarm.   I have been using various versions of Mesos and Mesosphere for a year now, but those deployments were somewhat ad hoc. Some previous postings are here and here and this article provides some updates to both.   These services are now in “general availability”, which is Microsoft speak for “it is now a product”.  There is a good start-up tutorial available here which will lead you through the setup phase.  In this post we will focus on some basic features of DC/OS and show a very simple example of how well it scales.   In a future post we will look at Swarm.

DC/OS

Following the introduction tutorial linked above it was relatively easy to create a DC/OS cluster with 8 worker nodes (and one public node) and one master. Using the instructions, we also created a secure tunnel to the master node mapping port 80 there to localhost port 80. The web link http://localhost on my Windows 10 box brought up the DC/OS web user interface. What you see is the summary of all of the resources used, as shown in Figure 1 below.

Figure 1.  DCOS web interface.

DCOS is the distributed cluster operating system and its job is to support deployed services. The most valuable of these services is Marathon, which is a container orchestration service that will allow you to easily scale the number of instances of your containers and keep them running. It can also be used to enforce special constraints. For example, if you deploy a Docker container that needs to bind to a special port on the host, it will not schedule another conflicting instance on that same host. And it has a very nice graphical user interface, shown in Figure 2, that can be accessed through the DCOS interface.

Figure 2.   Marathon interface showing all running services.

As you can see above, I have an instance of Apache Spark, two instances of the streaming service Storm, one instance of the Zeppelin notebook and one instance of the simple web server Nginx all running. Launching a new Docker container or service is very simple: fill in a web form. There is also a command line tool that works very well on Linux and Windows. For example, to get the information in Figure 2 above the command line call is as follows.

dcos7

The same command line interface can be used to launch new container instances and we will illustrate that below.

DCOS also has views of the individual resources. Figure 3 displays the current view of all of the individual nodes in the cluster, showing how many of the nodes are holding active containers or services.

Figure 3.  DCOS display of worker node status and load

A Simple Example.

There are many prepackaged apps available for a one-click launch, such as those listed above. I originally wanted to use Kafka in a demo, but there is still a bug with my deployment that does not allow me to access the Kafka gateway or the public node (10.0.0.5 in the Figure 3 list). I will revise this report with an update as soon as I can solve that problem.

The example is a simple message filtering experiment. Assume you have some source of independent tasks that must be analyzed as fast as possible and the results stored in a table or database. Assume further that your task stream can be pre-filtered and sorted into buckets of similar tasks that can be analyzed by code that is best suited for the tasks in that bucket. For example, some tasks contain images of landscapes and others contain images of animals, and you want to provide analyzers that are appropriate for each. Or you are looking at logging data, your pre-filters detect several different types of anomalies, and you want to group anomalies of similar type together. We will use queues to hold the contents of each bucket. The pre-filters push the data into the queues, and workers pull the tasks off the queue, do the analysis and push the results into a table. The general picture is shown in Figure 4 below.

Figure 4.   Sample “microservice” configuration for our experiment.

Depending upon the complexity of the analysis undertaken by the worker and the arrival rate of tasks into the queues we may need to increase the number of workers assigned to each bucket queue as shown in Figure 5.

Figure 5.   Adding additional workers to manage extra work at each queue.

In this simple experiment we will look at how increasing the number of workers can improve the throughput of the system.   Now for the details of the set-up.   Instead of using Kafka, we will use another common message broker RabbitMQ that is running on another linux box on Azure.   We use the Azure Table service to store the results.   Our worker service is a Docker container that is running a simple Python program that has two parts.

  1. When the worker starts up it does not know what queue to listen to. So it looks in a separate queue called “roles” that contains the name of a queue that needs an extra worker.
  2. Once it has the name of the queue to work on, it begins pulling data items from that queue, processing them and saving the results to the table. A time stamp is added to each item as it goes into the table.

In a real application step 2 can include task specialization once the worker knows what queue it is working on.  For example, in our text classifier example we loaded specific machine learning tables and states when we knew what topics we were analyzing.
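The two-step worker logic can be sketched as follows. To keep the example self-contained it uses in-memory `queue.Queue` objects in place of RabbitMQ and a plain list in place of the Azure Table service, so the names `run_worker`, `queues` and `table` are stand-ins and this is not the code that runs in the container.

```python
import queue
import time


def run_worker(queues, table, process=lambda item: item):
    """Claim a role, then drain the named queue into the table."""
    # Step 1: ask the "roles" queue which bucket queue needs a worker.
    role = queues["roles"].get()
    work = queues[role]
    # Step 2: pull items, process them, and store time-stamped results.
    handled = 0
    while True:
        try:
            item = work.get_nowait()
        except queue.Empty:
            break  # a long-running worker would keep waiting instead
        table.append({"queue": role,
                      "result": process(item),
                      "timestamp": time.time()})
        handled += 1
    return role, handled


# Usage: one bucket queue named "1" holding three tasks.
queues = {"roles": queue.Queue(), "1": queue.Queue()}
queues["roles"].put("1")
for task in ["a", "b", "c"]:
    queues["1"].put(task)
table = []
print(run_worker(queues, table))
```

Task specialization would slot in between the two steps: once `role` is known, the worker can load whatever model or state suits that bucket before it starts pulling items.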

In this example, we are only interested in the basic scale-up performance improvement as we increase the number of workers assigned to each queue.    The Python code for the container is not pretty, but you  are welcome to read and use it.   It is on GitHub here.   To deploy the Docker container on DCOS one needs a deployment configuration json file.  This config.json is shown below.

{
   "container": {
      "type": "DOCKER",
      "docker": {
          "image": "escigrp/rabbitpullpush"
       }
    },
   "id": "worker",
   "instances": 1,
   "cpus": 0.2,
   "mem": 512
}

Notice that this specifies that the container is in the Docker hub with the name escigrp/rabbitpullpush and that we wish to devote 0.2 cpus and 512 MB of memory to this resource.  And we want one instance.

The dcos command to launch this container in the cluster is

dcos marathon app add config.json

Our “worker” will immediately show up as a deployment on the DCOS Marathon web page.

We are going to measure the throughput of the system, in events processed per second, as we increase the number of workers per queue. The experiment is done as follows:

For N = 1 to 14:

  1. Preload each of the 4 queues (named “1”, “2”, “3”, “4”) with 500 messages and start up 4*N instances of the worker container with Marathon on DCOS.
  2. Load the “roles” queue with N instances of each queue name.   Each of the four queues will now have N devoted workers.
  3. When all of the queues are empty, look in the table.   Subtract the earliest time stamp from the latest to get an approximation of the elapsed time.
  4. Use Marathon to shut down the workers and go to step 1.
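Step 3 amounts to a small calculation over the time stamps stored with the results. A sketch, assuming the time stamps have already been read from the table as seconds since the epoch (the function name `events_per_second` is hypothetical and the numbers in the usage lines are made up):

```python
def events_per_second(timestamps):
    """Approximate throughput from the time stamps of stored results."""
    if len(timestamps) < 2:
        raise ValueError("need at least two time stamps")
    elapsed = max(timestamps) - min(timestamps)
    if elapsed <= 0:
        raise ValueError("time stamps do not span any time")
    return len(timestamps) / elapsed


# 4 queues x 500 messages spread evenly over 80 seconds.
stamps = [i * 80.0 / 1999 for i in range(2000)]
print(round(events_per_second(stamps), 1))
```

Because the clock starts at the first insertion rather than at worker start-up, this slightly overstates throughput for short runs, which is why it is only an approximation of the elapsed time.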

Recall that there are 8 dual core nodes in the cluster. Each instance of the worker container is allocated 0.2 of a core. This means Marathon could possibly schedule 80 instances (16 cores divided by 0.2 cores per instance). However, there are other processes running on the cluster, so a practical limit was 60. In fact we tested up to 56 container instances (14*4). The results are shown in Figure 6 below.

Figure 6.   Events processed per second as the number of workers per queue grows from 1 to 14.

There are several surprises for me here. First, the performance scales almost linearly as the number of container instances grows. Because there are only 16 cores available I expected this to level off when N was near 8 (32 instances), but, with the exception of an anomaly around 13, it kept climbing to N = 14 (56 instances). Second, the absolute performance is not very good. Digging deeper into the code and conducting several additional experiments revealed that the bottleneck is the table insertion, due to an old and slow version of the Python library. Without the table insertion a single worker container instance can pull events at a rate of about 20 events per second, so 56 instances would handle over 1000 events/sec, which is well within the range of RabbitMQ.

Dynamic Scaling and Conclusion

A more interesting experiment would be to have the system described above dynamically scale the number of container instances as circumstances require. For example, if one could monitor the depth of each queue, then if a queue starts to grow larger one could issue a command to increase the number of instances devoted to that queue. If the queue is empty one could reduce the number of instances. I am fairly certain there are a number of ways to do this, but one easy way is to use the “marathon app update” command. This command allows a “real-time” update to the JSON configuration. Any field in the configuration can be modified by passing it as a key=value property. For example, to update the configuration to 10 instances one can issue the command below.

dcos marathon app update worker instances=10

This change in status should trigger Marathon to make the necessary adjustments and change the number of instances to 10. It would be relatively straightforward to write a program that polls the event broker for status, checks the current queue lengths and, depending on the conditions, issues the dcos command above.
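One possible shape for that polling program's decision step is sketched below. It only computes a target instance count from a queue depth and builds the command line; the thresholds, the function names and the `instances=<n>` property form of `dcos marathon app update` are assumptions for illustration, and the call that would fetch the queue depth from the broker is left out.

```python
import math


def desired_instances(queue_depth, per_worker=100, lo=1, hi=14):
    """Pick a worker count: one worker per `per_worker` queued tasks,
    clamped to the range [lo, hi]."""
    wanted = math.ceil(queue_depth / per_worker)
    return max(lo, min(hi, wanted))


def scale_command(app_id, instances):
    """Build (but do not run) the CLI call as an argument list."""
    return ["dcos", "marathon", "app", "update", app_id,
            "instances={}".format(instances)]


for depth in (0, 250, 5000):
    n = desired_instances(depth)
    print(depth, " ".join(scale_command("worker", n)))
```

To act on the decision, the argument list could be handed to `subprocess.run(..., check=True)` from a loop that sleeps between polls. Keeping the policy in a pure function makes it easy to test without a cluster.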

Final Thoughts.

It is great to see this container service based on Mesosphere’s DC/OS finally available in a reliable and highly usable form. This is an excellent platform for managing large collections of Docker containers and orchestrating microservices deployments. The performance of the system was excellent and the web user interface is well done. The command line interface is solid and only gave me one problem. Installing the command line interface for Kafka caused problems on Windows and it did not follow the script here. It seemed to be loading an old version that did not support Windows. The other problem was that the DC/OS cluster I deployed on Azure had one public node, but the “public” IP address given for this node was not reachable. (Any reader who knows how to address these problems please comment here and I will update this post. As is often the case, there are easy solutions to problems that stump me.)

In a future post we will look at the Docker Swarm deployment that is also part of this new Azure release.

Fun with Recurrent Neural Nets: One More Dive into CNTK and TensorFlow

In a previous article I set about comparing Microsoft’s Computational Network Toolkit for deep neural nets to Google’s TensorFlow. I concluded that piece with a deep dive into how recurrent neural nets (RNNs) were represented in each system. I specifically went after the type of RNNs known by the strange name of Long Short-Term Memory (LSTM) networks. I wanted to learn a bit more about how these systems worked. I decided to treat them like laboratory specimens so that I could poke and prod them to see what I could learn and what I could get them to do. This article is essentially my lab notebook. Warning: With the exception of a bit toward the end, this is not technically very deep. In fact, I did not discover anything that has not been extensively reported on elsewhere. But I learned a lot and had some fun. Perhaps it will be of interest to students just starting to learn about this subject. Before I get too far into this, I would like to mention that I recently discovered an excellent series of tutorials on RNNs by Denny Britz that are definitely worth reading.

CNTK’s LSTM and Hallucinating Bloomberg Financial News

One of the many good examples in CNTK is the language modeling exercise in Examples/Text/PennTreebank. The documentation for this one is a bit sparse and the example is really just a demo of how easy it is to use their “Simple Network Builder” to define an LSTM network and train it with stochastic gradient descent on data from the Penn Treebank Project. One command starts the learning:

cntk configFile=../Config/rnn.cntk

Doing so trains the network, tests it and saves the model. However, to see the model data in an easily readable form you need a trivial addition to the config file: add the following dumpnode command to put a dump file in a directory of your choosing.

dumpnode=[
    action = "dumpnode"
    modelPath = "$ModelDir$/rnn.dnn"
    outputFile = "$OutputDir$/modeltext/dump"
]

This creates a big text file with all the trained data.   To experiment with the trained model, I decided to load it into a python notebook and rebuild the LSTM network from the defining equations.  From the CNTK book those equations are

lstm_eqn
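In text form, and using the tensor names that appear in the Python translation that follows, the LSTM cell can be written as below. This is a transcription that matches that code term by term, on the assumption that the peephole weights W_ci, W_cf and W_co act elementwise (written ⊙) on the cell state; it is not a copy of the CNTK book's typesetting.

```latex
\begin{aligned}
i_t &= \sigma\left(W_{xi}\,x_t + W_{hi}\,h_{t-1} + W_{ci}\odot c_{t-1} + b_i\right)\\
f_t &= \sigma\left(W_{xf}\,x_t + W_{hf}\,h_{t-1} + W_{cf}\odot c_{t-1} + b_f\right)\\
c_t &= f_t\odot c_{t-1} + i_t\odot\tanh\left(W_{xc}\,x_t + W_{hc}\,h_{t-1} + b_c\right)\\
o_t &= \sigma\left(W_{xo}\,x_t + W_{ho}\,h_{t-1} + W_{co}\odot c_t + b_o\right)\\
h_t &= o_t\odot\tanh\left(c_t\right)
\end{aligned}
```

Here x_t is the embedding of the current word, h_{t-1} and c_{t-1} are the hidden and cell states from the previous step, and σ is the logistic sigmoid. Note that the output gate looks at the new cell state c_t while the input and forget gates look at the old one.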

I was pleased to see that the dumped model text had the same W and b tensor names as in the equations, so my job was relatively easy. I extracted each of the tensors and saved them into a file (I will make these available on GitHub). The Python code for the LSTM based on the equations above is below.

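import numpy as np

def Sigmoid(x):
    # Logistic function; assumed to match the notebook helper of the same name.
    return 1.0 / (1.0 + np.exp(-x))

def rnn(word, old_h, old_c):
    # Look up the embedding (a column of E) for the input word.
    Xvec = getvec(word, E)

    # Input and forget gates, with peephole terms on the old cell state.
    i = Sigmoid(np.matmul(WXI, Xvec) +
                np.matmul(WHI, old_h) + WCI*old_c + bI)
    f = Sigmoid(np.matmul(WXF, Xvec) +
                np.matmul(WHF, old_h) + WCF*old_c + bF)

    # New cell state.
    c = f*old_c + i*(np.tanh(np.matmul(WXC, Xvec) +
                             np.matmul(WHC, old_h) + bC))

    # Output gate (peephole on the new cell state) and new hidden state.
    o = Sigmoid(np.matmul(WXO, Xvec) +
                np.matmul(WHO, old_h) + (WCO * c) + bO)

    h = o * np.tanh(c)

    # Extract the ordered list of the five best possible next words.
    q = h.copy()
    q.shape = (1, 200)
    output = np.matmul(q, W2)
    outlist = getwordsfromoutput(output)
    return h, c, outlist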

As you can see, this is almost a literal translation of the equations. The only difference is that this takes a text string for the input word, whereas the input to the equations is a vector encoding of the word. The model generates the encoding matrix E, which has the nice property that the ith column of the matrix corresponds to the word in the ith position in the vocabulary list. The function getvec(word, E) takes the embedding tensor E, looks up the position of the word in the vocabulary list and returns the column vector of E that corresponds to that word. The output of one pass through the LSTM cell is the vector h. This is a compact representation of the words likely to follow the input text to this point. To convert this back into “vocabulary” space we multiply it by another trained tensor, W2. The size of our vocabulary is 10000 and the vector output is that length. The ith element of output represents the relative likelihood that the ith word is the next word to follow the input so far. The function getwordsfromoutput simply returns the top 5 candidate words in order of likelihood.
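The two helpers are not shown in this post, but from the description they could look roughly like the sketch below. The toy `vocab` list, the `word_index` lookup and the extra `k` parameter are assumptions for illustration; the real vocabulary has 10000 entries and comes from the training data.

```python
import numpy as np

# Toy stand-ins: a 4-word vocabulary and a small embedding matrix.
vocab = ["the", "market", "rose", "fell"]
word_index = {w: i for i, w in enumerate(vocab)}


def getvec(word, E):
    """Return the column of E for `word` as a column vector."""
    return E[:, word_index[word]].reshape(-1, 1)


def getwordsfromoutput(output, k=5):
    """Return the k highest-scoring vocabulary words, best first."""
    scores = np.asarray(output).ravel()
    best = np.argsort(-scores)[:k]
    return [vocab[i] for i in best]


E = np.arange(12, dtype=float).reshape(3, 4)   # 3-dimensional embeddings
print(getvec("rose", E).ravel())
print(getwordsfromoutput(np.array([[0.1, 0.7, 0.05, 0.15]]), k=3))
```

Returning a column vector of shape (d, 1) from `getvec` keeps it compatible with the `np.matmul(WXI, Xvec)` calls and the (200, 1) state vectors used in the cell.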

Before going further, it is worth looking closer at the properties of the word embedding matrices E and W2. There is a fascinating paper by Mikolov, Yih and Zweig entitled “Linguistic Regularities in Continuous Space Word Representations” where they suggest that the embedding space for words has several interesting properties. Their point is that words that are similar in a linguistic sense will be nearby in the embedding space. For example, present tense verbs should be near other present tense verbs and singular nouns should be near each other, etc. I decided to investigate that. However, there are two embedding mappings. One is based on the tensor E and the other is based on the W2 tensor. E has dimension 150 by 10000 and W2 is 200 by 10000. The difference in dimensionality is because of arbitrary decisions made in defining the hidden layers in the network. But both represent word embeddings. I experimented with both. I wrote a function getnear(word, M) which takes a word and looks for the 5 nearest words in the space, where M is the transpose of either E or W2. (I used cosine distance as the metric.) Verb tense locality and noun plurals worked best in the W2 space, as illustrated below.

rnn-embedding
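A nearest-neighbour lookup of this kind can be sketched in a few lines. This is not the notebook's getnear; the toy `vocab`, the made-up two-dimensional embeddings, the `k` parameter and the choice to leave the query word out of its own results are all assumptions for illustration.

```python
import numpy as np

vocab = ["buy", "sell", "bank", "banks"]


def getnear(word, M, k=5):
    """Return the k words closest to `word` by cosine similarity.
    M has one row per vocabulary word (the transpose of E or W2)."""
    unit = M / np.linalg.norm(M, axis=1, keepdims=True)
    sims = unit @ unit[vocab.index(word)]
    order = np.argsort(-sims)
    return [vocab[i] for i in order if vocab[i] != word][:k]


# Made-up 2-d embeddings: verbs point one way, nouns another.
M = np.array([[1.0, 0.1], [0.9, 0.2], [0.1, 1.0], [0.2, 0.9]])
print(getnear("bank", M, k=2))
```

Normalizing the rows once means every cosine similarity is a single dot product, which matters when M has 10000 rows.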

These are only illustrations. For a deeper statistical analysis look at the Mikolov paper. A more interesting conjecture from their study was that there may be some linearity in these embeddings that might allow one to try simple analogies of the form “A is to B, as C is to __”. Their idea is that if a, b and c are the vector embeddings of the words A, B and C, then the embedding of “__” may be computed as d = c + (b-a). So I wrote a little function AistoBasCisto(A, B, C) that does this computation. In the results I had to delete A, B and C from the candidate answers because they came up often as nearby. In this case my results were less encouraging. It worked better with the E space than with W2. For example, for E we have

rnn-analogy1

And for the W2 space the results looked like

rnn-analogy2

As you can see, the “run running walk __” example failed with E; with W2 it was close but still incorrect.
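The two helper functions are not listed in this post, so here is a minimal sketch of how they might look.  It assumes M holds one embedding per row and that a vocab list maps row indices to words.  The extra vocab argument, the toy vocabulary and the toy 3-dimensional vectors are all made up for illustration; they are not the trained CNTK embeddings.

```python
import numpy as np

def cosine_similarity(M, v):
    """Cosine similarity between every row of M and the vector v."""
    norms = np.linalg.norm(M, axis=1) * np.linalg.norm(v)
    return M.dot(v) / np.maximum(norms, 1e-12)

def nearest_words(v, M, vocab, exclude, k):
    """Words whose rows of M are most similar to v, skipping `exclude`."""
    order = np.argsort(-cosine_similarity(M, v))
    return [vocab[i] for i in order if vocab[i] not in exclude][:k]

def getnear(word, M, vocab, k=5):
    return nearest_words(M[vocab.index(word)], M, vocab, {word}, k)

def AistoBasCisto(A, B, C, M, vocab, k=5):
    a, b, c = (M[vocab.index(w)] for w in (A, B, C))
    d = c + (b - a)   # the linearity conjecture: d = c + (b - a)
    # A, B and C are removed from the candidates, as described in the text.
    return nearest_words(d, M, vocab, {A, B, C}, k)

# Hypothetical toy embeddings (one row per word), chosen so the analogy holds.
vocab = ["man", "woman", "king", "queen", "apple"]
M = np.array([[1.0, 0.0, 0.0],
              [1.0, 1.0, 0.0],
              [1.0, 0.0, 1.0],
              [1.0, 1.0, 1.0],
              [-1.0, 0.0, 0.5]])

print(getnear("king", M, vocab, k=2))
print(AistoBasCisto("man", "woman", "king", M, vocab, k=1))
```

With real embeddings the analogy is far less clean than in this hand-built example, which is exactly what the experiments above show.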

You may wonder why these particular words came up.  The data we used to train the system came from a small subset of the Penn TreeBank collection as provided in the CNTK package.   It is heavily dominated by financial news items.    This explains why the plural of person could be managers or customers.   A larger vocabulary and data collection would be needed to truly test the analogy by linearity conjecture.

Now on to hallucinating the financial news.

Now to test the LSTM as a truly recurrent network.   We provide the network with a starting word and let it suggest the next word.  We then repeat this process, constructing a “sentence”.  In the code below we randomly pick one of the top three words suggested by the network as the next word.

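import numpy as np

# rnn(word, h, c) is the single-step function defined in the notebook; outlist
# holds the network's top suggestions for the next word.
c = np.zeros(shape=(200, 1))
h = np.zeros(shape=(200, 1))
output = np.zeros(shape=(10000, 1))
word = 'my'
sentence = word
for _ in range(40):
    h, c, outlist = rnn(word, h, c)
    # np.random.randint excludes the upper bound, so this picks index 0, 1 or 2
    word = outlist[np.random.randint(0, 3)]
    sentence = sentence + " " + word
print(sentence + ".")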

In this case we start with the word “my” and let it generate a 40 word sentence.  The output is

my new rules which would create an interest position here unless there should prove   signs of such things too quickly although the market could be done better toward paying further volatility where it would pay cash around again if everybody can.

This is a great example of hallucinating financial news. Let’s try it again starting with the word “president”.

president michael de brown wrote himself against democratic union law which represents an emergency relief agreement during a new york state district or early tuesday before july after a federal government agency created early losses without mr. krenz or perhaps.

Now with the word “the”.

the company reported third-quarter results reflecting a number compared between N barrels including pretax operating loss from a month following fiscal month ending july earlier compared slightly higher while six-month cds increased sharply tuesday after an after-tax loss reflecting a strong.

The “sentences” end rather abruptly because of the 40 word limit I set.  If you let it go it will run on until the state vector for the sentence seems to break down.     Try this yourself.  To make it easy to play with this example, I have put the code in GitHub.  The trained model text files are in OneDrive as a zipped file of about 50MB.

There are many more excellent and fun examples.  Andrej Karpathy has a great blog article showing how RNNs can mimic Shakespeare, LaTeX science articles and much more.

TensorFlow’s seq2seq French Lesson.

One of the most interesting examples in the TensorFlow tutorials is an English to French translator.  As with the CNTK example, it was trivial to start the translator learning by following the instructions in the tutorial.   After letting this run for about a week, I wanted to see how well it would do.     As with the CNTK example, I created a Jupyter IPython notebook and loaded the trained model.   I will explain how that was done in more detail below but, for now, I will show how we can invoke it to test its translation ability.    This particular model was not very big and was trained with a relatively small data set, so I didn’t expect much.    In fact, as you will see, to a French speaker it is a disaster.   On the other hand, it learned more French in a week of training than I did in three semesters of French in college.   (For full disclosure, this was my weakest subject in college and my grade was a hard-fought “C” each semester.)

The code below demonstrates how the model is invoked.   First you have to tokenize the input sentence.  The algorithm uses a system of buckets of fixed sizes to make the training more efficient.  You next find the smallest bucket that can contain your sentence and convert this to the input vector list needed by the model.   The step function takes a TensorFlow session, the input vector list and a null list of decoder inputs (to be explained later) and generates a list of vectors as outputs.  Each vector represents the likelihood that individual vocabulary words are the correct word at that point in the translated sentence.   We pick the most likely word at each position and print the sentence.

sentence = " I am not the president of France. "

token_ids = data_utils.sentence_to_token_ids(sentence, en_vocab)
# Which bucket does it belong to?
bucket_id = min([b for b in range(len(_buckets))
                 if _buckets[b][0] > len(token_ids)])
# Get a 1-element batch to feed the sentence to the model.
encoder_inputs, decoder_inputs, target_weights = model.get_batch(
    {bucket_id: [(token_ids, [])]}, bucket_id)

# The final True selects forward-only mode, so no training step is run.
_, _, output_logits = model.step(sess, encoder_inputs, decoder_inputs,
                                 target_weights, bucket_id, True)

# Greedy decoding: take the most likely word at each output position.
outputs = [int(np.argmax(logit, axis=1)) for logit in output_logits]
print(" ".join([rev_fr_vocab[output] for output in outputs]))

Je ne suis pas le président de la France .

This example is not too bad.  However, if I ask

“In which city does the president of France live?”

I get

“Dans quelle ville le président de la France ?”.

This is not exactly correct.    If I feed this into Google translate and ask what this means in English I get “In which city the President of France?”.   If I give it this one,

What is the name of a good restaurant?

The system responds with

“Quel est le nom d’une bonne bonne bonne ?”

Which translates back to “What is the name of a good good good?”.  Probably not very helpful on the streets of Paris.   It turns out restaurant is not in the tiny training vocabulary used here.   Finally, given this sentence

“The article stated that the President of the United States is here today.”

The translator returned

“Le paragraphe a indiqué que le président des États-Unis est aujourd ‘ hui aujourd ‘ hui .”

The end of this reply is “is today today”.    As I said, this is still much better than I could do with my college French.   However, as you can see from the previous two examples, our little translator runs out of gas at the end of sentences and tends to repeat itself.   You should try this yourself.   I have put the notebook file in GitHub, or you can execute these directly from the TensorFlow Python code.   All you need to do is train the model with TensorFlow and run the notebook with the path to the model output directory.

While loading and using the trained model was easy and fun, understanding the seq2seq model used in this example takes a bit of work.   So this part of this article will get a bit more technical.

The TensorFlow translate program is based on a sequence-to-sequence model constructed from more primitive recurrent neural nets.   By sequence-to-sequence we mean a network that takes a sequence as input and produces a sequence as output.   It consists of two parts: an “encoder RNN” and a “decoder RNN” as shown in Figure 1 below.

seq2seq

Figure 1.   A sequence-to-sequence RNN English to French translator with the encoder and decoder unrolled to show the flow of messages.

In this figure the RNNs are “unrolled” to show the flow of messages.  The state vector at the end of the encoder is a vector embedding of the input sentence.   This state vector is used to start the decoder along with a “GO” token.  The diagram shows the network after it has been trained.   During training the inputs to the decoder are the French version of the English sentences.   I won’t talk about the training here because it is enough to try to understand how the trained network works.  Before I go any further I want to point you to some important papers.  Sutskever, Vinyals and Le published an early important paper on sequence to sequence models that is worth reading.

To understand how the network is built we need to dig into the code a bit. The building blocks are a set of classes of base type RNNCell with specializations

  1. BasicRNNCell
  2. GRUCell
  3. BasicLSTMCell
  4. LSTMCell
  5. OutputProjectionWrapper
  6. InputProjectionWrapper
  7. EmbeddingWrapper
  8. MultiRNNCell

The ones we will see used here are GRUCell, MultiRNNCell and EmbeddingWrapper.   We discussed LSTMCell in our previous article but we need to look at GRUCell here because that is the one used in the example.   The GRUCell is a “Gated Recurrent Unit” invented by Cho et al. in “Learning Phrase Representations using RNN Encoder–Decoder for Statistical Machine Translation”.  The “gated” phrase comes from the way the output is defined as coming mostly from the previous state or from a combination with the new input.   The diagram below tries to explain this a bit better.

gru-pic

Figure 2.   GRU wiring diagram

It also helps to see it in terms of the defining equations.

gru-eq1
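For readers who cannot see the equation image, the following is a reconstruction read off from the GRUCell code shown later in this article, written in the column-vector form the text uses.  The weight names W, U and b with their subscripts are my own labels and may not match the notation in the original figure.

```latex
\begin{aligned}
r_t &= \sigma\left(W_r x_t + U_r h_{t-1} + b_r\right) \\
u_t &= \sigma\left(W_u x_t + U_u h_{t-1} + b_u\right) \\
c_t &= \tanh\left(W_c x_t + U_c \,(r_t \odot h_{t-1}) + b_c\right) \\
h_t &= u_t \odot h_{t-1} + (1 - u_t) \odot c_t
\end{aligned}
```

Here \sigma is the sigmoid function and \odot is element-wise multiplication.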

The quantity u_t is a gate vector.  Recall that the sigmoid function switches sharply between zero and one.  So when u_t is one, h is just a copy of the old h and we ignore the input x; when u_t is zero, h is based entirely on the value c_t.   The gate r_t determines how much of the old state goes into defining the value of c_t.   To understand how this is encoded in TensorFlow you need to understand the function

linear(args, output_size, bias, bias_start=0.0, scope=None)

where args is a list of tensors each of size batch x n.   linear computes sum_i(args[i] * W[i]) + bias where W is a list of matrix variables of size n x output_size and bias is a variable of size output_size.   In the equations above we have represented linear algebra as a matrix times a column vector.   TensorFlow uses the transpose notation:   a row vector on the left times the transpose of the matrix.   So in linear the args are a list of row vectors.   Where are the matrix W and offset b?  They are fetched from memory based on the current variable scope, because W and b are variable tensors that hold learned values.   If you look at the first two equations above, you will see they are almost identical.   In fact, we can write them as

gru-eq2

If you transpose the last one from column form into row form you can compute both with one invocation of the linear function.   The code for the GRUCell is below.   As you can see, they have encoded one pass through the GRU cell with only two matrix-vector multiplies.   You can also see the way the variable scope is used to pick out the W’s for the gates and the W for the state/output.  Another point to remember is that an invocation of the __call__ function operator does not execute the tensor operations; rather, it builds the graph.

class GRUCell(RNNCell):
  def __init__(self, num_units):
    self._num_units = num_units
  # ... stuff deleted ....
  def __call__(self, inputs, state, scope=None):
    with vs.variable_scope(scope or type(self).__name__):
      with vs.variable_scope("Gates"):  # Reset gate and update gate.
        # We start with bias of 1.0 to not reset and not update.
        r, u = array_ops.split(1, 2, linear([inputs, state],
                               2 * self._num_units, True, 1.0))
        r, u = sigmoid(r), sigmoid(u)
      with vs.variable_scope("Candidate"):
        # Candidate state: the reset gate r scales how much old state is used.
        c = tanh(linear([inputs, r * state], self._num_units, True))
      # The update gate u blends the old state with the candidate.
      new_h = u * state + (1 - u) * c
    # For a GRU the output and the new state are the same tensor.
    return new_h, new_h
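To make the arithmetic concrete outside of the graph-building machinery, here is a minimal NumPy sketch of the same computation for a single step.  It is not the TensorFlow implementation: the weights are passed in explicitly rather than looked up through variable scopes, the stacked weight layout is my own choice, and all sizes and random values are made up.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def linear(args, W, b):
    """Row-vector form: concatenate the inputs, then multiply by W on the right.

    Concatenating the args and using one stacked W gives the same result as
    summing args[i].dot(W[i]) over separate matrices.
    """
    return np.concatenate(args, axis=1).dot(W) + b

def gru_step(x, h, W_gates, b_gates, W_cand, b_cand):
    """One GRU step for a batch of row vectors x (batch, n_in), h (batch, n)."""
    n = h.shape[1]
    # First multiply: both gates at once, then split into r and u.
    gates = sigmoid(linear([x, h], W_gates, b_gates))
    r, u = gates[:, :n], gates[:, n:]
    # Second multiply: the candidate state sees the reset-scaled old state.
    c = np.tanh(linear([x, r * h], W_cand, b_cand))
    return u * h + (1.0 - u) * c

rng = np.random.RandomState(0)
batch, n_in, n = 2, 3, 4
x, h = rng.randn(batch, n_in), rng.randn(batch, n)
W_cand, b_cand = rng.randn(n_in + n, n), np.zeros(n)

# Random gate weights: an ordinary step.
h_new = gru_step(x, h, rng.randn(n_in + n, 2 * n), np.ones(2 * n), W_cand, b_cand)

# Force the update gate u to 1 with a large bias: the old state is copied.
b_open = np.concatenate([np.zeros(n), 50.0 * np.ones(n)])
h_keep = gru_step(x, h, np.zeros((n_in + n, 2 * n)), b_open, W_cand, b_cand)
print(np.allclose(h_keep, h))
```

The second call illustrates the gating behavior described earlier: with the update gate saturated at one, the input x has no effect on the new state.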

The top level class we invoke for building our model is Seq2SeqModel.    When we create an instance of this class it sets in motion a set of flowgraph building steps.  I am going to skip over a lot of stuff and try to give you the big picture.  The first graph building step in the initialization of an instance of this object is

# Create the internal multi-layer cell for our RNN.
    single_cell = rnn_cell.GRUCell(size)
     …
    if num_layers > 1:
      cell = rnn_cell.MultiRNNCell([single_cell] * num_layers)

As you can see, we are creating a GRU cell graph generator instance, making a list of num_layers copies of this object and passing that to the constructor for MultiRNNCell.   In our case, num_layers has been set to 2.   MultiRNNCell is pretty easy to understand.   It builds a graph consisting of a stack of (in this case) GRU cells where the output state vector of each level is fed to the input of the level above it.  The output of this new compound cell is the state of the top sub-cell, and its output state is the concatenation of the output states of all the sub-cells.
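The stacking rule can be sketched in a few lines of NumPy.  This is only an illustration of the data flow just described, not the MultiRNNCell source: the toy cell below is a stand-in for a GRU cell, and the function names and sizes are invented.

```python
import numpy as np

def make_toy_cell(scale):
    """Stand-in for a GRU cell: (inputs, state) -> (output, new_state)."""
    def cell(inputs, state):
        new_state = np.tanh(scale * inputs + state)
        return new_state, new_state
    return cell

def multi_cell_step(cells, inputs, state, size):
    """Run one time step through a stack of cells.

    `state` has shape (batch, size * len(cells)), one slice per layer.
    """
    cur_input = inputs
    new_states = []
    for i, cell in enumerate(cells):
        layer_state = state[:, i * size:(i + 1) * size]
        # The output of this layer becomes the input of the layer above it.
        cur_input, new_layer_state = cell(cur_input, layer_state)
        new_states.append(new_layer_state)
    # Output: the top layer.  State: all layer states side by side.
    return cur_input, np.concatenate(new_states, axis=1)

size, num_layers, batch = 3, 2, 2
cells = [make_toy_cell(1.0), make_toy_cell(0.5)]
inputs = np.ones((batch, size))
state = np.zeros((batch, size * num_layers))
output, new_state = multi_cell_step(cells, inputs, state, size)
print(output.shape, new_state.shape)
```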

The next part is not so easy to follow.    We will take our MultiRNNCell graph builder and use it to create an encoder and a special decoder.    But first we must make a short digression.

Paying Attention

There is a problem that is encountered in the sequence-to-sequence model.   The encoder encodes the entire sentence into a state vector which is used by the decoder as its input.    That state vector is an abstract representation of our entire sentence as a single point in a very high dimensional space.    The decoder has been trained to use that point as a starting point to unroll a translated version of the sentence.   I find the fact that it works at all to be rather remarkable.   It is as if the decoder takes the English state vector and transforms it into a similar point in “French” space.

Unfortunately, the longer the input sentence, the more difficult it is to decode it.   How much information can we pack into one point?   The problem is that at each decoding step we need a little bit more information than is provided by the state vector as it passes through the decoder loop.   The idea used here is to help the decoder by providing it a bit of focus derived from the input sequence at each stage of the decoder loop.  This is generally referred to as “attention”, as in “at this step of decoding please pay attention to what the encoder was doing here”.   Bahdanau, Cho and Bengio had an early paper about this that used a bidirectional pass over the input sequence. As they put it, they wanted to “automatically (soft-)search for parts of a source sentence that are relevant to predicting a target word”.  (Denny Britz has a lovely blog article about attention and describes several fascinating applications.  It is well worth reading.) The mechanism for attention used in the TensorFlow example is based on a paper by Vinyals et al. and we will follow that one here.   The key idea is that, rather than taking only the single final state vector from the encoder, we collect the state vectors at each stage of the encoder.   Following Vinyals, let the encoder state vectors for each input word be

atten1

And let the decoder state vectors be

atten2

Then for each decoder time step t compute

atten3

Where the Ws are learned matrices and v is a learned vector.    Then as the input to the t+1 state vector of the decoder we use the concatenation

atten4

The idea is that this new state vector at time t+1 puts much more focus on the corresponding words in the encoder string. This all happens in a function called seq2seq.attention_decoder that is called in another constructor function, seq2seq.embedding_attention_seq2seq, which wraps an embedding around a graph generated by our MultiRNNCell graph builder to generate the final decoder graph.   These graphs are all stitched together in the Seq2SeqModel constructor.  It is fair to say that there are many levels of abstraction here that are used to build the decoder and link it to the encoder.  I am leaving out many details that are critical for the training, such as the part that implements the bucket handler.   The final graph, in its most abstract form, is pictured below in Figure 3.

seq2seq_final

Figure 3.  The Translate.py sequence to sequence translator is based on a two level GRU cell encoder and an attention-augmented two level GRU cell decoder.  The input English is entered in reverse order as an optimization.
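Since the attention equations appear only as images, here is a small NumPy sketch of my reading of the mechanism in the Vinyals et al. paper: score every encoder state against the current decoder state, turn the scores into weights with a softmax, and join the weighted sum of encoder states to the decoder state.  The function name, the variable names, the sizes and the random values are assumptions for illustration; this is not the seq2seq.attention_decoder code.

```python
import numpy as np

def softmax(u):
    e = np.exp(u - np.max(u))
    return e / e.sum()

def attend(H, d_t, W1, W2, v):
    """H: encoder states, shape (T, n).  d_t: decoder state, shape (n,)."""
    # Score each encoder state against the current decoder state.
    scores = np.tanh(H.dot(W1.T) + W2.dot(d_t)).dot(v)   # shape (T,)
    weights = softmax(scores)                            # sums to 1 over inputs
    context = weights.dot(H)                             # weighted encoder state
    # The decoder continues from its own state joined with the context.
    return weights, np.concatenate([d_t, context])

rng = np.random.RandomState(1)
T, n, m = 5, 4, 6
H, d_t = rng.randn(T, n), rng.randn(n)
W1, W2, v = rng.randn(m, n), rng.randn(m, n), rng.randn(m)
weights, combined = attend(H, d_t, W1, W2, v)
print(weights.round(3), combined.shape)
```

The weights show which input positions the decoder is “paying attention” to at this step; in the trained model W1, W2 and v are learned along with everything else.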

Final Thoughts

As I have said above, I have not included all the details of how the seq2seq translator is put together, but I tried to include the highlights that I found most interesting.   I encourage you to dive into the code and discover the rest.   You will likely find some errors in what I described above.   If so, please let me know.

There are really a lot of exciting results that have come out in the last few years relating to RNNs.   For example, Lei Ba, Mnih and Kavukcuoglu demonstrated that RNNs with attention can be applied to interesting image analysis challenges, such as reading the house number from a street scene.   In the excellent paper “Teaching Machines to Read and Comprehend”, Hermann et al. demonstrate the use of an attentive RNN built to answer simple questions about text.   I personally don’t think any RNN can pass a Turing test yet, so it ain’t A.I.  But these little statistical machines are certainly wonderful mimics and they can speak better French than I.

Streaming Events to AzureML Through Azure Stream Analytics

 

UPDATE!  Microsoft has just recently released a much better way to integrate Azure Machine Learning with Azure Stream Analytics.  You can now call an AzureML service directly from the SQL query in the stream analytics system.  This means you don’t need the message bus and microservice layer I describe below.   Check out the blog by Sudhesh Suresh and the excellent tutorial from Jeff Stokes.   I will do a performance analysis to compare this method to the one below, but I will wait until the new feature comes out of “preview” mode :).   I will also use a better method to push events to the eventhub.


 

Doing sophisticated data analytics on streaming data has become a really interesting and hot topic.   There is a huge explosion of research around this topic and there are a lot of new software tools to support it.  Amazon has its new Kinesis system,  Google has moved from MapReduce to Cloud DataFlow, IBM has Streaming Analytics on their Bluemix platform and Microsoft has released Azure Stream Analytics.  Dozens of other companies that use data streaming analytics to make their business work have contributed to the growing collection of open source tools.   For example, LinkedIn has contributed the original version of Apache Kafka and Apache Samza, and Yahoo contributed Storm.  And there are some amazing start-ups that are building and supporting important stream analytics tools such as Flink from Data-Artisans. Related university research includes systems like Neptune from Colorado State.  I will post more about all of these tools later, but in this article I will describe my experience with the Microsoft EventHub, Stream Analytics and AzureML.   What I will show below is the following

  1. It is relatively easy to build a streaming analytics pipeline with these tools.
  2. It works, but the performance behavior of the system is a bit uneven, with long initial latencies and a serious bottleneck which is probably the result of the small size of my experiment.
  3. AzureML services scale nicely.
  4. Finally there are some interesting ways in which blocking can greatly improve performance.

In a previous post I described how to use AzureML to create a version of a science document analyzer that I originally put together using Python Scikit-Learn, Docker and Mesosphere.  That little project is described in another post.   The streaming challenge here is very simple.   We have data that comes from RSS feeds concerning the publication of scientific papers.  The machine learning part of this is to use the abstract of the paper to automatically classify the paper into scientific categories.  At the top level these categories are “Physics”, “math”, “compsci”, “biology”, “finance”.    Big streaming data challenges that you find in industry and science involve the analysis of events that may be large and arrive at the rate of millions per second.   While there are a lot of scientists writing lots of papers, they don’t write papers that fast.   The science RSS feeds I pull from are generating about 100 articles per day.  That is a mere trickle.  So to really push throughput experiments I grabbed several thousand of these records and wrote a simple server that would push them as fast as possible to the analysis service.

In the new version described here I wanted to see what sort of performance I could get from AzureML and Azure Stream Analytics.   To accomplish this goal I set up the pipeline shown in Figure 1.

system-diagram-eventhub

Figure 1.   Stream event pipeline

As shown in the diagram, events (document abstracts) are initially pushed into the event hub, which is configured as the source for the stream analytics engine.  The analytics engine does some initial processing of the data and then pushes the events to a message queue in the Azure Message Bus.  The events are then pulled from the queue by a collection of simple microservices which call the AzureML classifier and then push the result of the call into the Azure table service.

In the following paragraphs I will describe how each stage of this pipeline was configured and I will conclude with some rather surprising throughput results.

Pushing events into the Event Hub.

To get the events into the Event Hub we use a modified version of the solution posted by Olaf Loogman.  Loogman’s post provides an excellent discussion of how to get an event stream from the Arduino Yun to the Azure event hub using a bit of C and Python code.  Unfortunately the Python code does not work with the most current release of the Python Azure SDK[1], but it works fine with the old one. In our case all that we needed to do was modify Loogman’s code to read our ArXiv RSS feed data and convert it into a simple JSON object of the form

{ "doc": "the body of the paper abstract",
  "class": "one of classes: Physics, math, compsci, bio, finance",
  "title": "the title of the paper"
}

The Python JSON encoder is very picky so it is necessary to clean a lot of special characters out of the abstracts and titles.  Once that is done it is easy to push the stream of documents to the Event Hub[2].
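As a rough sketch of that cleaning and encoding step, the code below drops non-ASCII characters, replaces control characters, quotes and backslashes with spaces, and then serializes the record.  The exact set of characters removed in my publisher is not listed here, so treat the choices in clean_text, and the function names themselves, as assumptions.

```python
import json
import re

def clean_text(text):
    """Drop non-ASCII characters and anything likely to upset a JSON consumer."""
    text = text.encode("ascii", "ignore").decode("ascii")
    text = re.sub(r'[\x00-\x1f"\\]', " ", text)   # control chars, quotes, backslashes
    return re.sub(r"\s+", " ", text).strip()      # collapse runs of whitespace

def make_event(abstract, category, title):
    """Build the JSON text for one document event."""
    return json.dumps({"doc": clean_text(abstract),
                       "class": category,
                       "title": clean_text(title)})

event = make_event(u"A caf\u00e9 \"quote\"\nline", "Physics", u"A sample title")
print(event)
```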

According to Microsoft, the Event Hub is capable of consuming many millions of events per second.  My simple Python publisher was only able to push about 4 events per second to the hub, but by running four publisher instances I was up to 16 eps.   It was clear to me that it would scale to whatever I needed.

The Stream Analytics Engine

The Azure stream analytics engine, which is a product version of the Trill research project, has a very powerful engine for doing on-line stream processing.   There are three things you must do to configure the analytics engine for your application.    First you must tell it where to get its input.   There are three basic choices:  the event hub,  blob storage and the “IOT” hub.   I configured it to pull from my event hub channel.   The second task is to tell the engine where to put the output.  Here you have more choices.  The output can go to a message queue, a message topic, blob or table storage or a SQL database.  I directed it to a message queue.  The final thing you must do is configure the query processing step you wish the engine to do.   For the performance tests described here I use the most basic query which simply passes the data from the event hub directly to the message queue.   The T-SQL code for this is

SELECT 
     *
INTO
     eventtoqueue
FROM
     streampuller

where eventtoqueue is the alias for my message bus queue endpoint and streampuller is the alias for my event hub endpoint.  The original JSON object that went into the event hub emerges with additional top level fields, which include the timestamps of the event entry and exit, as shown below.

{  "doc": "body of the abstract",
   "class": "Physics",
   "title":"title of the paper",
   "EventProcessedUtcTime":"2015-12-05T20:27:07.9101726Z",
    "PartitionId":10, 
    "EventEnqueuedUtcTime":"2015-12-05T20:27:07.7660000Z"
}

The real power of the Analytics Engine is not being used in our example but we could have done some interesting things with it.   For example, if we wanted to focus our analysis only on the Biology documents in the stream, that would only require a trivial change to the query.

SELECT
     *
INTO
     eventtoqueue
FROM
     streampuller
WHERE class = 'bio'

Or we could create a tumbling window and group select events together for processing as a block.  (I will return to this later.) To learn more about the Azure Analytics Engine and T-SQL a good tutorial is a real-time fraud detection example.
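To show what a tumbling window does to the stream, here is a small client-side Python sketch that groups events into consecutive, non-overlapping time windows using the EventEnqueuedUtcTime field shown earlier.  It is an illustration of the idea only, not Stream Analytics query syntax, and the five second width and the sample events are made up.

```python
from collections import defaultdict
from datetime import datetime

EPOCH = datetime(1970, 1, 1)

def to_seconds(timestamp):
    """Parse a timestamp like 2015-12-05T20:27:07.7660000Z into epoch seconds."""
    # strptime accepts at most 6 fractional digits, so trim the 7th and the Z.
    parsed = datetime.strptime(timestamp[:26], "%Y-%m-%dT%H:%M:%S.%f")
    return (parsed - EPOCH).total_seconds()

def tumbling_windows(events, width_seconds):
    """Group events into consecutive, non-overlapping windows of fixed width."""
    windows = defaultdict(list)
    for event in events:
        t = to_seconds(event["EventEnqueuedUtcTime"])
        windows[int(t // width_seconds)].append(event)
    return [windows[key] for key in sorted(windows)]

events = [
    {"title": "paper A", "EventEnqueuedUtcTime": "2015-12-05T20:27:07.7660000Z"},
    {"title": "paper B", "EventEnqueuedUtcTime": "2015-12-05T20:27:08.1000000Z"},
    {"title": "paper C", "EventEnqueuedUtcTime": "2015-12-05T20:27:11.0000000Z"},
]
blocks = tumbling_windows(events, width_seconds=5)
for block in blocks:
    print([event["title"] for event in block])
```

Each block could then be sent to the classifier in a single call instead of one call per event.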

Pulling events from the queue and invoking the AzureML service

To get the document events from the message queue so that we can push them to the AzureML document classifier we need to write a small microservice to do this.  The code is relatively straightforward, but a bit messy because it involves converting the object format from the text received from the message queue to extract the JSON object which is then converted to the list form required by the AzureML service.   The response is then encoded in a form that can be stored in the table.    All of this must be encapsulated in the appropriate level of error handling.   The main loop is shown below and the full code is provided in Github.

import json
import time

def processevents(table_service, hostname, bus_service, url, api_key):
    while True:
        try:
            msg = bus_service.receive_queue_message('tasks', peek_lock=False)
            t = msg.body
            # the message will be text containing a string with a json object
            # if no json object it is an error.  Look for the start of the object
            start = t.find("{")
            # find() returns -1 when there is no "{" in the message
            if start >= 0:
                t = t[start:]
                jt = json.loads(t)
                title = jt["title"].encode("ascii", "ignore")
                doc = jt["doc"].encode("ascii", "ignore")
                tclass = jt["class"].encode("ascii", "ignore")
                evtime = jt["EventEnqueuedUtcTime"].encode("ascii", "ignore")
                datalist = [tclass, doc, title]
                # send the datalist object to the AzureML classifier
                try:
                    x = sendrequest(datalist, url, api_key)
                    # returned value is the best guess and
                    # 2nd best guess for the class
                    best = x[0][1]
                    second = x[0][2]
                    # save the result in an Azure Table using the hash
                    # of the title as the rowkey
                    # and the hostname of this container as the table partition
                    rk = hash(title)
                    timstamp = str(int(time.time()) % 10000000)
                    item = {'PartitionKey': hostname, 'RowKey': str(rk),
                            'class': tclass, 'bestguess': best, 'secondguess': second,
                            'title': title, 'timestamp': timstamp, 'enqueued': evtime}
                    table_service.insert_entity('scimlevents', item)
                except Exception:
                    print("failed azureml call or table service error")
            else:
                print("invalid message from the bus")
        except Exception:
            print("message queue error")

Performance results

Now that we have this pipeline the most interesting experiment is to see how well it will perform when we push a big stream of messages to it.

To push the throughput of the entire pipeline as high as possible we wanted as many instances of the microservice event puller as possible.   So I deployed a Mesos Cluster with 8 dual core worker nodes and one manager node using the template provided  here.   To maximize parallelism for the AzureML service, three additional endpoints were created.   The microservices were assigned one of the four AzureML service endpoints as evenly as possible.
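One simple way to spread instances over endpoints “as evenly as possible” is round-robin assignment.  The sketch below assumes that scheme, and the endpoint names are placeholders rather than real service addresses.

```python
from collections import Counter

def assign_endpoints(num_instances, endpoints):
    """Round-robin: instance i uses endpoint i mod len(endpoints)."""
    return [endpoints[i % len(endpoints)] for i in range(num_instances)]

endpoints = ["endpoint-0", "endpoint-1", "endpoint-2", "endpoint-3"]
assignment = assign_endpoints(12, endpoints)
print(Counter(assignment))
```

When the number of instances is not a multiple of the number of endpoints, the counts under this scheme differ by at most one.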

We will measure the throughput in seconds per event or, more accurately, the average interval between event arrivals over a sliding window of 20 events.     The chart below shows the seconds/event as the sliding window proceeds from the start of a stream to the end.   The results were a bit surprising.   The blue line represents one instance of the microservice and one input stream.   As you can see there is a very slow startup and the performance levels out at about 1 second/event.   The orange line is the performance with 12 microservice instances and four concurrent input streams.  Again the startup was slow but the performance leveled off at about 4 events/sec (0.25 seconds/event).

stream2

Figure 2.   Performance in Seconds/Event with a sliding window of 20 events.   Event number on the x-axis.
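For anyone who wants to reproduce this kind of curve from their own timestamps, here is a minimal sketch of the sliding window calculation.  It assumes a window of 20 events spans 19 intervals; the function name and the synthetic arrival times are made up, and the measured data is not reproduced here.

```python
def seconds_per_event(arrival_times, window=20):
    """Average inter-arrival interval over each sliding window of events."""
    averages = []
    for start in range(len(arrival_times) - window + 1):
        first = arrival_times[start]
        last = arrival_times[start + window - 1]
        # `window` events are separated by `window - 1` intervals.
        averages.append((last - first) / float(window - 1))
    return averages

# Synthetic arrivals at a steady 4 events per second.
arrivals = [0.25 * k for k in range(100)]
curve = seconds_per_event(arrivals)
print(len(curve), curve[0])
```

Plotting the returned list against the window position gives a chart of the same kind as Figure 2.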

This raises two questions.

  1. Why the slow startup?
  2. Why the low performance for the 12 instance case? Where is the bottleneck in the pipeline? Is it with the Azure Stream Analytics service, the message bus or the AzureML service?

Regarding the slow startup, I believe that the performance of the Azure services is scaled on demand.  In other words, as a stream arrives at the event hub, resources are allocated to try to match the processing rate with the event arrival rate.   For the first events the latency of the system is very large (it can be tens of seconds), but as the pool of unprocessed events grows in size the latency between event processing drops very fast.   However, as the pool of unprocessed events grows smaller, resources are withdrawn and the latency goes back up again.  (We pushed all the events into the event hub in the first minute of processing, so after that point the number in the hub declines.  You can see the effect of this in the tail of the orange curve in Figure 2.)  Of course, belief is not science.   It is conjecture.

Now to find the bottleneck that is limiting performance.   It is clear that some resources are not scaling to a level that will allow us to achieve more than 4 events/second with the current configuration.   So I tried two experiments.

  1. Run the data stream through the event hub, message bus to the microservices but do not call the AzureML service.   Compare that to the version where the AzureML service is called.
  2. Run the data stream directly into the message bus and do not call the AzureML service.  Compare that one to the others.

The table below in Figure 3 refers to experiment 1.   As you can see, the cost of the AzureML service is lost in the noise.   For experiment 2 we see that the performance of the message bus alone was slightly worse than the message bus plus the eventhub and stream analytics.  However, I expect these slight differences are not statistically significant.

stream3

Figure 3.  Average events per second for four configurations: one microservice instance with one input stream; 8 microservices with 4 input streams; 8 microservices with 4 input streams and no call to AzureML; and 8 microservices with 4 input streams bypassing the eventhub and stream analytics.

The bottom line is that the message bus is the bottleneck in the performance of our pipeline.  If we want to ask how well the AzureML service scales independent of the message bus, we can replace the bus with the RabbitMQ AMQP server we have used in previous experiments.     In this case we see a very different result.   Our AzureML service demonstrates nearly linear speed-up in terms of events/sec.    The test used an increasing number of microservices (1, 2, 4, 8, 12, 16 and 20) and four endpoints to the AzureML service.   As can be seen in Figure 4, the performance improvement lessens when the number of microservice instances goes beyond 16.  This is not surprising as there are only 8 servers running all 20 instances and there will be contention over shared resources such as the network interface.

stream4

Figure 4.   AzureML events per second using multiple microservice instances pulling the event stream from a RabbitMQ server instance running on a Linux server on Azure.

Takeaway

It was relatively easy to build a complete event analytics pipeline using the Azure EventHub and Azure Stream Analytics, and to build a set of simple microservices that pull events from the message bus and invoke an AzureML service.   Tests of the scalability of the service demonstrated that performance is limited by the performance of the Azure message bus.    The AzureML service scaled very well when tested separately.

The Azure services seem to deliver performance based on demand: the greater the number of unprocessed events in the EventHub, the greater the amount of resources that are provisioned to handle the load. The result is a slow start but a steady improvement in response until a steady state is reached. When the unprocessed event pool starts to diminish, the resources seem to be withdrawn and the processing latency grows. It is possible that the limits that I experienced were because the volume of the event stream I generated was not great enough to warrant the resources to push the performance beyond 4 events/sec. It will require more experimentation to test that hypothesis.

It should also be noted that there are many other ways to scale these systems. For example, one could use more message bus queues and multiple instances of the AzureML server. The EventHub is designed to ingest millions of events per second and Stream Analytics should be able to support that. A good practice may be to use the Stream Analytics engine to filter the events, or to group the events in a window into a single block, which is easier to handle on the service side. The AzureML server has a block input mode which is capable of processing over 200 of our classification events per second, so it should be possible to scale the performance of the pipeline up a great deal.

 

[1] The Microsoft Azure Python folks seem to have changed the namespaces around for the Azure SDK.   I am sure it is fairly easy to sort this out, but I just used the old version.

[2] The source code is “run_sciml_to_event_hub.py” in GitHub for those interested in the little details.

Performance Analysis of a Cloud Microservice-based ML Classifier

(This is an edited version correcting some of the analysis in the version I posted last week.)

Microservice architectures have become an important tool for large scale cloud application deployment. I wanted to understand how well a swarm of microservices could be used to process streams of events where a non-trivial computation is required for each. I decided a fun test would be to use machine learning to classify scientific document abstracts that appear on public RSS feeds. This is the first result of that experiment. It uses a very small Mesosphere cluster on Microsoft Azure. I will release the code and data to GitHub as soon as I clean it up a bit.

In a previous post we looked at building a document classifier for scientific paper abstracts. In this post we will see how we can deploy it as a network of containerized microservices running on Mesosphere on Microsoft Azure. We will then push this network of services hard to see how well it scales and we will try to isolate the performance issues. Specifically, we are interested in the following challenge. If you have a heavy stream of events coming into a stream analysis system you may overload your services. The conventional wisdom is that you can scale up the number of servers and services to cope with the load. Fortunately more than one microservice instance can be deployed on a single VM. But does increasing the number of service instances per VM allow us to scale the system to meet the throughput requirements? Are there fundamental limits to how well this strategy will work? What are the limiting factors?

To summarize where we are so far, we have a set of RSS feeds that are “pushing” scientific publication events to us. Our system looks at the abstracts of these events and uses several different machine learning tools to classify each document into one (or more) scientific disciplines and sub-disciplines. We have five major disciplines: Physics, Math, Biology, Computer Science and Finance. Each of these major disciplines is further divided into a number of sub-disciplines or specialties. We have divided the work into four main activities:

  1. Training the classifiers. For this we used some of the labeled data from the ArXiv RSS feeds as training data.   (This was discussed in detail in the previous post, so we won’t go over it again here.)   This training phase is used to generate models that will be used by the classifier services discussed here.   We generate models for the main topic classifier and each of the sub-domain classifiers.   This is a pre-processing step and it is not counted in our performance analysis.
  2. Pulling the events from the RSS feeds and pushing them to the main topic classifier. The main topic predictor/classifier uses two primary machine learning methods to determine which discipline the incoming document belongs to. As we have seen, these two methods agree on the discipline about 87% of the time. But sometimes they disagree on a document and we have two potential correct answers. We interpret this as a potentially “interdisciplinary” research document and classify it as belonging to both answers.
  3. Doing the sub-domain classification. As shown in Figure 1 below, the major topic classifiers push the document to the one (or two) sub-discipline specific classifiers. These classifiers use the same ML methods as the main topic classifier to further classify the document into the sub-disciplines.

Figure 1.   Conceptual model of the document classifier microservice architecture

  4. Managing queues and pushing the classified document to a table of results.

What is not shown in the diagram in Figure 1 is what happens next. While the conceptual model provides a reasonable top-level view, the real design requires several additional critical components. First, as events move through the system you need a way to buffer them in queues. We use the Advanced Message Queuing Protocol (AMQP), which is one of the standards in this space. A good implementation of AMQP is the RabbitMQ system, which we hosted on a server in the Azure cloud. As shown in Figure 2, we (logically) position the RabbitMQ event hub between the major topic classifiers and the sub-domain classifiers. We establish 7 queues on the event hub. There is a queue for each of the major topic areas (Physics, Bio, Math, CS, and Finance), a “status” queue and a “role” queue. The classify microservices are given classification roles by polling the “role” queue. This tells them which topic area they are assigned to. When they complete the sub-domain classification of a document they invoke another microservice that is responsible for storing the result in the appropriate table partition for later lookup by the users. This microservice sends an acknowledgement of the completed classification back to the event “status” queue prior to returning. We shall refer to this microservice as the Table Web Service. The “Status Monitor” is the overall system log and is critical for our performance measurements.

Figure 2.   The detailed architecture picture.

The Execution Environment

The RabbitMQ server is hosted on a Linux VM on Microsoft Azure and the rest of the system is deployed on a small Mesosphere cluster in a different Azure data center (see Figure 3). The cluster is indeed small. There is one master node that runs the Mesosphere life cycle management services, the web browser interface and the Marathon job management service. There are 5 dual-core worker VMs. The Azure table is also a separate service. The experiment status monitor runs in an IPython Notebook on my laptop.

Figure 3.  Execution Environment

Using the web interface for Marathon we deploy different service configurations for the experiments described here.   Our table-pusher web service listens on a fixed port, so we can only have one instance per worker VM.   In practice this works out reasonably well because the classify services on any VM can invoke the local copy with very little overhead.   However, as we shall see, there are other major overheads associated with this service that will play a large role in the performance analysis.

We are not limited in the number of classify services we can deploy. If the system operator wants 20 “physics” sub-domain classifiers, the operator need only increase the deployment size of the classifier service by 20 and then submit 20 “physics” role messages into the role queue.

The Microservice Life Cycles

The top-level topic classifiers take the data from the RSS streams and apply the machine learning algorithms to produce the best 1 (or 2) potential topics.   The result is converted to a json object which contains

  1. The domain as determined by the random forest classifier we call RF
  2. The domain as determined by a hybrid “best of 3” classifier (see the previous post) that we call Best.
  3. The title of the document
  4. The body (abstract) of the document.

This doc json object is then pushed into the queue specified by the ML classifiers. If both RF and Best agree on a topic like “math” then the document is put in the “math” queue.   If they disagree and one says “math” and the other says “bio” then the document is placed in both the “math” and “bio” queues.    The classifier microservice has the following life-cycle.

ML Classifier

  1. When launched it opens a connection to the “role” queue and waits for a topic.
  2. When it receives the topic message from the “role” queue, the classifier service must initialize all the ML models for that topic from the saved trained models. (The models have been previously trained as described in the previous post and the trained models have been “pickled” and saved as blobs in Azure blob storage.)
  3. It then begins to scan the queue for that topic. It pulls the json document objects from the queue and applies the classifier. It then packages up a new json object consisting of the main topic, new sub-classification, title and abstract. For example, if the item came from the “physics” queue and the classifier decides it is in the subclass “General Relativity”, then that is the sub-classification that goes in the object. (The classifier also has a crude confidence estimator. If it is not very certain the sub-classification is “General Relativity?”. If it is very uncertain and General Relativity is the best of the bad choices, then it is “General Relativity???”.) It then sends this object via a web service call to a local web service (described below) that is responsible for putting the object into an Azure table. (More on this step below.)
  4. The service repeats step 3 until it receives a “reset” message in the topic queue. It then returns to the “role” queue and step 1.
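The routing rule and the life-cycle above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: in-memory `queue.Queue` objects stand in for the RabbitMQ queues, the field names (`RF`, `Best`, `title`, `body`), the confidence thresholds and the helper names are all hypothetical, and the function returns on “reset” instead of looping back to step 1 so that the demo terminates.

```python
import json
import queue

TOPICS = ["physics", "bio", "math", "cs", "finance"]

def make_queues():
    # One in-memory queue per topic plus "role" and "status": 7 in total.
    return {name: queue.Queue() for name in TOPICS + ["role", "status"]}

def route(queues, doc):
    """Push doc to the queue(s) named by the RF and Best predictions."""
    # dict.fromkeys keeps order and drops the duplicate when both agree.
    targets = list(dict.fromkeys([doc["RF"], doc["Best"]]))
    for topic in targets:
        queues[topic].put(json.dumps(doc))
    return targets

def mark_confidence(label, confidence):
    # Hypothetical thresholds; the post only says "not very certain"
    # adds "?" and "very uncertain" adds "???".
    if confidence >= 0.6:
        return label
    if confidence >= 0.3:
        return label + "?"
    return label + "???"

def classifier_service(queues, load_model, send_to_table):
    """One pass through the life-cycle for a single classifier instance."""
    topic = queues["role"].get()        # step 1: wait for a role
    model = load_model(topic)           # step 2: initialize the models
    while True:                         # step 3: scan the topic queue
        message = queues[topic].get()
        if message == "reset":          # step 4: give up the role
            return topic
        doc = json.loads(message)
        label, confidence = model(doc["body"])
        send_to_table({
            "topic": topic,
            "subtopic": mark_confidence(label, confidence),
            "title": doc["title"],
            "abstract": doc["body"],
        })

# Demo with a toy model that always answers "General Relativity".
queues = make_queues()
route(queues, {"RF": "physics", "Best": "physics",
               "title": "T1", "body": "gravity waves"})
queues["physics"].put("reset")
queues["role"].put("physics")

def toy_model_loader(topic):
    return lambda text: ("General Relativity", 0.4)

results = []
classifier_service(queues, toy_model_loader, results.append)
```

In the real system the `send_to_table` step is the web service call to the local Table Web Service, and the model loader would unpickle the trained models from blob storage.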

Table Web Service

(Note: this is a revised version. The reliable and fast versions were incorrectly described in the previous version of this post.)

The table web service is the microservice component that receives web service invocations from the classifier. Staying true to the Microservice design concept it has only one job.

  1. When invoked it pulls the json object from the invocation payload and formats a Table service tuple to insert into the partition of the table associated with the main topic.
  2. We actually have three versions of the web service: one we call “fast”, another “reliable” and a third “null”.
    • The “reliable” version works by making a new connection to the RabbitMQ event server and then opening a channel to send it messages. It then inserts the tuple into the table and sends a time-stamped status notification to the event queue.
    • The “fast” version reuses the RabbitMQ connection for as long as possible. For each invocation it opens the channel and inserts the message.
    • The “null” version skips the Azure table entirely and is used only for the performance evaluation.

(Unfortunately, the RabbitMQ connection will time out if there is a lull in use, and catching the timed-out connection and restarting it proved to be problematic. So the “fast” version is only used to illustrate the performance models below. It is not reliable. The “reliable” version is very reliable. It runs for months at a time without problems. As we shall see, it is slower. The “null” version is fast and reliable but not actually useful for anything other than benchmarks.)

  3. Before returning a web service response to the calling classifier instance, the web service sends a time-stamped message to the status queue.
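The difference between the “reliable” and “fast” connection strategies can be illustrated with a toy broker that simply counts how many connections are opened. Everything here is hypothetical: `FakeBroker`, `FakeConnection`, `reliable_send` and `FastSender` are made-up names, no real RabbitMQ client is involved, and the `is_open` check is an idealization (detecting a timed-out connection was exactly the part that proved problematic in practice).

```python
class FakeConnection:
    """Stand-in for a broker connection; not a real AMQP client."""
    def __init__(self, broker):
        self.broker = broker
        self.is_open = True

    def publish(self, message):
        if not self.is_open:
            raise ConnectionError("connection closed")
        self.broker.messages.append(message)

    def close(self):
        self.is_open = False


class FakeBroker:
    """Toy broker that records messages and counts opened connections."""
    def __init__(self):
        self.connections_opened = 0
        self.messages = []

    def connect(self):
        self.connections_opened += 1
        return FakeConnection(self)


def reliable_send(broker, message):
    # "Reliable" style: a fresh connection for every status message.
    connection = broker.connect()
    try:
        connection.publish(message)
    finally:
        connection.close()


class FastSender:
    # "Fast" style: keep one connection and reuse it while it stays open.
    def __init__(self, broker):
        self.broker = broker
        self.connection = None

    def send(self, message):
        if self.connection is None or not self.connection.is_open:
            self.connection = self.broker.connect()
        self.connection.publish(message)


broker_reliable = FakeBroker()
for i in range(5):
    reliable_send(broker_reliable, f"status {i}")

broker_fast = FakeBroker()
sender = FastSender(broker_fast)
for i in range(5):
    sender.send(f"status {i}")
```

Both brokers end up with the same five messages, but the per-message strategy opens five connections where the reuse strategy opens one. That per-event connection setup is the extra cost the “reliable” version pays.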

The Experiment

It was stated at the outset of this document that we are interested in understanding the effect of scaling the number of Docker containers on the ability of the system to meet the flow of events into the system. To test the system under load we pre-loaded 200 abstracts for the Biology topic into the message queue and all the classifier service instances were instructed to be Biology sub-topic classifiers. We then ran this with 1, 2, 4, 8, 16 and 24 instances of the classifier microservice. We ran this against all three versions of the table web service. To compute the elapsed time to consume and process all 200 abstracts, the table web service appended a timestamp to each status message. By looking at the time stamp for the first message and the last message we could determine the system throughput. (There were five different worker VMs so there is a small issue of clock skew between timestamps, but this was negligible. What was not negligible was delays caused by network traffic between the data centers. Hence I would estimate the error bars for all the numbers below to be about 10%. We ran many tests and the numbers below are averages.)
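As a rough sketch of that measurement, the throughput can be computed from the status-message timestamps as follows. The function name and the sample timestamps are made up for illustration, and dividing the message count by the first-to-last span is an assumption about how the number was derived (one could argue for count minus one instead).

```python
def throughput(timestamps):
    """Events per second from status-message timestamps given in seconds."""
    if len(timestamps) < 2:
        raise ValueError("need at least two timestamps")
    # min/max rather than first/last, so arrival order does not matter.
    elapsed = max(timestamps) - min(timestamps)
    if elapsed <= 0:
        raise ValueError("timestamps span no time")
    return len(timestamps) / elapsed

# Made-up timestamps: five status messages spread over two seconds.
example = [10.0, 10.5, 11.0, 11.5, 12.0]
rate = throughput(example)
```

With timestamps coming from five different worker VMs, any clock skew feeds directly into `elapsed`, which is one reason to average over many runs.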

The most important number is the throughput of the system in classifications per second.   The table in Figure 4 summarizes the results.   Clearly the best throughput was obtained with the combination of the null classifier and the null table service.   The next best was the combination of the classifier and the null table service.

Figure 4. Throughput results for 1, 2, 4, 8, 16 and 24 instances of the classifier service.

The combination of the classifier and either the fast or reliable table service showed rather poor performance.   To understand this we need to look at the basic flow of execution of a single classifier and a single table web service as shown in Figure 5.   Note that the classifier that invokes the web service cannot return to fetch another task until the table web service has sent the status message.

Figure 5. Flow of control of the classify-table web service from the message queue and back.

To understand where congestion may exist, look at the points where resource contention may occur when multiple classify instances are running. This is illustrated in Figure 6.

Figure 6.   The major points of resource contention occur when the services access the Azure Table service and the RabbitMQ message broker.

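To quantify this, let

$$t_{table} = \text{time to insert one item into the Azure Table service}, \qquad t_{broker} = \text{time to send one status message to the RabbitMQ broker}.$$

We can do a very crude speedup analysis based on Amdahl’s law. Looking at the part of the computation that is purely parallel and assuming sequential access to the Table service and message broker we can say

$$t_{par} = \text{time per event for the purely parallel work}, \qquad t_{seq} = f(t_{table}, t_{broker}).$$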

tseq is a function of the two points of contention. It turns out that the message queue is fast in comparison to the Azure table service. This is due to the complexity of inserting an item into a table. Because we are only using one table partition, “Biology”, all our inserts go into one place. Unfortunately, inserts into any disk-based system are going to involve locks, so this is not surprising. However, the difference between the fast version and the reliable version illustrates the high cost of establishing a new connection to the message broker for each event. I am not sure what the function “f” is above but I suspect it is additive.

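Let Tserial be the time for one instance of the classify service to complete the execution of the set of messages and Tpar (n) be the time it takes n instances to do the job. With N messages in the set, we then have

$$T_{serial} = N\,(t_{par} + t_{seq}), \qquad T_{par}(n) = N\left(\frac{t_{par}}{n} + t_{seq}\right)$$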

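The speedup of n classifiers over a single classifier is then

$$S(n) = \frac{T_{serial}}{T_{par}(n)} = \frac{t_{par} + t_{seq}}{t_{par}/n + t_{seq}}, \qquad S_{max} = \lim_{n \to \infty} S(n) = 1 + \frac{t_{par}}{t_{seq}}$$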

Analysis of the data on the time to do a single classifier with no table service gives us tpar = 450 ms (approximately) and tseq = 60 ms for the fast service and 100 ms for the reliable service. Using the formula above we have the maximum speed-up Smax = 8.5 for the fast service and 5.5 for the reliable service. This approximately agrees with the results we can measure, as seen in Figure 7 below.
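The arithmetic behind those limits can be checked with a few lines of Python. This sketch assumes the Amdahl-style model in which each event costs tpar of work that divides across n instances plus tseq of work that does not, so S(n) = (tpar + tseq) / (tpar/n + tseq). That form is an assumption, chosen because it reproduces the 8.5 and 5.5 limits quoted above; the function names are mine.

```python
def speedup(n, t_par, t_seq):
    """Speed-up of n instances over one, for per-event times in the same unit."""
    return (t_par + t_seq) / (t_par / n + t_seq)

def max_speedup(t_par, t_seq):
    """Limit of speedup(n, ...) as n grows without bound."""
    return (t_par + t_seq) / t_seq

T_PAR_MS = 450            # parallel part, from the measurements above
T_SEQ_FAST_MS = 60        # sequential part with the "fast" table service
T_SEQ_RELIABLE_MS = 100   # sequential part with the "reliable" table service

s_max_fast = max_speedup(T_PAR_MS, T_SEQ_FAST_MS)          # 8.5
s_max_reliable = max_speedup(T_PAR_MS, T_SEQ_RELIABLE_MS)  # 5.5

# Predicted speed-up at the instance counts used in the experiment.
predicted_fast = {n: speedup(n, T_PAR_MS, T_SEQ_FAST_MS)
                  for n in (1, 2, 4, 8, 16, 24)}
```

Under this model the curve flattens quickly: with the fast service, 24 instances are predicted to reach only about 6.5 of the possible 8.5.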

Figure 7.   Speed-up relative to one instance of the classifier with different table services plotted against different numbers of classifier instances.

Of course speed-up is a relative number, but it does illustrate the way systems scale.   As we can see the limits are approximately what we predicted.   However the case of the null table is different.   There should be essentially no sequential component so why is it flat-lining at around 13? The answer is that there are only 10 cores in our system.   That puts a fundamental limit on the performance.   In fact we are lucky to get more than a speed-up of 10.

Conclusions

One thing that does not come out in the text above is how well the concept of microservices worked. One of the major selling points for this style of application construction is that factoring of the services into small independent components can help with maintenance of the overall system. Using Mesosphere and the Marathon task scheduler proved this point well. It was possible to modify, remove and scale any of the services without having to bring down the others. This made all of the experiments described above easy to carry out. And the system remained up for several months.

It is clear that 10 cores is not a very impressive platform to test scalability. We did demonstrate that we could easily scale to about 2.5 services per core without problems. Going beyond that did not improve performance. I am currently trying to build an instance of Mesosphere or other Docker cluster mechanism (perhaps Swarm) with at least 100 servers. I will update these results when I have that complete.

Another thing to do is cache the table writes and then update the table with an entire block.   A good solution for this is to introduce a Redis cache service.   Fortunately Docker makes this trivial.   Redis containers are  in the library.  Doing this and having a much larger cluster should enable at least a 20x improvement.
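The idea of caching writes and updating the table a block at a time can be sketched as below. This is not the Redis-based design mentioned above: a plain Python list stands in for the cache, `write_block` is a hypothetical callback standing in for whatever performs the block insert, and the class name and block size are made up.

```python
class TableWriteBuffer:
    """Collect rows and hand them to write_block in blocks of block_size."""

    def __init__(self, write_block, block_size=50):
        self.write_block = write_block   # callable that takes a list of rows
        self.block_size = block_size
        self.pending = []

    def add(self, row):
        self.pending.append(row)
        if len(self.pending) >= self.block_size:
            self.flush()

    def flush(self):
        # Write whatever is pending, including a final partial block.
        if self.pending:
            self.write_block(self.pending)
            self.pending = []


# Demo: record each block instead of writing to a real table.
blocks = []
buffer = TableWriteBuffer(blocks.append, block_size=3)
for i in range(7):
    buffer.add({"id": i})
buffer.flush()
```

Seven rows with a block size of 3 produce three writes instead of seven. The trade-off is that rows sitting in the buffer are lost if the service dies before a flush, which is part of why an external cache service is attractive.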

Actually, a more interesting experiment is to try this classification problem using one of the more conventional tools that already exist. Examples include Microsoft Stream Analytics + Azure Machine Learning & Microsoft StreamInsight, Spark Streaming + Event Hubs and HDInsight, Amazon Kinesis, Google BigQuery, IBM Watson Analytics and numerous open source IoT platforms. I plan to start with the Azure solution while I still have Azure resources.

I am in the process of moving all the code and data for this experiment into GitHub. I will post links to these items in the next week or so.