
An Encounter with Google’s TensorFlow

NOTE: This is a revised version of this blog that reflects much better ways to do some of the tensor algebra in the first example below.

Google has recently released some very interesting new tools to the open source community.   First came Kubernetes, their container microservice framework, and that was followed by two new programming systems based on dataflow concepts.   Dataflow is a very old idea that first appeared in the computer architecture community in the 1970s and 80s.   Dataflow was created as an alternative to the classical von-Neumann computer design.  It was hoped that it would have a performance advantage because it would exploit much greater levels of parallelism than thought possible with classical computers[1].  In dataflow systems, computation can be visualized as a directed graph where the vertices of the graph are operators and data "flows" through the system along the edges of the graph.  As soon as data is available on all the input edges of an operator node the operation is carried out and new data is put on the output edges of the node.  While only a few actual dataflow computers were built, the concept has been fundamental to distributed and parallel computing for a long time.  It shows up in applications like complex event processing, stream analytics and systems like the Microsoft AzureML programming model I described earlier.

Google’s newly released Cloud Dataflow is a programming system for scalable stream and batch computing.  I will return to Cloud Dataflow in another post, but for now, I will focus on the other dataflow system they released.  Called TensorFlow, it is a programming system designed to help researchers build deep neural networks[2] and it appears to be unrelated to Cloud Dataflow.  All the documentation and downloadable code for TensorFlow are on-line.  TensorFlow is also similar to Microsoft Research’s Computational Network Toolkit (CNTK) in several ways that I will describe later.

TensorFlow is designed to allow the programmer to easily "script" a dataflow computation where the basic units of computing are very large multi-dimensional arrays.   The scripts are written in Python or C++ and TensorFlow works very well with IPython/Jupyter notebooks.   In the following pages I will give a very light introduction to TensorFlow programming and illustrate it by building a bare-bones k-means clustering algorithm.   I will also briefly describe one of their examples of a convolutional neural network.

TensorFlow can be installed and run on your laptop, but as we shall show below, it really shines on bigger, more powerful hardware.  An interesting thing happened in the recent evolution of deep neural networks.   The most impressive early work on really large deep neural networks was done on large cloud-scale clusters.  However, the type of parallelism needed for deep network computation is mostly very large array math, which is better suited to execution on a GPU, or a bunch of GPUs on a massive memory machine, or a cluster of massive memory machines each with a bunch of GPUs.  For example, the Microsoft CNTK has achieved some remarkable results on 8 GPU systems and it will soon be available on the Azure GPU Lab. (I also suspect that supercomputers such as the SDSC Comet with large memory, multi-GPU, multi-core nodes would be ideal.)

TensorFlow: a shallow introduction.

There is a really nice white paper by the folks at Google Research with far more details about the TensorFlow system architecture than I give here.  What follows is a shallow introduction and an experiment.

There are two main concepts in TensorFlow.   The first is the idea of computing on objects that are very large multi-dimensional arrays called tensors.   The second is the fact that the computations you build with tensors are compiled into graphs that are executed in a "dataflow" style.   We need to unpack both of these concepts.

Tensors

Let's start with tensors.  First, these are not your great-great grandfather's tensors.   Those tensors were introduced by Carl Friedrich Gauss for differential geometry where they were needed to provide metrics that could be used to describe things like the curvature of surfaces.  Einstein "popularized" tensors in his general theory of relativity.  Those tensors have a very formal algebra of covariant and contravariant forms. Fortunately, we don't have to go there to understand the use of tensors in TensorFlow's machine learning applications.   In fact, if you understand Numpy arrays you are off to a good start: Numpy arrays are just really efficient multidimensional arrays.   TensorFlow can be programmed in Python or C++ and I will use the Python binding in the discussion below.

In TensorFlow tensors are created and stored in container objects that are one of three types: variables, placeholders and constants.   Let’s start with constants.  As the name implies constant tensors are initialized once and never changed.   Here are two different ways to create constant arrays that we will use in an example below.  One is a tensor of size Nx2 filled with zeros and the other is a tensor of the same shape filled with the value 1.

import numpy as np
import tensorflow as tf
N = 10000
X = np.zeros(shape=(N,2))
Xv = tf.constant(X, name="points")
dones = tf.fill([N,2], np.float64(1.))

We have used a Numpy array of values to create the TensorFlow constant. Notice that we have given our tensor a "name". Naming tensors is optional but it comes in very handy when debugging.

Variables are holders of multidimensional arrays that persist across sessions and may be modified and even saved to disk. The concept of a "session" is important in TensorFlow. You can think of it as a context where actual TensorFlow calculations take place. The first calculation involving a variable is to load it with initial values. For example, let's create a 2 by 3 tensor that, when initialized, contains the constant 1.0 in every element. Then let's convert that back to a Numpy array and print it. To do that we need a session and we will call the variable initializer in that session. When working in the IPython notebook it is easiest to use an "InteractiveSession" so that we can easily edit and redo computations.

sess = tf.InteractiveSession()
myarray = tf.Variable(tf.constant(1.0, shape = [2,3]))
init = tf.initialize_all_variables()
sess.run(init)
mynumpy = myarray.eval()
print(mynumpy)
[[ 1.  1.  1.]
 [ 1.  1.  1.]]

As shown above, the standard way to initialize variables is to initialize them all at once. The process of converting a tensor back to a Numpy array requires evaluating the tensor with the “eval” method. As we shall see this is an important operator that we will describe in more detail below.

The final tensor container is a "placeholder".   Creating a placeholder object only requires specifying a type and some information about its shape.   We don't initialize a placeholder like a variable because its initial values will be provided later in an eval()-like operation.  Here is a placeholder we will use later.

x = tf.placeholder(tf.float32, [None, 784], name="x")

Notice that in this case the placeholder x is two dimensional but the first dimension is left unbound.  This allows us to supply it with a value that is any number of rows of vectors of length 784.   As we shall see, this turns out to be very handy for training neural nets.
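To make that unbound first dimension concrete, here is a minimal sketch (using the 2015-era API in this post, the placeholder x defined above, and made-up random batches; tf.reduce_sum is just a stand-in for a real network):

row_sums = tf.reduce_sum(x, 1)                # any tensor expression built on x
sess = tf.InteractiveSession()
# a batch of 5 rows and a batch of 100 rows both satisfy the shape [None, 784]
small_batch = np.random.rand(5, 784).astype(np.float32)
large_batch = np.random.rand(100, 784).astype(np.float32)
print(row_sums.eval(feed_dict={x: small_batch}).shape)    # (5,)
print(row_sums.eval(feed_dict={x: large_batch}).shape)    # (100,)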

Dataflow

The real magic of TensorFlow is in the dataflow execution of computations.   A critical idea behind TensorFlow is to keep the slow world of Python away from the fast world of parallel tensor algebra as much as possible.  Python is used only to describe the dataflow graph of the computation.   TensorFlow has a large library of very useful tensor operators.   Let's describe a computation we will use in the k-means example.   Suppose we have 8 points in the plane in a vector called Xv and 4 special points in an array called kPoints.   I would like to label each of the 8 points with the index of the special point nearest to it.   This should give me 4 clusters of points.   I now want to find the centroid of each of these clusters.  Assume we have a tensor called "blocked" where row i gives the distance from each of our 8 points to the ith special point.  For example, if "blocked" is as shown below then what we are looking for is the index of the smallest element in each column.

tensor-table

To find the centroids I use this min index vector to select the elements of Xv in each cluster. We just add them up and divide by the number of points in that cluster.    The TensorFlow code to do this is shown below.  TensorFlow has a large and handy library of tensor operators many of which are similar to Numpy counterparts.   For example, argmin computes the index of the smallest element in an array given the dimension over which to search.   Unsorted_segment_sum will compute a sum using another vector to define the segments.   The min index vector nicely partitions the Xv vector into the appropriate "segments" for the unsorted_segment_sum to work.  The same unsorted_segment_sum operator can be used to count the number of elements in each cluster by adding up 1s.

mins = tf.argmin(blocked, 0, name="mins")
sums = tf.unsorted_segment_sum(Xv, mins, 4)
totals = tf.unsorted_segment_sum(dones, mins, 4, name="sums")
centroids = tf.div(sums, totals, name = "newcents")
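To see concretely what argmin and unsorted_segment_sum compute, here is a tiny NumPy sketch of the same logic on 8 made-up points and 4 centers; the explicit loops are only for illustration, TensorFlow's operators do this in bulk.

import numpy as np

points = np.random.rand(8, 2)            # 8 points in the plane
centers = np.random.rand(4, 2)           # 4 "special" points
# blocked[i, j] = distance from point j to center i
blocked = np.array([[np.linalg.norm(p - c) for p in points] for c in centers])
mins = np.argmin(blocked, axis=0)        # index of the nearest center for each point

# the NumPy equivalent of unsorted_segment_sum: add up the points in each cluster
sums = np.zeros((4, 2))
totals = np.zeros((4, 2))
for j, m in enumerate(mins):
    sums[m] += points[j]
    totals[m] += 1.0
centroids = sums / totals                # divides by zero if a cluster is empty (see [4])
print(centroids)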

The key idea is this.  When the TensorFlow code above is executed it builds a dataflow graph with three inputs (blocked, Xv, dones) and outputs a tensor called centroids as shown below[3].[4]

The computation does not start until we attempt to evaluate the result with a call to centroids.eval(). This sort of lazy evaluation means that we can string together as many tensor operators as needed that will all be executed outside the Python REPL.

tensor-graph1

Figure 1.  Computational flow graph

TensorFlow has a very sophisticated implementation.  The computing environment upon which it is running is a collection of computational devices.  These devices may be cpu cores, GPUs or other processing engines.   The TensorFlow compilation and runtime system maps subgraphs of the flow graph to these devices and manages the communication of tensor values between the devices.  The communication may be through shared memory buffers or send-receive message pairs inserted in the graph at strategic points.   Just as the dataflow computer architects learned, dataflow by itself is not always efficient.  Sometimes control flow is needed to simplify execution.  The TensorFlow system also inserts needed control flow edges as part of its optimizations.   As was noted above, this dataflow style of computation is also at the heart of CNTK.   Another very important property of these dataflow graphs is that it is relatively easy to automatically compute derivatives by using the chain rule working backwards through the graph.   This makes it possible to automate the construction of gradients that are important for finding optimal parameters for learning algorithms.   I won't get into this here but it is very important.
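As a small illustration of that last point, the released API exposes this backward pass through tf.gradients; the toy expression below is my own example (assuming the interactive session sess from earlier), not part of the k-means code.

w = tf.Variable(3.0, name="w")
y = w * w + 2.0 * w + 1.0              # y = (w + 1)^2
grad = tf.gradients(y, [w])            # builds the backward (chain-rule) pass for dy/dw
sess.run(tf.initialize_all_variables())
print(sess.run(grad))                  # [8.0] since dy/dw = 2w + 2 = 8 at w = 3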

A TensorFlow K-means clustering algorithm

With the kernel above we now have the basis for a simple k-means clustering algorithm shown in the code below.  Our initial centroid array is a 4 by 2 Numpy array we shall call kPoints.   What is missing is the construction of the distance array blocked.   Xv holds all the N points as a constant tensor.   My original version of this program used Python code to construct blocked.  But there is an excellent improvement on the computation of "mins" published on GitHub by Shawn Simister.  Shawn's version is better documented and about 20% faster when N is in the millions.  Simister's computation does not need my blocked array and instead expands the Xv and centroids tensors and uses a reduction to get the distances.  Very nice.  (This is the revision to the blog post that I referred to above.)

N = 10000000
k = 4
#X is a numpy array initialized with N (x,y) points in the plane
Xv = tf.constant(X, name="points")
kPoints = [[2., 1.], [0., 2.], [-1., 0], [0, -1.]]
dones = tf.fill([N,2], np.float64(1.))

centroids = tf.Variable(kPoints, name="centroids")
oldcents = tf.Variable(kPoints)
initvals = tf.constant(kPoints, tf.float64)

for i in range(20):
    oldcents = centroids
    #This is the Simister mins computation
    expanded_vectors = tf.expand_dims(Xv, 0)
    expanded_centroids = tf.expand_dims(centroids, 1)
    distances = tf.reduce_sum( tf.square(
               tf.sub(expanded_vectors, expanded_centroids)), 2)
    mins = tf.argmin(distances, 0)
    #compute the new centroids as the mean of the points nearest
    sums = tf.unsorted_segment_sum(Xv, mins, k)
    totals = tf.unsorted_segment_sum(dones, mins, k, name="sums")
    centroids = tf.div(sums, totals, name = "newcents")
    #compute the distance the centroids have moved since the last iteration
    dist = centroids-oldcents
    sqdist = tf.reduce_mean(dist*dist, name="accuracy")
    print(np.sqrt(sqdist.eval()))
    kPoints = centroids.eval()

However, this version is still very inefficient.  Notice that we construct a new execution graph for each iteration.   A better solution is to pull the graph construction out of the loop and construct it once and reuse it.   This is nicely illustrated in Simister's version of k-means on his GitHub site.
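A rough sketch of that restructuring, reusing the names from the loop above (and assuming the interactive session sess created earlier), builds the graph once and uses tf.assign to update the centroids variable; Simister's GitHub version is the authoritative reference.

# build the flow graph once, outside the loop
centroids = tf.Variable(tf.constant(kPoints, tf.float64), name="centroids")  # float64 to match Xv and dones
expanded_vectors = tf.expand_dims(Xv, 0)
expanded_centroids = tf.expand_dims(centroids, 1)
distances = tf.reduce_sum(tf.square(tf.sub(expanded_vectors, expanded_centroids)), 2)
mins = tf.argmin(distances, 0)
sums = tf.unsorted_segment_sum(Xv, mins, k)
totals = tf.unsorted_segment_sum(dones, mins, k)
update_centroids = tf.assign(centroids, tf.div(sums, totals))

sess.run(tf.initialize_all_variables())
for i in range(20):
    sess.run(update_centroids)   # the same graph is reused on every iteration
print(centroids.eval())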

It is worth asking how fast TensorFlow is compared with a standard Numpy version of the same algorithm. Unfortunately, I do not have a big machine with fancy GPUs, but I do have a virtual machine in the cloud with 16 processors. I wrote a version using Numpy and executed them both from an IPython notebook. The speed-up results are in Figure 2. Comparing these we see that simple Python Numpy is faster than TensorFlow for values of N less than about 20,000. But for very large N we see that TensorFlow can make excellent use of the extra cores available and exploits the parallelism in the tensor operators very well.

kmeans-speed

Figure 2.  Speed-up of TensorFlow on 16 cores over a simple Numpy implementation.  The x-axis is log10(N).

I have placed the source for this notebook in Github. Also an improved version based on Simister’s formulation can be found there.
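For readers who do not want to dig through the notebook, a minimal NumPy version along these lines is roughly what the comparison used (a sketch, not the exact notebook code):

import numpy as np

def numpy_kmeans(X, kPoints, iterations=20):
    centroids = np.array(kPoints)
    for _ in range(iterations):
        # distance from every point to every centroid: shape (k, N)
        distances = ((X[np.newaxis, :, :] - centroids[:, np.newaxis, :])**2).sum(axis=2)
        mins = np.argmin(distances, axis=0)
        # recompute each centroid as the mean of the points assigned to it
        centroids = np.array([X[mins == j].mean(axis=0) for j in range(len(kPoints))])
    return centroids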

TensorFlow also has a very interesting web tool called TensorBoard that lets you look at various aspects of the logs of your execution. One of the more interesting TensorBoard displays is the dataflow graph for your computation. Figure 3 below shows the display generated for the k-means computational graph.

tensorboard

Figure 3.   TensorBoard graph display of the k-means algorithm.

As previously mentioned this is far more complex than my diagram in Figure 1.   Without magnification this is almost impossible to read.  Figure 4 contains a close-up of the top of the graph where the variable newcents is created.   Because this is an iteration, the dataflow graph actually contains multiple instances of the computation, and clicking on the bubble for the variable will show you details as in the close-up.

tensorboard3

Figure 4.   A close-up of part of the graph showing expanded view of the multiple instances of the computation of the variable newcents.

A brief look at a neural network example

The k-means example above is really just an exercise in tensor gymnastics. TensorFlow is really about building and training neural networks.   The TensorFlow documents have a number of great examples, but to give you the basic idea I'll give you a brief look at their convolutional deep net for the MNIST handwritten digit example.   I'll try to keep this short because I will not be adding much new here. The example is the well-known "hello world" of machine learning image recognition.   The setup is as follows.   You have thousands of 28 by 28 black and white images of hand written digits and the job of the system you build is to identify each.   The TensorFlow example uses a two-layer convolutional neural net followed by a large, densely connected layer and a readout layer.

If you are not familiar with convolutional neural nets, here is the basic idea.   Images are strings of bits but they also have a lot of local 2d structure such as edges or holes or other patterns.  What we are going to do is look at 5×5 windows to try to “find” these patterns.  To do this we will train the system to build a 5×5 template W array (and a scalar offset b) that will reduce each 5×5 window to a point in a new array conv by the formula

conv[p, q] = b + Σi=0..4 Σj=0..4 W[i, j] · image[p+i, q+j]

(the image is padded near the boundary points in the formula above so none of the indices are out of bounds)

We next modify the conv array by applying the "ReLU" function max(0,x) to each x in the conv array so it has no negative values.   The final step is to do "max pooling".  This step simply computes the maximum value in a 2×2 block and assigns it to a smaller 14×14 array.   The most interesting part of the convolutional network is the fact that we do not use one 5×5 template but 32 of them in parallel producing 32 14×14 result "images" as illustrated in Figure 5 below.

convolutional

Figure 5.   The first layer convolutional network.
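A tiny NumPy sketch of the ReLU and 2×2 max pooling steps on a single 28×28 array (ignoring the convolution itself) may help make the shapes concrete; it is only an illustration, not the TensorFlow code.

import numpy as np

conv = np.random.randn(28, 28)          # pretend output of the 5x5 convolution
relu = np.maximum(conv, 0.0)            # ReLU: clip negative values to zero

# 2x2 max pooling: reshape into 2x2 blocks and take the max of each block
pooled = relu.reshape(14, 2, 14, 2).max(axis=(1, 3))
print(pooled.shape)                     # (14, 14)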

When the network is fully trained each of the 32 5×5 templates in W is somehow different and each selects for a different set of features in the original image. One can think of the resulting stack of 32 14×14 arrays (called h_pool1) as a type of transform of the original image much as a Fourier transform can separate a signal in space and time and transform it into frequency space. This is not what is going on here but I find the analogy helpful.

We next apply a second convolutional layer to the h_pool1 tensor but this time we apply 64 sets of 5×5 filters to each of the 32 h_pool1 layers (adding up the results) to give us 64 new 14×14 arrays which we reduce with max pooling to 64 7×7 arrays called h_pool2.

Rather than provide the whole code, which is in the TensorFlow tutorial,  I will skip the training step and show you how you can load the variables from a previous training session and use them to make predictions from the test set.   (The code below is a modified version of the Google code found at GitHub and subject to their Apache License.)

Let’s start by creating some placeholders and variables.  We start with a few functions to initialize weights.   Next we create a placeholder for our image variable x which is assumed to be a list of 28×28=784 floating point vectors.  As described above, we don’t know how long this list is in advance. We also define all the weights and biases described above.

def weight_variable(shape, names):
  initial = tf.truncated_normal(shape, stddev=0.1)
  return tf.Variable(initial, name=names)

def bias_variable(shape, names):
  initial = tf.constant(0.1, shape=shape)
  return tf.Variable(initial, name=names)

x = tf.placeholder(tf.float32, [None, 784], name="x")

sess = tf.InteractiveSession()

W_conv1 = weight_variable([5, 5, 1, 32], "wconv")
b_conv1 = bias_variable([32], "bconv")
W_conv2 = weight_variable([5, 5, 32, 64], "wconv2")
b_conv2 = bias_variable([64], "bconv2")
W_fc1 = weight_variable([7 * 7 * 64, 1024], "wfc1")
b_fc1 = bias_variable([1024], "bfcl")
W_fc2 = weight_variable([1024, 10], "wfc2")
b_fc2 = bias_variable([10], "bfc2")

Next we will do the initialization by loading all the weight and bias variables that were saved in the training step. We previously saved these values to a temp file using TensorFlow's saver.

saver = tf.train.Saver()
init =tf.initialize_all_variables()
sess.run(init)
saver.restore(sess, "/tmp/model.ckpt")

We can now construct our deep neural network with a few lines of code. We start with two functions to give us a bit of shorthand to define the 2D convolution and max_pooling operators. We first reshape the image vectors into the image array shape that is needed. The construction of the flow graph is now straightforward. The final result is the tensor we have called y_conv.

def conv2d(x, W):
  return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
  return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='SAME')

#first convolutional layer
x_image = tf.reshape(x, [-1,28,28,1])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)
#second convolutional layer
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)
#final layer
h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
y_conv=tf.nn.softmax(tf.matmul(h_fc1, W_fc2) + b_fc2)

Notice that we have not evaluated the flow graph even though we have the values for all of the weight and bias variables already loaded. We need a value for our placeholder x. Suppose tim is an image from the test set. To see if our trained network can recognize it all we need to do is supply it to the network and evaluate. To supply the value to the network we use the eval() function with a special “feed_dict” dictionary argument. This just lists the name of the placeholder and the value you wish to give it.

tim.shape = ((1,28*28))
y = y_conv.eval(feed_dict={x: tim})
label = np.argmax(y)
print(label)

You can read the TensorFlow tutorial to learn about the network training step.   They use a special function that does the stochastic backpropagation that makes it all look very simple.

How does this thing work?

Developing an intuition for how the convolutional neural network actually works is a real challenge.  The fact that it works at all is amazing.  The convolutional steps provide averaging to help create a bit of location and scale invariance, but there is more going on here.   Note that given a 28×28 image the output of the second convolutional step is 64 7×7 arrays that represent feature activations generated by the weight templates.   It is tempting to look at these as images to see if one can detect anything.  Obviously the last fully connected layer can do a great job with these.   But it is easy to see what they look like.   If we apply h_pool2.eval(feed_dict={x: image}) we can look at the result as a set of 64 images.  Figure 6 does just that.  I picked two random 9 images, two 0s, two 7s and three 6s.   Each column in the figure depicts the first 9 of the 64 images generated by one of them.    If you stare at this long enough you can see some patterns there.  (Such as the diagonals in the 7s and the cup shape of the 4s.) But very little else.   On the other hand, taken together each (complete) column is a sufficient signature for the last layer of the network to identify the digit with over 99% accuracy.   That is impressive.

conv_digits.JPG

Figure 6.  Images of h_pool2 given various images of handwritten digits.

I have put the code to generate these images on Github along with the code to train the network and save the learned weights.  In both cases these are IPython notebooks.
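For those who want to regenerate something like Figure 6 themselves, the plotting step is roughly the sketch below; it assumes the graph and session built above, a test image tim already reshaped to (1, 784) as earlier, and matplotlib, and it is not the exact notebook code.

import matplotlib.pyplot as plt

features = h_pool2.eval(feed_dict={x: tim})   # shape (1, 7, 7, 64)

# plot the first 9 of the 64 7x7 feature maps for this image
fig, axes = plt.subplots(3, 3)
for i, ax in enumerate(axes.flat):
    ax.imshow(features[0, :, :, i], cmap='gray', interpolation='nearest')
    ax.axis('off')
plt.show()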

A much better way to represent the data learned in the convolutional networks is to "de-convolve" the images back to the pixel layers.  In Visualizing and Understanding Convolutional Networks, Matthew Zeiler and Rob Fergus do this and the results are fascinating.

Final Notes

I intended this note to be a gentle introduction to the programming style of TensorFlow and not an introduction to deep neural networks, but I confess I got carried away towards the end.   I will follow up this post with another that looks at recurrent networks and text processing provided I can think of something original to say that is not already in their tutorial.

I should also note that the construction of the deep network is, in some ways, similar to Theano, the other great Python package for building deep networks.  I have not looked at how Theano scales or how well it can be used for tasks beyond deep neural networks, so don’t take this blog as an endorsement of TensorFlow over Theano or Torch.

On a final note I must say that there is another interesting approach to the image recognition problem.  Antonio Criminisi has led a team that has developed Deep Neural Decision Forests that combine ideas from decision forests and CNNs.

[1] This was before the explosion of massively parallel systems based on microprocessors changed the high-end computing world.

[2] If you are new to neural networks and deep learning, you may want to look at Michael Nielsen’s excellent new on-line book. There are many books that introduce deep neural networks, but Nielsen’s mathematical treatment is very clear and approachable.

[3] This is not a picture of the actual graph that is generated.   We will use another tool to visualize that later in this note.

[4] The observant reader will note that we have a possible divide by zero here if one of the special points is a serious outlier.   As I said, this is a bare-bones implementation.

Streaming Events to AzureML Through Azure Stream Analytics

 

UPDATE!  Microsoft has just recently released a much better way to integrate Azure Machine Learning with Azure Stream Analytics.  You can now call an AzureML service directly from the SQL query in the stream analytics system.  This means you don’t need the message bus and microservice layer I describe below.   Check out the blog by Sudhesh Suresh and the excellent tutorial from Jeff Stokes.   I will do a performance analysis to compare this method to the one below, but I will wait until the new feature comes out of “preview” mode :).   I will also use a better method to push events to the eventhub.


 

Doing sophisticated data analytics on streaming data has become a really interesting and hot topic.   There is a huge explosion of research around this topic and there are a lot of new software tools to support it.  Amazon has its new Kinesis system,  Google has moved from MapReduce to Cloud Dataflow, IBM has Streaming Analytics on their Bluemix platform and Microsoft has released Azure Stream Analytics.  Dozens of other companies that use data streaming analytics to make their business work have contributed to the growing collection of open source tools.   For example, LinkedIn contributed the original versions of Apache Kafka and Apache Samza, and Yahoo contributed Storm.  And there are some amazing start-ups that are building and supporting important stream analytics tools such as Flink from Data-Artisans. Related university research includes systems like Neptune from Colorado State.  I will post more about all of these tools later, but in this article I will describe my experience with the Microsoft EventHub, Stream Analytics and AzureML.   What I will show below is the following:

  1. It is relatively easy to build a streaming analytics pipeline with these tools.
  2. It works, but the performance behavior of the system is a bit uneven, with long initial latencies and a serious bottleneck which is probably the result of the small size of my experiment.
  3. AzureML services scale nicely.
  4. Finally, there are some interesting ways in which blocking can greatly improve performance.

In a previous post I described how to use AzureML to create a version of a science document analyzer that I originally put together using Python Scikit-Learn, Docker and Mesosphere.  That little project is described in another post.   The streaming challenge here is very simple.   We have data that comes from RSS feeds concerning the publication of scientific papers.  The machine learning part of this is to use the abstract of the paper to automatically classify the paper into scientific categories.  At the top level these categories are "Physics", "math", "compsci", "biology", "finance".    Big streaming data challenges that you find in industry and science involve the analysis of events that may be large and arrive at the rate of millions per second.   While there are a lot of scientists writing lots of papers, they don't write papers that fast.   The science RSS feeds I pull from are generating about 100 articles per day.  That is a mere trickle.  So to really push the throughput experiments I grabbed several thousand of these records and wrote a simple server that would push them as fast as possible to the analysis service.

In this new version described here I want to see what sort of performance I could get from AzureML and Azure Stream Analytics.   To accomplish this goal I set up the pipeline shown in Figure 1.

system-diagram-eventhub

Figure 1.   Stream event pipeline

As shown in the diagram events (document abstracts) are initially pushed into the event hub which is configured as the source for the stream analytics engine.  The analytics engine does some initial processing of the data and then pushes the events to a message queue in the Azure Message Bus.  The events are then pulled from the queue by a collection of simple microservices which call the AzureML classifier and then push the result of the call into the azure table service.

In the following paragraphs I will describe how each stage of this pipeline was configured and I will conclude with some rather surprising throughput results.

Pushing events into the Event Hub.

To get the events into the Event Hub we use a modified version of the solution posted by Olaf Loogman.  Loogman's post provides an excellent discussion of how to get an event stream from the Arduino Yun and the Azure event hub using a bit of C and Python code.  Unfortunately the Python code does not work with the most current release of the Python Azure SDK[1], but it works fine with the old one. In our case all that we needed to do was modify Loogman's code to read our ArXiv RSS feed data and convert it into a simple JSON object of the form

{ 'doc': 'the body of the paper abstract',
  'class': 'one of the classes: Physics, math, compsci, bio, finance',
  'title': 'the title of the paper'
}

The Python JSON encoder is very picky so it is necessary to clean a lot of special characters out of the abstracts and titles.  Once that is done it is easy to push the stream of documents to the Event Hub[2].
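A minimal sketch of that cleanup, assuming title, abstract and label strings pulled from the RSS feed (the helper names here are my own, not the actual code), is just a matter of stripping the characters the encoder chokes on and calling json.dumps:

import json
import string

def clean(text):
    # keep only printable ASCII so the JSON encoder (and the event hub) stay happy
    return ''.join(c for c in text if c in string.printable)

def make_event(title, abstract, label):
    body = {'doc': clean(abstract), 'class': label, 'title': clean(title)}
    return json.dumps(body)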

According to Microsoft, the Event Hub is capable of consuming many millions of events per second.  My simple Python publisher was only able to push about 4 events per second to the hub, but by running four publisher instances I was up to 16 eps.   It was clear to me that it would scale to whatever I needed.

The Stream Analytics Engine

The Azure stream analytics engine, which is a product version of the Trill research project, has a very powerful engine for doing on-line stream processing.   There are three things you must do to configure the analytics engine for your application.    First you must tell it where to get its input.   There are three basic choices:  the event hub,  blob storage and the IoT hub.   I configured it to pull from my event hub channel.   The second task is to tell the engine where to put the output.  Here you have more choices.  The output can go to a message queue, a message topic, blob or table storage or a SQL database.  I directed it to a message queue.  The final thing you must do is configure the query processing step you wish the engine to do.   For the performance tests described here I use the most basic query which simply passes the data from the event hub directly to the message queue.   The T-SQL code for this is

SELECT 
     *
INTO
     eventtoqueue
FROM
     streampuller

where eventtoqueue is the alias for my message bus queue endpoint and streampuller is the alias for my event queue endpoint.  The original JSON object that went into the event hub emerges with additional top-level fields, which include the time stamps of the event entry and exit, as shown below.

{  "doc": "body of the abstract",
   "class": "Physics",
   "title":"title of the paper",
   "EventProcessedUtcTime":"2015-12-05T20:27:07.9101726Z",
    "PartitionId":10, 
    "EventEnqueuedUtcTime":"2015-12-05T20:27:07.7660000Z"
}

The real power of the Analytics Engine is not being used in our example but we could have done some interesting things with it.   For example, if we wanted to focus our analysis only on the Biology documents in the stream, that would only require a trivial change to the query.

SELECT
     *
INTO
    eventtoqueue
FROM
    streampuller
    WHERE class = ‘bio’

Or we could create a tumbling window and group select events together for processing as a block.  (I will return to this later.) To learn more about the Azure Analytics Engine and T-SQL a good tutorial is a real-time fraud detection example.

Pulling events from the queue and invoking the AzureML service

To get the document events from the message queue so that we can push them to the AzureML document classifier we need to write a small microservice to do this.  The code is relatively straightforward, but a bit messy because it involves extracting the JSON object from the text received from the message queue and then converting it to the list form required by the AzureML service.   The response is then encoded in a form that can be stored in the table.    All of this must be encapsulated in the appropriate level of error handling.   The main loop is shown below and the full code is provided in Github.

def processevents(table_service, hostname, bus_service, url, api_key):
    while True:
        try:
            msg = bus_service.receive_queue_message('tasks', peek_lock=False)
            t = msg.body
            #the message will be text containing a string with a json object
            #if no json object it is an error.  Look for the start of the object
            start = t.find("{")
            if start > 0:
                t = t[start:]
                jt = json.loads(t)
                title = jt["title"].encode("ascii", "ignore")
                doc = jt["doc"].encode("ascii", "ignore")
                tclass = jt["class"].encode("ascii", "ignore")
                evtime = jt["EventEnqueuedUtcTime"].encode("ascii", "ignore")
                datalist = [tclass, doc, title]
                #send the datalist object to the AzureML classifier
                try:
                    x = sendrequest(datalist, url, api_key)
                    #returned value is the best guess and
                    #2nd best guess for the class
                    best = x[0][1]
                    second = x[0][2]
                    #save the result in an Azure Table using the hash
                    #of the title as the rowkey
                    #and the hostname of this container as the table partition
                    rk = hash(title)
                    timstamp = str(int(time.time()) % 10000000)
                    item = {'PartitionKey': hostname, 'RowKey': str(rk),
                            'class': tclass, 'bestguess': best, 'secondguess': second,
                            'title': title, 'timestamp': timstamp, 'enqueued': evtime}
                    table_service.insert_entity('scimlevents', item)
                except:
                    print "failed azureml call or table service error"
            else:
                print "invalid message from the bus"
        except:
            print "message queue error"

Performance results

Now that we have this pipeline the most interesting experiment is to see how well it will perform when we push a big stream of messages to it.

To push the throughput of the entire pipeline as high as possible we wanted as many instances of the microservice event puller as possible.   So I deployed a Mesos Cluster with 8 dual core worker nodes and one manager node using the template provided  here.   To maximize parallelism for the AzureML service, three additional endpoints were created.   The microservices were assigned one of the four AzureML service endpoints as evenly as possible.

We will measure the throughput in seconds per event or, more accurately, the average interval between event arrivals over a sliding window of 20 events.     The chart below shows the seconds/event as the sliding window proceeds from the start of a stream to the end.   The results were a bit surprising.   The blue line represents one instance of the microservice and one input stream.   As you can see there is a very slow startup and the performance levels out at about 1 second/event.   The orange line is the performance with 12 microservice instances and four concurrent input streams.  Again the startup was slow but the performance leveled off at about 4 events/sec (0.25 seconds/event).

stream2

Figure 2.   Performance in Seconds/Event with a sliding window of 20 events.   Event number on the x-axis.
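The seconds-per-event values in Figure 2 were computed from the time stamps on the status messages; a sketch of that calculation, given a sorted list of event arrival times in seconds, looks like this:

def sliding_interval(arrival_times, window=20):
    """Average seconds between events over a sliding window of `window` events."""
    return [(arrival_times[i] - arrival_times[i - window]) / float(window)
            for i in range(window, len(arrival_times))]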

This raises two questions.

  1. Why the slow startup?
  2. Why the low performance for the 12 instance case? Where is the bottleneck in the pipeline? Is it with the Azure Stream Analytics service,  the message bus or the AzureML service?

Regarding the slow startup, I believe that the performance of the Azure services is scaled on demand.  In other words, as a stream arrives at the event hub, resources are allocated to try to match the processing rate with the event arrival rate.   For the first events the latency of the system is very large (it can be tens of seconds), but as the pool of unprocessed events grows in size the latency between event processing drops very fast.   However, as the pool of unprocessed events grows smaller, resources are withdrawn and the latency goes back up again.  (We pushed all the events into the event hub in the first minute of processing, so after that point the number in the hub declines.  You can see the effect of this in the tail of the orange curve in Figure 2.)  Of course, belief is not science.   It is conjecture.

Now to find the bottleneck that is limiting performance.   It is clear that some resources are not scaling to a level that will allow us to achieve more than 4 events/second with the current configuration.   So I tried two experiments.

  1. Run the data stream through the event hub, message bus to the microservices but do not call the AzureML service.   Compare that to the version where the AzureML service is called.
  2. Run the data stream directly into the message bus and do not call the AzureML service.  Compare that one to the others.

The table below in Figure 3 refers to experiment 1.   As you can see, the cost of the AzureML service is lost in the noise.   For experiment 2 we see that the performance of the message bus alone was slightly worse than the message bus plus the eventhub and stream analytics.  However, I expect these slight differences are not statistically significant.

stream3

 Figure 3.  Average events per second comparing four configurations: one microservice instance with one input stream; 8 microservices with 4 input streams; 8 microservices with 4 input streams and no call to AzureML; and 8 microservices with 4 input streams bypassing the eventhub and stream analytics.

The bottom line is that the message bus is the bottleneck in the performance of our pipeline.  If we want to ask the question how well the AzureML service scales independent of the message bus, we can replace it with the RabbitMQ AMQP server we have used in previous experiments.     In this case we see a very different result.   Our AzureML service demonstrates nearly linear speed-up in terms of events/sec.    The test used an increasing number of microservices (1, 2, 4, 8, 12, 16 and 20) and four endpoints to the AzureML service.   As can be seen in Figure 4 the performance improvement lessens when the number of microservice instances goes beyond 16.  This is not surprising as there are only 8 servers running all 20 instances and there will be contention over shared resources such as the network interface.

stream4

Figure 4.   AzureML events per second using multiple microservice instances pulling the event stream from a RabbitMQ server instance running on a Linux server on Azure.

Takeaway

It was relatively easy to build a complete event analytics pipeline using the Azure EventHub, Azure Stream Analytics and to build a set of simple microservices that pull events from the message bus and invoke an AzureML service.   Tests of the scalability of the service demonstrated that performance is limited by the performance of the Azure message bus.    The AzureML service scaled very well when tested separately.

The Azure services seem to deliver performance based on demand: the greater the number of unprocessed events in the EventHub, the greater the amount of resources that are provisioned to handle the load.  The result is a slow start but a steady improvement in response until a steady state is reached.   When the unprocessed event pool starts to diminish, the resources seem to be withdrawn and the processing latency grows.      It is possible that the limits that I experienced were because the volume of the event stream I generated was not great enough to warrant the resources to push the performance beyond 4 events/sec.   It will require more experimentation to test that hypothesis.

It should also be noted that there are many other ways to scale these systems.  For example, more message bus queues and multiple instances of the AzureML server.    The EventHub is designed to ingest millions of events per second and the stream analytics should be able to support that.  A good practice may be to use the Stream Analytics engine to filter the events or group events in a window into a single block which is easier to handle on the service side.   The AzureML server has a block input mode which is capable of processing over 200 of our classification events per second, so it should be possible to scale the performance of the pipeline up a great deal.

 

[1] The Microsoft Azure Python folks seem to have changed the namespaces around for the Azure SDK.   I am sure it is fairly easy to sort this out, but I just used the old version.

[2] The source code is “run_sciml_to_event_hub.py” in GitHub for those interested in the little details.

Performance Analysis of a Cloud Microservice-based ML Classifier

(This is an edited version correcting some of the analysis in the version I posted last week)

Microservice architectures have become an important tool for large scale cloud application deployment. I wanted to understand how well a swarm of microservices could be used to process streams of events where a non-trivial computation is required for each.   I decided a fun test would be to use machine learning to classify scientific document abstracts that appear on public RSS feeds.   This is the first result of that experiment.  It uses a very small Mesosphere cluster on Microsoft Azure.   I will release the code and data to GitHub as soon as I clean it up a bit.

In a previous post we looked at building a document classifier for scientific paper abstracts.   In this post we will see how we can deploy it as a network of containerized microservices running on Mesosphere on Microsoft Azure.  We will then push this network of services hard to see how well it scales and we will try to isolate the performance issues. Specifically we are interested in the following challenge. If you have a heavy stream of events coming into a stream analysis system you may overload your services.   The conventional wisdom is that you can scale up the number of servers and services to cope with the load.   Fortunately more than one microservice instance can be deployed on a single VM. But does increasing the number of services instances per VM allow us to scale the system to meet the throughput requirements? Are there fundamental limits to how well this strategy will work? What are the limiting factors?

To summarize where we are so far, we have a set of RSS feeds that are "pushing" scientific publication events to us.   Our system looks at the abstracts of these events and uses several different machine learning tools to classify the document into one (or more) scientific disciplines and sub-disciplines.   We have five major disciplines: Physics, Math, Biology, Computer Science and Finance. Each of these major disciplines is further divided into a number of sub-disciplines or specialties. We have divided the work into four main activities

  1. Training the classifiers. For this we used some of the labeled data from the ArXiv RSS feeds as training data.   (This was discussed in detail in the previous post, so we won’t go over it again here.)   This training phase is used to generate models that will be used by the classifier services discussed here.   We generate models for the main topic classifier and each of the sub-domain classifiers.   This is a pre-processing step and it is not counted in our performance analysis.
  2. Pulling the events from the RSS feeds and pushing them to the main topic classifier. The main topic predictor/classifier uses two primary machine learning methods to determine which discipline the incoming document belongs to.   As we have seen, these two methods agree on the discipline about 87% of the time. But sometimes they disagree on a document and we have two potential correct answers.   We interpret this as a potentially "interdisciplinary" research document and classify it as belonging to both answers.
  3. Doing the sub-domain classification. As shown in the Figure 1 below the major topic classifiers push the document to the one (or two) sub-discipline specific classifiers.   These classifiers use the same ML methods as the main topic classifier to further classify the document into the sub-disciplines.

architecture

Figure 1.   Conceptual model of the document classifier microservice architecture

Managing queues and pushing the classified documents to a table of results

What is not shown in the diagram in Figure 1 is what happens next. While the conceptual model provides a reasonable top-level view, the real design requires several additional critical components.   First, as events move through the system you need a way to buffer them in queues.   We use the Advanced Message Queuing Protocol (AMQP) which is one of the standards in this space.   A good implementation of AMQP is the RabbitMQ system which we hosted on a server in the Azure cloud.     As shown in Figure 2   we (logically) position the RabbitMQ event hub between the major topic classifiers and the sub-domain classifiers.   We establish 7 queues on the event hub.   There is a queue for each of the major topic areas (Physics, Bio, Math, CS, and Finance), a "status" queue and a "role" queue.   The classify microservices are given classification roles by polling the "role" queue.   This tells them which topic area they are assigned to.   When they complete the sub-domain classification of a document they invoke another microservice that is responsible for storing the result in the appropriate table partition for later lookup by the users. This microservice sends an acknowledgement of the completed classification back to the "status" queue.  We shall refer to this microservice as the Table Web Service.  The "Status Monitor" is the overall system log and is critical for our performance measurements.

arch-detail

Figure 2.   The detailed architecture picture.

The Execution Environment

The RabbitMQ server is hosted on a Linux VM on Microsoft Azure and the rest of the system is deployed on a small Mesosphere cluster in a different Azure data center (see Figure 3).   The cluster is indeed small. There is one master node that runs the Mesosphere life cycle management services, the web browser interface and Marathon job management service. There are 5 dual core worker VMs. The Azure table is also a separate service.   The experiment status monitor runs in an IPython Notebook on my laptop.

experiment-setup

Figure 3.  Execution Environment

Using the web interface for Marathon we deploy different service configurations for the experiments described here.   Our table-pusher web service listens on a fixed port, so we can only have one instance per worker VM.   In practice this works out reasonably well because the classify services on any VM can invoke the local copy with very little overhead.   However, as we shall see, there are other major overheads associated with this service that will play a large role in the performance analysis.

We are not limited by the number of classify services we deploy.   If the system operator wants 20 “physics” sub-domain classifiers, the operator need only increase the deployment size of the classifier service by 20 and then submit 20 “physics” role messages into the role queue.

The Microservice Life Cycles

The top-level topic classifiers take the data from the RSS streams and apply the machine learning algorithms to produce the best 1 (or 2) potential topics.   The result is converted to a json object which contains

  1. The domain as determined by the random forest classifier we call RF
  2. The domain as determined by a hybrid classifier, "the best of 3 classifier" (see the previous post), that we call Best.
  3. The title of the document
  4. The body (abstract) of the document.

This doc json object is then pushed into the queue specified by the ML classifiers. If both RF and Best agree on a topic like “math” then the document is put in the “math” queue.   If they disagree and one says “math” and the other says “bio” then the document is placed in both the “math” and “bio” queues.    The classifier microservice has the following life-cycle.

ML Classifier

  1. When launched it opens a connection to the "roles" queue and waits for a topic.
  2. When it receives the topic message from the "role" queue, the classifier service must initialize all the ML models for that topic from the saved trained models. (The models have been previously trained as described in the previous post and the trained models have been "pickled" and saved as blobs in Azure blob storage.)
  3. It then begins to scan the queue for that topic.   It pulls the json document objects from the queue and applies the classifier. It then packages up a new json object consisting of the main topic, new sub-classification, title and abstract. For example, if the item came from the "physics" queue and the classifier decides it is in the subclass "General Relativity", then that is the sub-classification that goes in the object. (The classifier also has a crude confidence estimator. If it is not very certain the sub-classification is "General Relativity?".   If it is very uncertain and General Relativity is the best of the bad choices, then it is "General Relativity???".) It then sends this object via a web service call to a local web service (described below) that is responsible for putting the object into an Azure table. (More on this step below.)
  4. The service repeats 3 until it receives a "reset" message in the topic queue.   It then returns to the roles queue and step 1.  A rough sketch of this loop is shown below.
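Here is that sketch using the pika RabbitMQ client (with the 2015-era no_ack flag); the host and queue names and the load_models, classify and post_to_table_service helpers are placeholders for the components described in the text, not the actual code.

import json
import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq-host'))
channel = connection.channel()

# steps 1 and 2: wait for a role assignment, then load the models for that topic
method, header, body = channel.basic_get(queue='roles', no_ack=True)
while body is None:
    time.sleep(1)
    method, header, body = channel.basic_get(queue='roles', no_ack=True)
topic = body.strip()
models = load_models(topic)          # unpickle the trained models from blob storage

# step 3: pull documents from the topic queue, classify and hand off to the table service
while True:
    method, header, body = channel.basic_get(queue=topic, no_ack=True)
    if body is None:
        time.sleep(1)
        continue
    if body == 'reset':              # step 4: go back to the roles queue
        break
    doc = json.loads(body)
    subclass = classify(models, doc['doc'])
    result = {'topic': topic, 'subclass': subclass,
              'title': doc['title'], 'doc': doc['doc']}
    post_to_table_service(result)    # the web service call described below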

Table Web Service

(Note: this is a revised version.   The reliable and fast versions were incorrectly described in the previous version of this post.)

The table web service is the microservice component that receives web service invocations from the classifier. Staying true to the Microservice design concept it has only one job.

  1. When invoked it pulls the json object from the invocation payload and formats a Table service tuple to insert into the partition of the table associated with the main topic.
  2. We actually have three versions of the web service: one we call "fast", one we call "reliable" and one we call "null".
    • The "reliable" version works by making a new connection to the RabbitMQ event server for each invocation and then opens a channel to send it messages.  It then inserts the tuple in the table and sends a time-stamped status notification to the event queue.
    • The “fast” version reuses the RabbitMQ connection for as long as possible. For each invocation it opens the channel and inserts the message.
    • The “null” version skips the Azure table entirely and is used only for the performance evaluation.

(Unfortunately, the RabbitMQ connection will time out if there is a lull in its use, and catching the timed-out connection and restarting it proved to be problematic.   So the "fast" version is only used to illustrate the performance models below. It is not reliable. The "reliable" version is very reliable.   It runs for months at a time without problems. As we shall see, it is slower.  The "null" version is fast and reliable but not actually useful for anything other than benchmarks.)

  3. Before returning a web service response to the calling classifier instance the web service sends a time-stamped message to the status queue.
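The difference between the "reliable" and "fast" versions comes down to how the RabbitMQ connection is handled. The sketch below (with a placeholder "status" queue and helper names of my own) shows the two patterns; it glosses over the connection-timeout handling that, as noted above, made the fast version unreliable in practice.

import pika

# "reliable": a fresh connection per invocation -- simple and robust, but slow
def notify_reliable(rabbit_host, status_json):
    connection = pika.BlockingConnection(pika.ConnectionParameters(rabbit_host))
    channel = connection.channel()
    channel.basic_publish(exchange='', routing_key='status', body=status_json)
    connection.close()

# "fast": reuse one long-lived connection, open only a channel per invocation
connection = None
def notify_fast(rabbit_host, status_json):
    global connection
    if connection is None or connection.is_closed:
        connection = pika.BlockingConnection(pika.ConnectionParameters(rabbit_host))
    channel = connection.channel()
    channel.basic_publish(exchange='', routing_key='status', body=status_json)
    channel.close()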

The Experiment

It was stated at the outset of this document that we are interested in understanding the effect of scaling the number of Docker containers on the ability of the system to meet the flow of events into the system. To test the system under load we pre-loaded 200 abstracts for the Biology topic into the message queue and all the classifier service instances were instructed to be Biology sub-topic classifiers.   We then ran this with 1, 2, 4, 8, 16 and 24 instances of the classifier microservice.   We ran this against all three versions of the table web service.   To compute the elapsed time to consume and process all 200 abstracts the table web service appended a timestamp to each status message. By looking at the time stamp for the first message and the last message we could determine the system throughput. (There were five different worker VMs so there is a small issue of clock skew between timestamps, but this was negligible.   What was not negligible were delays caused by network traffic between the data centers.   Hence I would estimate the error bars for all the numbers below to be about 10%.)  We ran many tests and the numbers below are averages.

The most important number is the throughput of the system in classifications per second.   The table in Figure 4 summarizes the results.   Clearly the best throughput was obtained with the combination of the null classifier and the null table service.   The next best was the combination of the classifier and the null table service.

throughput

Figure 4. Throughput results for 1, 2, 4, 8, 16 and 24 instances of the classifier service.

The combination of the classifier and either the fast or reliable table service showed rather poor performance.   To understand this we need to look at the basic flow of execution of a single classifier and a single table web service as shown in Figure 5.   Note that the classifier that invokes the web service cannot return to fetch another task until the table web service has sent the status message.

classify-thread

Figure 5. Flow of control of the classify-table web service from the message queue and back.

To understand where congestion may exist look at the points where resource contention may occur when multiple classify instances are running.   This is illustrated in Figure 6.

classify-pipeline

Figure 6.   The major points of resource contention occur when the services access the Azure Table service and the RabbitMQ message broker.

Quantifying this, let

tpar = the time per event spent in the purely parallel work (pulling a document and running the ML classification)
tTable = the time per event spent inserting the result into the Azure Table
tqueue = the time per event spent talking to the RabbitMQ message broker

We can do a very crude speedup analysis based on Amdahl’s law.   Looking at the part of the computation that is purely parallel and assuming sequential access to the Table service and message broker we can say

t(event) = tpar + tseq,   where   tseq = f(tTable, tqueue)

tseq is a function of the two points of contention.   It turns out that the message queue is fast in comparison to the Azure table service.   This is due to the complexity of inserting an item into a table. Because we are only using one table partition ("Biology") all our inserts go into one place.   Unfortunately, inserts into any disk-based system are going to involve locks so this is not surprising.  However, the difference between the fast version and the reliable version illustrates the high cost of establishing a new connection to the message broker for each event.  I am not sure what the function f above is, but I suspect it is additive.

Let Tserial be the time for one instance of the classify service to complete the execution of the set of messages and Tpar(n) be the time it takes n instances to do the job. We then have

Tserial = N (tpar + tseq)        Tpar(n) = N (tpar/n + tseq)

where N is the number of messages processed.

The speedup of n classifiers over a single classifier is then

Speedup(n) = Tserial / Tpar(n) = (tpar + tseq) / (tpar/n + tseq)   →   Smax = 1 + tpar/tseq   as n → ∞

Analysis of the timing data gives us tpar = 450 ms (approximately) for a single classifier with no table service, and tseq = 60 ms for the fast service and 100 ms for the reliable service.   Using the formula above we have the maximum speed-up Smax = 8.5 for the fast service and 5.5 for the reliable service.   This approximately agrees with the results we can measure as seen in Figure 7 below.
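Plugging the measured numbers into the Smax limit above reproduces those values:

t_par = 0.450                          # seconds of parallelizable work per event
for name, t_seq in [("fast", 0.060), ("reliable", 0.100)]:
    s_max = 1.0 + t_par / t_seq        # the n -> infinity limit of the speed-up
    print("%s %.1f" % (name, s_max))   # prints: fast 8.5, reliable 5.5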

speedup

Figure 7.   Speed-up relative to one instance of the classifier with different table services plotted against different numbers of classifier instances.

Of course speed-up is a relative number, but it does illustrate the way systems scale.   As we can see the limits are approximately what we predicted.   However the case of the null table is different.   There should be essentially no sequential component so why is it flat-lining at around 13? The answer is that there are only 10 cores in our system.   That puts a fundamental limit on the performance.   In fact we are lucky to get more than a speed-up of 10.

Conclusions

One thing that does not come out in the text above is how well the concept of microservices worked.   One of the major selling points for this style of application construction is that factoring of the services into small independent components can help with maintenance of the overall system.   Using Mesosphere and the Marathon task scheduler proved this point well.   It was possible to modify, remove and scale any of the services without having to bring down the others.   This made all of the experiments described above easy to carry out.   And the system remained up for several months.

It is clear that 10 cores is not a very impressive platform to test scalability.   We did demonstrate that we could easily scale to about 2.5 services per core without problems.   Going beyond that did not improve performance.   I am currently trying to build an instance of Mesosphere or another Docker cluster mechanism (perhaps Swarm) with at least 100 servers.   I will update these results when I have that complete.

Another thing to do is cache the table writes and then update the table with an entire block.   A good solution for this is to introduce a Redis cache service.   Fortunately Docker makes this trivial.   Redis containers are  in the library.  Doing this and having a much larger cluster should enable at least a 20x improvement.

Actually, a more interesting experiment is to try this classification problem using one of the more conventional tools that already exist.   For example Microsoft Stream Analytics + Azure Machine Learning & Microsoft StreamInsight, Spark Streaming + Event Hubs and HDInsight, Amazon Kinesis, Google BigQuery, IBM Watson Analytics and numerous open source IoT platforms.   I plan to start with the Azure solution while I still have Azure resources.

I am in the process of moving all the code and data for this experiment into GitHub.   I will post links to these items in the next week or so.