Category Archives: Uncategorized

A Quick Dive into Cloud Data Streaming Technology

This is the second part of a two part series about data streaming technology.  The first part is about streaming data in science and this part describes the programming models for several open source cloud based data streaming tools including Spark Streaming, Storm and Heron, Googles Dataflow and Apache Flink.


Cloud computing evolved from the massive data centers that were built to handle the “big data” challenges that confronted the designers of on-line services like search and e-mail.    For the most part, data from these services accrued into large collections in the cloud where they could be analyzed by massively parallel, batch computing jobs.   The types of knowledge derived from this analysis is used to improve the services that generated the data in the first place.   For example, data analysis of cloud system log files can yield valuable information about how to improve performance of the cloud system.   Analysis of user search terms can improve the search index.  Analysis of vast collections of text can be used to create new machine learning based services such as natural language translation services.

While batch analysis of big collections is extremely important, it is often the case that the results of the analysis must be available as soon as the data is available.   For example, analyzing data from instruments that control complex systems, such as the sensors onboard an autonomous vehicle or an energy power grid.  In these instances, the data analysis is critical to driving the system.  In some cases, the value of the results diminishes rapidly as it gets older.  For example, trending topics in a twitter stream is not very interesting if it is no longer trending.   In other cases, the volume of data that arrives each second is so large that it cannot be retained and real-time analysis or data reduction is the only way to handle it.   This is true of some extremely large science experiments.

We refer to the activity of analyzing data coming from unbounded streams as data stream analytics.  While many people think this is a brand new topic, there is a longer history that goes back to some basic research on complex event processing in the 1990s at places like Stanford, Caltech and Cambridge.  These projects created some of the intellectual foundation for today’s systems.

In the paragraphs that follow we will describe some of the recent approaches to stream analytics that have been developed by the open source community and the public cloud providers.    As we shall see there are many factors that determine when a particular technology is appropriate for a particular problem.   While it is tempting to think that one open source solutions can cover all the bases, this may not be the case.  In fact there is an entire zoo of interesting solutions including Spark Streaming which has been derived from the Spark parallel data analysis system,  Twitter’s  Storm system which has been redesigned by Twitter as Heron, Apache Flink from the German Stratosphere project, Googles Dataflow which is becoming Apache Beam which will run on top of Flink, Spark and Google’s cloud.  Other university projects include Borealis from Brandeis, Brown and MIT,  Neptune and the Granules project at Colorado State.   In addition to Google Cloud dataflow other commercial cloud providers have contributed to the available toolkit: Amazon Kinesis,  Azure Streaming and IBM Stream Analytics are a few examples.   In some cases, the analysis of instrument data streams needs to move closer to the source and tools are emerging to do “pre-analysis” to decide what data should go back to the cloud for deeper analysis.   For example, the Apache Quark edge-analytics tools are designed to run in very small systems such as the Raspberry Pi.   A  good survey of many of these stream processing technologies is by Kamburugamuve and Fox.   They cover many issues not discussed here.

Basic Design Challenges of Streaming Systems

Before continuing it is useful to address several basic problems that confront the designers of these system.   A major problem is the question of correctness and consistency.   Here is the issue.  Data in an unbounded stream is unbounded in time.   But if you want to present results from the analytics, you can’t wait until the end of time.   So instead you present results at the end of a reasonable window of time.  For example, a daily summary based on a complete checkpoint of events for that day. But what if you want results more frequently?   Every second? The problem is that if the processing is distributed and the window of time is short you may not have a way to know about the global state of the system and some events may be missed or counted twice.  In this case the reports may not be consistent.  Strongly consistent event systems will guarantee that each event is processed once and only once.    A weakly consistent system may give you approximate results that you can “back up” by a daily batch run on the daily checkpoint file.  This gives you some ground-truth to fall back on if you suspect your on-line rapid analysis reporting is less reliable.   Designs based on combining a streaming engine with a separate batch system is called the Lambda Architecture.  The goal of many of the systems described below is to combine the batch computing capability with the streaming semantics so having a separate batch system is not necessary.

The other issue is the design of the semantics of time and windows.   Many event sources provide a time stamp when an event is created and pushed into the stream.  However, the time at which an events is processed will be later.   So we have event time and processing time.  To further complicate things events may be processed out of event-time order.   This raises the question of how we reason about event time in windows defined by processing time.

There are at least four types of windows.   Fixed Time windows divide the income stream into logical segments that correspond to a specified interval of processing time.  The intervals do not overlap. Sliding windows allow for the windows to overlap.  For example, windows of size 10 seconds that start every 5 seconds.   Per-session windows divide the stream by sessions of activity related to some key in the data.  For example, mouse clicks from a particular user may be bundled into a sequence of sessions of clicks nearby in time. Finally, there is the global window that can encapsulate an entire bounded stream.   Associated with windows there must be a mechanism to trigger an analysis of the content of the window and publish the summary.   Each of the systems below support some windowing mechanisms and we will discuss some of them and provide some concluding remarks at the end.  A great discussion of this and many related issues is found in a pair of articles by Tyler Akidau.

Another design involves the way the system distributes the work over processors or containers in the cloud and the way parallelism is achieved.   As we shall see the approaches to parallelism of the systems described here are very similar.   This paper will not discuss performance or scalability issues.  That is another topic we will return to later.

Finally, we note that operations on streams often resemble SQL-like relational operators.   However, there are difficulties with this comparison.  How do you do a join operation on two streams that are unbounded?  The natural solution involves dividing streams by windows in time and doing the join over each window.  Vijayakumar and Plale have looked at this topic extensively.  The CEDR system from MSR illustrated how SQL-like temporal queries can have a well-defined semantics.

Cloud Providers and the Open Source Streaming Tools.

One way to distinguish the streaming engines is look at the approach to the programming model.  In one camp is an approach based on batch processing as derived from Hadoop or Spark, and the other is based on the pipelined execution of a directed acyclic graph.

Spark Streaming

Spark streaming is a good example that illustrates how one can adapt a batch style processing analytics tool to a streaming case.   The core idea is very simple.  You break the stream into a bunch of little batches.

To illustrate this and a few of the other technologies discussed here we will frame the discussion in terms of a hypothetical science application.   Assume we have a large set of environmental sensor distributed over some area.  Each sensor is connected by WiFi to the internet and each sends a sequence of messages to a cloud address for analysis.  The sensors may be weather, sound, co2, light, motion, vibration or image capture.   The size of the messages may only be a few bytes (time stamp + geo-location + temperature) or a few megabytes of sound or images.    The goal of the project may be environmental restoration where you are interested in the health and development of the flora and fauna in some devastated forest.   Or it may be something like the NSF ocean observatories project which has a large number of wired as well as untethered instruments along the U.S. coastal waters.

Spark streaming works by taking the input from a stream data source aggregator such as

  1. A high throughput publish-subscribe system like RabbitMQ or a more highly scalable system like Apache Kafka or
  2. The Microsoft Azure Event Hub (which we described in another post) or
  3. Amazon Kinesis.

Kinesis is a robust data aggregator in that it can take from many sources at high rates of speed and it will retain the stream records for up to seven days.   A Kinesis producer is a source of a stream of data records.  Each Producer uses a partition key, such as “co2 sensor” that is attached to each data record as it is sent to Kinesis.   Internally Kinesis partitions data into “shards” and each shard can handle up to 2 MB/sec or 1000 records per second of input data.   Kinesis uses the partition key to map your data record to a shard.   The total capacity of your stream is the sum of the capacity of the shards that it contains.

A Kinesis client is the program that pulls the data records from the Kinesis shards and processes it.   You can roll your own client or you can use spark streaming or one of the other systems described here to do the processing.   Spark streaming is just a version of Spark that processes data in batches where each batch is defined by a time interval.   The Spark name for a stream is a DStream which is a sequence or Spark RDDs (Resilient Distributed Dataset).  We covered Spark in a previous article here.  Spark Streaming provides a nice adaptor which will automatically read the data from Kinesis shards and repackage them into DStreams so that they can be consumed by the Spark Engine as shown in Figure 1.


Figure 1.   Environmental sensor analysis stream example.

Each RDD in the DStream is represents the data in a window of time from the shard associated with the instrument stream.   This RDD is processed in parallel by the spark engine.   Another level of parallelism is exploited by the fact that we have DStreams associated with each shard and we may have many of them.  There is one processing thread for each shard. This is nicely illustrated in Figure 2 from the Spark Streaming Guide.


Figure 2.   Spark Streaming with Kinesis  (image from Spark streaming kinesis integration guide)

DStreams can be transformed into new DStreams using the Spark Streaming library.  For example there are the map() and filter() functions that allows us to apply an analysis or filter on a DStream to produce a new one.   DStreams can be merged together by the union() operator or, if there is a common key, such as a timestamp, one can apply a join() operator to create a new DStream with events with the same key tied together.  Because each RDD in the DStream is process completely by the Spark engine, the results are strongly consistent.   There is a very good technical paper from the Berkeley team that created spark streaming and it is well worth a read.

To illustrate spark streaming let’s assume that every second our sensors from figure 1 each transfer a byte array that encodes a json string representing its output every second. Suppose we are interested in receiving a report of the average temperature for each 10 second window at each location where we have a temperature sensor.   We can write a Python Spark Streaming program to do this as follows.   First we need to create a streaming context and Kinesis connector to grab the stream of instrument data.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext("....", "sensortest")
ssc = StreamingContext(sc, 10)

ks = KinesisUtils.createStream(
     sc, [Kinesis app name], [Kinesis stream name], [endpoint URL],
     [region name], [initial position], [checkpoint interval],[StorageLevel])

Ks should now be a DStream where each RDD element is the set of events collected by Kinesis in the last 10 seconds.  (Note: I have not yet actually tried this, so some details may be wrong.  This example is adapted from a Kafka version from Jon Haddad and the Kenisis integration guide).

 Next we will need to convert byte array for each sensor record to a json Python dictionary.  From there we will filter out all but the temperature sensors, then using a simple map-reduce compute the average temperature for each sensor (which we identify by its location).   To do this we can use the reduceByKey() method which will give us a sum and count for each sensor.  We can then map that into a new DStream taking the form of a dictionary of sensor locations and average temperature for that interval as follows.

temps = ks.filter(lambda x: x["sensortype"] == "tempsensor")   \
   .map(lambda x: (x["location"], (x["value"], 1))      \
   .reduceByKey(lambda (x1,y1),(x2,y2): (x1+x2,y1+y2))  \
   .map(lambda z: {"location": z[0], "average temp": z[1][0]/z[1][1]])

We may now dump our result DStream temps to storage at the end of the processing of this RDD.   Alternatively,  we can join this DStream with a static DStream to compute a running average temperature.

Storm and Heron: Streaming with a DAG dataflow style.

There are several significant systems based on executing a directed graph of tasks in a “dataflow” style. We will give a brief overview of three of these.  One of the earliest was Storm which was created by Nathan Marz and released as open source by Twitter in late 2011.   Storm was written in a dialect of Lisp called Clojure that works on the Java VM.    In 2015 Twitter rewrote Storm and it is has deployed it under the name Heron which is being released as an Apache project.  The Heron architecture was described in an article in the ACM SIGMOD 2015 conference. Heron implements the same programming model and API as Storm, so we will discuss Storm first and then say a few words about the Heron design.

Storm (and Heron) run “topologies” which are directed acyclic graphs whose nodes are Spouts (data sources) and Bolts (data transformation and processing).   Actually Storm has two programming models. One of these we can call classic and the other is called Trident which is built on top of the classic model.  In both cases Storm (and Heron) topologies are directed acyclic graphs as shown in Figure 3.


Figure 3.   Storm/Heron topology. On the left is the abstract topology as defined by the program and on the right is the unrolled parallel topology for runtime.

The programming model is based on extending the basic spout and bolt classes and then using a topology builder to tie it all together.   A basic template for a Bolt is shown below.   There are three required methods.  The prepare() method is a special constructor that is called when the actual instance is deployed on the remote JVM.  It is supplied with context about the configuration and topology as well as a special object called the OuputCollector which is used to connect the Bolts output to the output stream defined by the topology.   The prepare() method is also where you instantiate your own data structures.

The basic data model for Storm/Heron is a stream of Tuples.  A tuple is just that: a tuple of items where each item need only be serializable.  Some of the fields in a tuple have names that are used for communicating a bit of semantics between bolts.   The method declareOutputFields() is used to declare the name of the fields in a stream.   More on this point later.   The heart of the bolt is the method execute(). This is invoked for each new tuple that is sent to the bolt and it contains the computational core of the bolt.   It is also where results from the Bolts process is sent to its output streams.

The main programming API for Storm is Java, so we will touch briefly on that here. There are several base classes and styles of bolts, but this is the basic template.  One of the specialized Bolt classes is for sliding and tumbling windows.  Spouts are very similar classes, but the most interesting ones are the Spouts that connect to event providers like Kafka or EventHub.

public class MyBolt extends BaseRichBolt{
	private OutputCollector collector;
	public void prepare(Map config, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	public void execute(Tuple tuple) {
		*execute is called when a new tuple has been delivered.
		*do your real work here.  for example,
		*create a list of words from the tuple and then emit them
		*to the default output stream.
		for(String word : words){
			this.collector.emit(new Values(word));
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		* the declarer is how we declare out output fields in the default
		* output stream.  you can have more than one output stream 
		* using declarestream. the emit() in execute needs to identify
		* the stream for each output value.
		declarer.declare(new Fields("word"));

The topology builder class is to build the abstract topology and provide instructions for how the parallelism should be deployed.   The key methods of the build are setBolt() and setSpout().  These each take three arguments: the name of the spout or bolt instance, an instance of your spout or bolt class and an integer that tells the topology how many tasks will be assigned to execute this instance.  A task is a single thread that is assigned to a spout or bolt instance.   This is the parallelism number.   The code below shows how to create the topology of Figure 3.

TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("Spout", new MySpout(), 2); 
builder.setBolt("BoltA", new MyBoltA(), 4).shuffleGrouping("spout"); 
builder.setBolt("BoltB", new MyBoltB(), 3)
                      .fieldsGrouping("BoltA", new Fields("word"));
builder.setBolt("BoltC", new MyBoltC(), 2).shuffelGrouping("spout") 

Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(“mytopology”, config, builder.createTopology());

As you can see, there are 2 tasks for the spout, 4 for bolt A, 3 for bolt B and 2 for bolt C.   Note that the 2 tasks for the spout are sent to 4 for Bolt B.   How do we partition the 2 output streams over the 4 tasks?  To do this we use a stream grouping function.   In this case we have used Shuffle grouping which randomly distributed them.   In the second case we map the 4 outputs streams from Bolt A to the 3 tasks of bolt B using a field grouping based on a field name.   This makes sure that all tuples with the same field name are mapped to the same task.

As mentioned above the Twitter team has redesigned storm as Heron.   The way a topology is executed is that a set of container instances are deployed to manage it as shown in Figure 4.


Figure 4.   Heron architecture detail.

The topology master coordinates the execution of the topology on a set of other containers that each contain a stream manager and heron instance processes which execute the tasks for the bolts and spouts.  The communication between the bolts and spouts are mediated by the stream manager and all the stream managers are connected together in an overlay network.  (The topology master makes sure they are all in communication.)  Heron provides great performance improvements over Storm.  One improvement of the architecture is better flow control of data from spouts when the bolts are falling behind.  Please look at the full paper for more detail. Some of the best Storm tutorial material comes from Michael Noll’s blog (here is a good example).


As mentioned Storm has another programming model that is implemented on top of the basic spout bolt library.   This is called Trident.     The classic Storm programming model is based on the topology instance.  You construct the flow graph by adding spouts and bolts.   It is building a graph by adding the nodes. Trident is somewhat of a dual concept: it is about the edges.   The central figure in Trident is the stream. The first thing to note is that trident processes all events in a stream in batches and Trident works very hard to make sure that each tuple is processed once and only once.  However, in real life failure happens and retries may be required.  Since tuples originate from spouts defining the retry semantics must be closely tied to the spout.  Trident has several configurations for spout depending on the semantics required.  Some are transactional, meaning every batch has a transaction identifier (txid) and a tuple does not appear in any other batch.  Using the txid we can make sure we never process a tuple more than once.  If the tuple caused the processing of the batch to fail, we can re-issue the entire batch.   Regular Storm spouts are non-transactional.   Another type of spout is “opaque transactional”  the third category which guarantees that each tuple is processed exactly once but, if not, it may appear in another batch.

Let’s begin by declaring a trivial artificial (non-transactional) spout that has a single word in each tuple called “name”.   I want the batch size to be 50 tuples.   The code will look something like this.

TridentTopology topology = new TridentTopology();  
FixedBatchSpout spout = new FixedBatchSpout(new Fields("name"), 50, 
                                 ... the word list here ... )      
Stream str1 = topology.newStream("spout", spout)

Now that we have a stream we can start making transformations to it.   For example, we can expand the tuple so each tuple contains the word and also the number of characters in the word.   We can do this by creating a function object that takes the string from the tuple and emits its length.

public static class Getlength extends BaseFunction {
  public void execute(TridentTuple tuple, TridentCollector collector) {
    collector.emit(new Values(tuple.getString(0).length()));

We apply this function to the stream to create a new stream.

Stream str2 = str1.each(new Fields("name"), new Getlength, new Fields("length"));

Notice that the function only emitted the length.   The each() function has the strange property that it appends new field to the end of the tuple, so now each tuple has labels [“name”, “length”].    Next suppose we only want names from a particular list mynames and we want to drop the others.   We will write a filter function to do that and then create a new filtered stream.

public static class NameFilter extends BaseFilter {
  List nameslist

  public NameFilter(List names) {
    this.namelist = names;
  public boolean isKeep(TridentTuple tuple) {
    return namelist.contains(tuple.getString(0));
Stream str3 = str2.each(new Fields("name","length"), new NameFilter(mynames)); 

Now let’s partition the stream by the name field and compute the counts of each. The result is of type TridentState.

TridentState counts = 
   str3.groupBy(new Fields("name"))
       .persistentAggregate(new MemcachedState.opaque(serverLocations), 
	                     new Count(), new Fields("count"))

The details about how the data is sent to the databases behind the memcash are not important here by the idea is we can now keep track of the aggregate state of the stream.

The final thing we should look at is how parallelism is expressed.   This is actually fairly simple annotations to the stream.   Putting all the steps above into one expression we can show how this is done.

TridentState counts = 
topology.newStream("spout", spout)
        .each(new Fields("name"), new Getlength, new Fields("length"))
        .each(new Fields("name","length"), new NameFilter(mynames))
        .groupBy(new Fields("name"))
        .persistentAggregate(new MemcachedState.opaque(serverLocations), 
	                      new Count(), new Fields("count"));   

This version creates two instances of the spout and five instances of the Getlength() function and uses the random shuffle to distribute the tuple batches to the instances.   There is much more to classic Storm and Trident and there are several good books on the subject.

Google’s Dataflow and Apache Beam

The most recent entry to the zoo of solutions we will discuss is Apache Beam (now in Apache’s incubation phase.) Beam is the open source release of the Google Cloud Dataflow system.  Much of what is said below is a summary of their document. An important motivation for Beam (from now on I will use that name because it is shorter than writing “Google Cloud Dataflow”) is to treat the batch and streaming cases in a completely uniform way.   The important concepts are

  1. Pipelines – which encapsulates the computation in the model.
  2. PCollections – the data as it moves through a Pipeline.
  3. Transforms – the computational transformations that operate on PCollections and produce PCollections
  4. Sources and Sinks.


The idea is that a PCollection can be either a very large but fixed size set of element or a potentially unbounded stream.   The elements in any PCollection are all of the same type, but that type maybe any serializable Java type.   The creator of a PCollection often appends a timestamp to each element at creation time.   This is particularly true of unbounded collections. One very important type of PCollection that is used often is the Key-Value PCollection KV<K, V> where K and V are the Key and Value types.   Another important thing to understand about PCollections is that they are immutable.  You can’t change them but you can use transforms to translate them into new PCollections.

Without going into the details of how you initialize a pipeline, here is how we can create a PCollection of type PCollection<String> of strings from a file.

Pipeline p = Pipeline.create(options);
PCollection pc = 

We have used the pipeline operator apply() which allows us to invoke the special transform TextIO to read the file.   There are other pipeline operators, but we will not discuss many of them.  Now, in a manner similar to the way Trident uses the each() operator to create new Trident streams, we will create a sequence of PCollections using the apply() method of the PCollection class.

There are five basic transform types in the library.   Most takes a built-in or user defined function object as an argument and applies the function object to each element of the PCollection to create a new PCollection.

  1. Pardo –  apply the function argument to each element of the of the input PCollection. This is done in parallel by workers tasks that are allocated to this activity.   This is basic embarrassingly parallel map parallelism
  2. GroupByKey – apply this to a KV<K,V> type of PCollection with group all the elements with the same key into the a single list, so the resulting PCollection is of type KV<K, Iterable<V>>.    In other words, this is the shuffle phase of a map-reduce.
  3. Combine – apply an operation that reduces a PCollection to a PCollection with a single element. If the PCollection is windowed the result is a Pcollection with the combined result for each window.   Another type of combining is for key-grouped PCollections.
  4. Flatten – combine PCollections of the same type into a single PCollection.
  5. Windowing and Triggers – These are not transformations in the usual sense, but defining mechanisms for the window operations.

To illustrate some of these features let’s redo the environmental sensor example again but we will compute the average temperature for each location using a sliding window.    For the sake of illustration, we will use an imaginary pub-sub system to get the events from the instrument steam and let’s suppose the events are delivered to our system in the form of a Java object from the class InstEvnt.  That would be declared as follows.

static class InstEvent{
	@Nullable String instType;
	@Nullable String location;
	@Nullable Double reading;
	public InstEvent( ....)
	public String getInstType(){ ...}
	public String getLocation(){ ...}
	public String getReading(){ ...}

This class definition illustrates how a custom serializable type looks like in Beam. We can now create our stream from our fictitious pub-sub system with this line.

PCollection input = 

We next must filter out all but the “tempsensor” events. While we are at it, let’s convert the stream so that the output is a stream of key-value pairs corresponding to (location, reading). To do that we need a special function to feed to the ParDo operator.

static class FilterAndConvert extends DoFn<InstEvent, KV<String, Double>> {
    public void processElement(ProcessContext c) {
         InstEvent ev = c.element();
	  if (ev.getInstType() == "tempsensor")
	     c.output(KV<String, Double>.of(ev.getLocation(), ev.getReading));

We Now we can apply the Filter and Convert operator to our input stream. Let us also create a sliding window of events of duration five minutes that is created every two minutes. We note that the window is measured in terms of the timestamps on the events and not on the processing time.

PCCollection<KV<String, Float>> reslt = input
.apply(Pardo.of(new FilterAndConvert())
.apply(Window.<KV<String, Double>> into(SlidingWindows.of(

Our stream reslt is now a KV<String,Double> type and we can apply a GroupByKey and Combine operation to reduce this to a  KV<String,Double> where each location key maps to the average temperature.   To make life easy Beam has a number of variations of this simple map-reduce operation and one exists that is perfect for this case:  Mean.perKey() which combines both steps in one transformation.

PCollection<KV<String, Double>> avetemps
	= reslt.apply(Mean.<String, Double>perKey());

Finally we can now take the set of average temperatures for each window and send them to an output file.

PCollection outstrings = avetemps
	.apply(Pardo.of(new KVToString())

The function class KVToString()  is one we define in a manner similar to the FilterAndConvert class above. There are two things to notice in what happened above.   First, we have used an implicit trigger that generates the means and output at the end of the window.   Second, note that because the windows overlap, events will end up in more than one window.

Beam has several other types of triggers.   For example, you can have a data driven trigger looks at the data as it is coming and fires when some condition you have set is met.   The other type is based on a concept introduce by Google Dataflow called the watermark.  The idea of the watermark is based on event time.    It is used to emit results when the system estimates that it has seen all the data in a given window. There are actually several very sophisticated ways to define triggers based on different ways to specify the watermark.  We won’t go into them here and we refer you to the Google Dataflow documents.

Apache Flink

Flink is now one of the “runners” for Beam because it is possible to implement the Beam semantics on top of Flink.   Many of the same core concepts exist in Flink and Beam.  As with the other systems, Flink takes input streams from one or more sources, which are connected by a directed graph to a set of sinks.

Like the others, the system is based on a Java virtual machine and the API is rendered in Java and Scala.  There is also an (incomplete) Python API where there is also a similarity to Spark Streaming.   To illustrate this, we can compare the Flink implementation of our instrument filter for figure 1 to the Spark Streaming example above.

The Flink Kinesis Producer is still a “work in progress”, so this code was tested by reading a stream from a CSV file.  The Flink data types do not include the Python dictionary/Json types so we use here a simple tuple format.   Each line of the input stream looks like

instrument-type string, location string, the word "value", floating value

For example,

tempsensor, pike street and second ave, value, 72.3

After reading from the file (or Kinesis shard) the records in the stream data are now 4-tuples of type (STRING, STRING, STRING, FLOAT). The core of the Flink version of the temperature sensor averager is shown below.

class MeanReducer(ReduceFunction):
    def reduce(self, x, y):
        return (x[0], x[1], x[2], x[3] + y[3], x[4] + y[4])

env = get_environment()
data = env.add_source(FlinkKinesisProducer( … ) … )

resuts = data \
    .filter(lambda x: x[0]=='tempsensor') \
    .map(lambda x: (x[0], x[1], x[2], x[3], 1.0)) \
    .group_by(1) \
    .reduce(MeanReducer()) \
    .map(lambda x: 'location: '+x[1]+' average temp %f' % (x[3]/x[4]))

The filter operation is identical to the Spark Streaming case.   After filtering the data we turn each record into a 5-tuple by appending 1.0 to the end of the 4-tuple.  The group_by(1) and reduce using the MeanReducer function.  The group_by(1) is a signal to shuffle these so that they are keyed by field in position 1 which corresponds to the  location string and then we apply the reduction to each of the grouped tuple sets. This operation is the same as the reduceByKey function in the Spark Streaming example.   The final map converts each element to a string that gives the average temperature for each location.

This example does not illustrate is Flink’s windowing operators, which are very similar to Beam’s, nor does it illustrate the underlying execution architecture.    In a manner similar to the other systems described here, Flink parallelizes the stream and tasks during execution.   For example, our temperature sensor example has a logical view as tasks which may be executed in parallel as shown in Figure 5.


Figure 5.   Flink logical task view and parallel execution view.

The Flink distributed execution engine is based on a standard master worker model.   The Flink source program is compiled into an execution data flow graph and sent to a job manager node by a client system.   The job manager executes the stream and transformations on remote Java VMs which run a task manager.  The task manager partitions its available resources into task slots where the individual tasks defined by the graph execution nodes are assigned.  The job manager and task managers manage the data communication streams between the graph nodes.   This is all very nicely illustrated by a figure from the Apache Flink documentation.   This documentation also describes the Flink windowing and other details of the implementation and programming model.

Summary and Conclusions

We have looked at four different systems, Spark Streaming, Storm/Heron, Google Dataflow/Beam and Flink.  Each of these has been used in critical production deployments and proven successful for their intended applications.  While we have only illustrated each with a trivial example we have seen that they all share some of the same concepts and create pipelines in very similar ways.   One obvious difference is in the way Storm/Heron explicitly constructs graphs from nodes and edges and the others use a very functional style of pipeline composition.    (Storm does have the Trident layer that allows a functional pipeline composition but it is not clear if this will be supported in the Heron version.)

Conceptually the greatest difference arises when comparing Spark Streaming to the others and, in particular, Beam.    Akidau and Perry make a very compelling argument for the superiority of the Beam model in comparison to Spark Streaming.   They make a number of important points.   One obvious one is that Spark is a batch system for which a streaming mode has been attached and Beam was designed from the ground up to be streaming with obvious batch capabilities.  The implication is that the windowing for Spark is based on the RDD in the DStream and this is clearly not as flexible as Beam windows.    A more significant point revolves around Beam’s recognition that event time and processing time are not the same.   Where this becomes critical is in dealing with out of order events, which are clearly possible in widely distributed situations.   Beam’s introduction of event-time windows, triggers and watermarks are a major contribution and clarifies a number of important correctness issues when events are out of order while still allowing you to get approximate results in a timely manner.

In terms of performance of these systems, we will leave it to another time to address this issue.    In fact, it would be a very interesting exercise to create a set of meaningful benchmarks that each system can be measured against.   It would be a non-trivial exercise to design the experiments, but well worth the effort.

Fun with Recurrent Neural Nets: One More Dive into CNTK and TensorFlow

In a previous article I set about comparing Microsoft’s Computational Network Took Kit for deep neural nets to Google’s TensorFlow.  I concluded that piece with a deep dive into how recurrent neural nets (RNNs) were represented in each system.   I specifically went after the type of RNNs known by the strange name of Long Short-Term Memory (LSTM) networks.   I wanted to learn a bit more about how these systems worked.  I decided to treat them like laboratory specimens so that I could poke and prod them to see what I could learn and what I could get them to do.  This article is essentially my lab notebook.  Warning:  With the exception of a bit toward the end, this is not technically very deep.   In fact, I did not discover anything that has not been extensively reported on elsewhere.   But I learned a lot and had some fun.   Perhaps it will be of interest to students just starting to learn about this subject.   Before I get to far into this, I would like to mention that I recently discovered an excellent series of tutorials on RNNs by Denny Britz that are definitely worth reading.

CNTK’s LSTM and Hallucinating Bloomberg Financial News

One of the many good examples in CNTK is language modeling exercise in Examples/Text/PennTreebank.   The documentation for this one is a bit sparse and the example is really just of a demo for how easy it is to use their “Simple Network Builder” to define a LSTM network and train it with stochastic gradient decent on data from the Penn Treebank Project.   One command starts the learning:

cntk configFile=../Config/rnn.cntk

Doing so trains the network, tests it and saves the model.  However, to see the model data in an easily readable form you need a trivial addition to the configfile: you need to add the following dumpnode command to put a dump file a directory of your choosing.

    action = "dumpnode"
    modelPath = "$ModelDir$/rnn.dnn"
    outputFile = "$OutputDir$/modeltext/dump"

This creates a big text file with all the trained data.   To experiment with the trained model, I decided to load it into a python notebook and rebuild the LSTM network from the defining equations.  From the CNTK book those equations are


I was pleased to see that the dumped model text had the same W and b tensors names as in the equations, so my job was relatively easy.    I extracted each of the tensors and saved them into a file (I will make these available in Github).   The python code for the LSTM based on the equations above is below.

def rnn(word, old_h, old_c):
      Xvec = getvec(word, E)

      i = Sigmoid(np.matmul(WXI, Xvec) + 
                  np.matmul(WHI, old_h) + WCI*old_c + bI)
      f = Sigmoid(np.matmul(WXF, Xvec) + 
                  np.matmul(WHF, old_h) + WCF*old_c + bF)
      c = f*old_c + i*(np.tanh(np.matmul(WXC, Xvec) + 
                               np.matmul(WHC, old_h) + bC))
      o = Sigmoid(np.matmul(WXO, Xvec)+ 
                  np.matmul(WHO, old_h)+ (WCO * c)+ bO)
      h = o * np.tanh(c)
      #extract ordered list of five best possible next words
      q = h.copy()
      q.shape = (1, 200)
      output = np.matmul(q, W2)
      outlist = getwordsfromoutput(output)
      return h, c, outlist

As you can see, this is almost a literal translation of the equations.    The only different is that this has as input a text string for the input word.  However the input to the equations is a vector encoding of the word.  The model generates the encoding matrix E which has the nice property that the ith column of matrix corresponds to the word in the ith position in the vocabulary list.  The function getvec(word, E) takes the embedding tensor E, and looks up the position of the word  in the vocabulary list and returns the column vector of E that corresponds to that word.   The output of one pass through the LSTM cell is the vector h.  This is a compact representation of the words likely to follow the input text to this point.  To convert this back into “vocabulary” space we multiply it by another trained vector W2.  The size of our vocabulary is 10000 and the vector output is that length.  The ith element of output represents the relative likelihood that that ith word is next word to follow the input so far.  Getwordsfromoutput simply returns the top 5 candidate words in order of likelihood.

Before going further, it is worth looking closer at the properties of the word embedding matrices E and W2.   There is a fascinating paper by  Mikolov, Yih and Zweig entitled “Linguistic Regularities in Continuous Space Word Representations” where they suggest that the embedding space for word has several interesting properties.   I decided to investigate that.   Their point is that words that are similar in a linguistic sense will be nearby in the embedding space.   For example, present tense verbs should be near other present tense verbs and singular nouns should be near each other, etc.   I decided to try that.  However, there are two embedding mappings.  One is based on the tensor E and the other based on the W2 tensor.   E has dimension 150 by 10000 and W2 is 200 by 10000.  The difference in dimensionality are because of arbitrary decisions made in defining the hidden layers in the network.  But both represent word imbeddings.  I experimented with both.  I wrote a function getnear(word, M) which takes a word and looks for the 5 most nearby words in the space where M is transpose of either E or W2. (I used cosine distance as the metric.) Verb tense locality and noun plurals worked best in the W2 space as illustrated below.


These are only illustrations.  For a deeper statistical analysis look at the Mikolov paper.   A more interesting conjecture from their study was that there may be some linearity in these embedding that might allow one to try simple analogies of the form “A is to B, as C is to __”.   Their idea is that if a, b and c are the vector embeddings of the words A, B and C, then the embedding of “__” may be computed as d = c + (b-a).  So I wrote a little function AistoBasCisto(A, B, C) that does this computation.   In the results I had to delete A, B and C from the candidate answers because they came up often as nearby.   In this case my results were less encouraging.  It worked better with the E space than with W2.   For example, for E we have


And for the W2 space the results looked like


As you can see the “run running walk __” example failed with E but was close, but still incorrect, with W2.

You may wonder why these particular words came up.  The data we used to train the system came from a small subset of the Penn TreeBank collection as provided in the CNTK package.   It is heavily dominated by financial news items.    This explains why the plural of person could be managers or customers.   A larger vocabulary and data collection would be needed to truly test the analogy by linearity conjecture.

Now on to hallucinating the financial news.

Now to test the LSTM as a truly recurrent network.   We provide the network with a starting word and let it suggest the next word.  And then we repeat this process constructing a “sentence”.  In the code below we randomly pick one of the top three suggest by the network as the next word.

c = np.zeros(shape = (200, 1))
h = np.zeros(shape = (200, 1))
output = np.zeros(shape = (10000, 1))
word = 'my'
sentence= word 
for _ in range(40):
    h, c, outlist = rnn(word, h, c)
    word = outlist[randint(0,3)]
    sentence = sentence + " " +word
print sentence+"."

In this case we start with the word “my” and let it generate a 40 word sentence.  The output is

my new rules which would create an interest position here unless there should prove   signs of such things too quickly although the market could be done better toward paying further volatility where it would pay cash around again if everybody can.

This is a great example of hallucinating financial news. Let’s try it again starting with the word “president”.

president michael de brown wrote himself against democratic union law which represents an emergency relief agreement during a new york state district or early tuesday before july after a federal government agency created early losses without mr. krenz or perhaps.

Now with the word “the”.

the company reported third-quarter results reflecting a number compared between N barrels including pretax operating loss from a month following fiscal month ending july earlier compared slightly higher while six-month cds increased sharply tuesday after an after-tax loss reflecting a strong.

The “sentences” end rather abruptly because of the 40 word limit I set.  If you let it go it will run on until the state vector for the sentence seems to break down.     Try this yourself.  To make it easy to play with this example, I have put the code in GitHub.  The trained model text files are in OneDrive and is a zipped file of about 50MB.

There are many more excellent and fun examples.  Andrej Karpathy has a great blog article showing how RNNs can mimic Shakespeare, or Latex science articles and many more.

TensorFlow’s seq2seq French Lesson.

One of the most interesting examples in the TensorFlow tutorials is an English to French translator.  As with the CNTK example it was trivial to start the translator learning following the instructions in the tutorial.   After letting this run for about a week, I wanted to see how well it would do.     As with the CNTK example, I created a Jupyter IPython notebook and loaded the trained model.   I will explain how that was done in more detail below but, for now, I will show how we can invoke it to test its translation ability.    This particular trained model was not very big and with a relatively small data set, so I didn’t expect much.    In fact, as you will see, to a French speaker it is a disaster.   On the other hand, it learned more French in a week of training that I did in three semesters of French in college.   (For full disclosure, this was my weakest subject in college and my grade was a hard-fought “C” each semester.)

The code below demonstrates how the model is invoked.   First you have to tokenize the input sentence.  The algorithm uses a system of buckets of fixed sizes to make the training more efficient.  You next find the smallest bucket that can contain your sentence and convert this to the input vector list needed by the model.   The step function takes a Tensorflow session, the input vector list and a null list of decoder inputs (to be explained later) and generates a list of vectors as outputs.  Each vector represents the likelihood that individual vocabulary words are the correct word at that point in the translated sentence.   We pick the most likely and print the sentence.

sentence = " I am not the president of France. "

token_ids = data_utils.sentence_to_token_ids(sentence, en_vocab)
      # Which bucket does it belong to?
bucket_id = min([b for b in xrange(len(_buckets))
                 if _buckets[b][0] > len(token_ids)])
      # Get a 1-element batch to feed the sentence to the model.
encoder_inputs, decoder_inputs, target_weights = 
    model.get_batch({bucket_id: [(token_ids, [])]}, bucket_id)

_, _, output_logits = model.step(sess, encoder_inputs, decoder_inputs, 
                                 target_weights, bucket_id, True)
outputs = [int(np.argmax(logit, axis=1)) for logit in output_logits]
print(" ".join([rev_fr_vocab[output] for output in outputs]))

Je ne suis pas le président de la France .

This example is not too bad.  However, if I ask

“In which city does the president of France live?”

I get

“Dans quelle ville le président de la France ?”.

This is not exactly correct.    If I feed this into Google translate and ask what this means in English I get “In which city the President of France?”.   If I give it this one,

What is the name of a good restaurant?

The system responds with

Quel est le nom d’une bonne bonne bonne ?”

Which translates back to “What is the name of a good good good?”.  Probably not very helpful on the streets of Paris.   It turns out restaurant is not in the tiny training vocabulary used here.   Finally, given this sentence

” The article stated that the President of the United States is here today. “

The translator returned

Le paragraphe a indiqué que le président des États-Unis est aujourd ‘ hui aujourd ‘ hui .”

The end of this reply is “is today today”.    As I said, this is still much better than I could do with my college French.   However, as you can see from the previous two examples, our little translator runs out of gas at the end of sentences and tends to repeat itself.   You should try this yourself.   I have put the notebook file in github or you can execute these directly from the Tensorflow python code.   All you need to do is train the model from TensorFlow and run the notebook with the path to the model output directory.

While loading and using the trained model was easy and fun, understanding the seq2seq model used in this example takes a bit of work.   So this part of this article will get a bit more technical.

The TensorFlow translate program is based on a sequence-to-sequence model constructed from more primitive recurrent neural nets.   By sequence-to-sequence we mean a network that takes a sequence as input and produces a sequence as output.   It consists of two parts: an “encoder RNN” and a “decoder RNN” as shown in Figure 1 below.


Figure 1.   A sequence-to-sequence RNN English to French translator with the encoder and decoder unrolled to show the flow of messages.

In this figure the RNNs are “unrolled” to show the flow of messages.  The state vector at the end of the encoder is a vector embedding of the input sentence.   This state vector is used to start the decoder along with a “GO” token.  The diagram shows the network after it has been trained.   During training the inputs to the decoder are the French version of the English sentences.   I won’t talk about the training here because is enough to try to understand how this works.  Before I go any further I want to point you to some important papers.  Sutskever, Vinyals and Le published an early important paper on sequence to sequence models that is worth reading.

To understand how it is built the network we need to dig into the code a bit. The building blocks are a set of classes of base type RNNCell with specializations

  1. BasicRNNCell
  2. GRUCell
  3. BasicLSTMCell
  4. LSTMCell
  5. OutputProjectionWrapper
  6. InputProjectionWrapper
  7. EmbeddingWrapper
  8. MultiRNNCell

The ones we will see used here are GRUCell, MultiRNNCell and EmbeddingWrapper.   We discussed LSTMCell in our previous article but we need to look at GRUCell here because that is the one used in the example.   The GRUCell is a “Gated Recurrent Unit” invented by Cho et. al.  in “Learning Phrase Representations using RNN Encoder–Decoder for Statistical Machine Translation”.  The “gated” phrase comes from the way the output is defined as coming mostly from the previous state or from a combination with the new input.   The diagram below tries to explain this a bit better.


Figure 2.   GRU wiring diagram

It also helps to see it in terms of the defining equations.


The quantity ut  is a gate vector.  Recall the sigmoid function switches sharply between one and zero.  So when ut is one then h is just a copy of the old h and we are ignoring the input x it is based on the value ct.   The gate rt is determines how much of the old state goes into defining the value of ct.   To understand how this is encoded in TensorFlow you need to understand the function.

linear(args, output_size, bias, bias_start=0.0, scope=None)

where args is a list of tensors each of size batch x n .   Linear computes sum_i(args[i] * W[i]) + bias where W is a list of matrix variables of size n x outputsize and bias is a variable of size outputsize.   In the equations above we have represented linear algebra as a matrix times a column vector.   Tensorflow uses the transpose notation:   row vector on the left times the transpose of the matrix.   So in linear the args are a list of row vectors.   Where is the matrix W and offset b?  This is fetched from memory based on the variable current scope, because W and b are variable tensors that are learned values.   If you look at the first two equations above, you will see they are almost identical.   In fact, we can write them as


If you transpose the last one from column form into row form you can now compute both with one invocation of the linear function.   The code for the GRUCell is below.   As you can see they have encoded one pass through the GRU cell with only two matrix vector multiplies.   You can also see that the way the variable scope is used to pick out the W’s for the gates and the W for the state/output.  Another point to remember that an invocation of the “__call__ function operator does not cause the tensor to execute the operation, rather it builds the graph.

class GRUCell(RNNCell):
  def __init__(self, num_units):
    self._num_units = num_units
   ... stuff deleted ....
  def __call__(self, inputs, state, scope=None):
    with vs.variable_scope(scope or type(self).__name__):  
      with vs.variable_scope("Gates"):  # Reset gate and update gate.
        # We start with bias of 1.0 to not reset and not udpate.
        r, u = array_ops.split(1, 2, linear([inputs, state],  
                               2 * self._num_units, True, 1.0))
        r, u = sigmoid(r), sigmoid(u)
      with vs.variable_scope("Candidate"):
        c = tanh(linear([inputs, r * state], self._num_units, True))
      new_h = u * state + (1 - u) * c
    return new_h, new_h

The top level class we invoke for building our model is seq2seqModel.    When we create an instance of this class it sets in motion a set of flowgraph building steps.  I am going to skip over a lot of stuff and try to give you the big picture.  The first graph building step in the initialization of an instance of this object is

# Create the internal multi-layer cell for our RNN.
    single_cell = rnn_cell.GRUCell(size)
    if num_layers > 1:
      cell = rnn_cell.MultiRNNCell([single_cell] * num_layers)

As you can see we are creating a GRU cell graph generator instance and making a list of num_layers of this object and passing that to the constructor for MultiRNNCell.   In our case, num_layers has been set to 2.   MultiRNNCell is pretty easy to understand.   It builds a graph consisting of a stack of (in this case) GRU cells where the output state vector of each level is fed to the input of the level above it.  This new compound cell has an output that is the state of the top sub-cell and whose output state is the concatenation of the output states of all the sub-cells.

The next part is not so easy to follow.    We will take our MultiRNNCell graph builder and use it to create and encoder and a special decoder.    But first we must make a short digression.

Paying Attention

There is a problem that is encountered in the sequence-to-sequence model.   The encoder encodes the entire sentence into a state vector which is used by the decoder as its input.    That state vector is an abstract representation of our entire sentence as a single point in a very high dimensional space.    The decoder has been trained to use that point as a starting point to unroll a translated version of the sentence.   I find the fact that it works at all to be rather remarkable.   It is as if the decoder takes the English state vector and transforms it into a similar point in “French” space.

Unfortunately, the longer the input sentence, the more difficult it is to decode it.   How much information can we pack into one point?   The problem is that at each decoding step we need a little bit more information than is provided by the state vector as it passes through the decoder loop.   The idea used here is to help the decoder by providing it a bit of focus derived from the input sequence at each stage of the decoder loop.  This is generally referred to as “attention”, as in “at this step of decoding please pay attention to what the encoder was doing here”.   Bahdanau, Cho and Bengio had an early paper about this that used a bidirectional pass over the input sequence. As they put it, they wanted to “automatically (soft-)search for parts of a source sentence that are relevant to predicting a target word”.  (Denny Britz has a lovely blog article about attention and describes several fascinating applications.  It is  well worth  reading.) The mechanism for attention used in the TensorFlow example is based on a paper by Vinyals et. al. and we will follow that one here.   The key idea is rather than take the single final state vector from the encoder, let’s collect the state vectors at each stage of the encoder.   Following Vinyals, let the encoder state vectors for each input word be


And let the decoder state vectors be


Then for each decoder time step t compute


Where the Ws are learned matrices and v is a learned vector.    Then as the input to the t+1 state vector of the decoder we use the concatenation


The idea is this new state vector at time t+1 puts much more focus on the corresponding words in in the encoder string. This all happens in a function called seq2seq.attention_decoder that is called in another constructor function seq2seq.embedding_attention_seq2seq that wraps and an embedding around a graph generated by our MultiCellRNN graph builder to generate the final decoder graph.   These graphs are all stitched together in the Seq2SeqModel constructor.  It is fair to say that there are many levels of abstraction here that are used to build the decoder and link it to the encoder.  I am leaving out many details that are critical for the training such as the part that implements the bucket handler.   The final graph, in its most abstract form is pictured below in figure 3.


Figure 3.  The sequence to sequence translator is based on a two level GRU cell encoder and an attention-augmented two level GRU cell decoder.  The input English is entered in reverse order as an optimization

Final Thoughts

As I have said above, I have not included all the details of how the seq2seq translator is put together, but I tried to include the highlights that I found most interesting.   I encourage you to dive into the code and discover the rest.   You will likely find some errors in what I described above.   If so, please let me know.

There is really a lot of exciting results that have come out in the last few years relating to RNNs.   For example, Lei Ba, Mnih and Kavukcuoglu demonstrated that RNNs with attention can be applied to interesting image analysis challenges, such as reading the house number from a street scene.   In “Teaching Machines to Read and Comprehend” Hermann et. al. excellent paper demonstrate the use of an attentive RNN build to answer simple questions about text.   I personally don’t think any RNN can pass a Turing test yet, so it ain’t A.I.  But these little statistical machines are certainly wonderful mimics and they can speak better French than I.

Raspberry meets Project Oxford: IoT & Judging a Book by Its Cover.

When I was (very) young I enjoyed building electronic devices from kits and from wiring diagrams I found in exciting journals like “Popular Electronics”.  Radio Shack was my friend and Heath Kits were a joy when I could afford them.   Today I marvel at the devices kids can tinker with.   They can build Arduino powered Lego robots that are lightyears beyond my childhood Lincoln Logs and Erector Sets.   And they have access to another relatively recent, world changing technology:  they have the ability to create software that brings their creations to life.  They can add sensors (accelerometers, GPS, altimeters, thermometers, gyroscopes, vibration, humidity and more) to their projects as well as actuators and motors of various types.  And these creations can live on the Internet.   Devices like the Particle (formerly Spark) Photon and Electron IoT include WiFi or cellular links so instruments can send data to the cloud.  They are programmed with the same C-like language used by the Arduino.   Photon devices are extremely small and very inexpensive so they can be deployed in very large numbers.

Indeed, these devices are the atomic particles of the Internet of Things (IoT).  They are being turned into modestly built home safety sensors as well as sophisticated instruments that control highly sensitive scientific experiments.   They will be the tools city planners will use to understand our complex urban environments.   For example, the Array of Things project at the University of Chicago and Argonne Labs is deploying an amazing small sensor pack on that will sit on light polls throughout the city.   (Check out this video.)  Charlie Catlett and Pete Beckman are leading the AoT project.  In discussions with Charlie he told me that he has built a full array of sensors in his home to monitor various devices.   He also pointed me to the Mathworks Thingspeak product which provides a lovely Matlab-based cloud platform for integrating and analyzing the streams of data from your instruments.

In a previous post I described ways in which event data could be streamed to the cloud and I will return to that topic again in the future.   In this article I wanted to look at what could be done with a small device with a video camera when it was combined with analysis on the Cloud.  I chose the Raspberry Pi2 and the cloud services from the new Microsoft Research project Oxford services.

Phone Apps and Project Oxford

While the instruments described above will eventually dominate the Internet of Things, the billions of smart phones out there today give us a pretty good idea of the challenge of managing data from sensors.   Almost every truly useful “App” on a smart phone interacts services in the cloud in order to function properly.    And each of these apps does part of its computation locally and part takes place remotely.   Cortana and Siri can do the speech to text translation by invoking a speech model on the phone but answering the query involves another set of models running in the cloud.    For any interesting computational task, including the tasks assigned to the Array of Things sensors, some computation must be done locally and some must be remote.   In some cases, the local computation is needed to reduce the amount of time spent communicating with the back end services and in other cases, for example for privacy reasons, not all locally collected information should be transmitted to the cloud.  (What happens to my phone in Vegas, should stay in Vegas.)  Indeed, according to Catlett and Beckman, protecting privacy is a prime directive of the Array of Things project.

Companies like Amazon, Google, IBM, Microsoft and Salesforce have been working hard to roll out data analytics services hosted on their cloud platforms.   More recently these and other companies have been providing machine learning services in the cloud.   I have already talked about AzureML, a tool that allows you to build very powerful analysis tools, but for people who want to build smart apps there are now some very specialized services that do not require them to be ML domain experts.  A good example is IBMs Watson Services that is a result of IBM’s work on bringing their great Jeopardy playing capability to a much broader class of applications.  Watson has a great collection of language and speech analysis and translation services.   In addition, it has an impressive collection of image analysis tools.

Project Oxford is another interesting collection of cloud services that cover various topics in computer vision, speech and language.  In the area of speech, they provide the APIs needed for speech to text and text to speech.  We are all familiar with the great strides taken in speech recognition with Siri, Cortana, Echo and others.   With these APIs one can build apps for IOS, Windows and Android that use speech recognition or speech generation.   The language tools include spell checkers, a nifty language understanding tool that you can train to recognize intent and action from utterances such as commands like “set an alarm for 1:00 pm.”  The computer vision capabilities include scene analysis, face recognition, and optical character recognition (OCR).  These are the services I will explore below.

Raspberry Pi2 and the OpenCV computer vision package.

The Raspberry Pi 2 is a very simple, credit card sized, $35 single board computer with a very versatile collection of interfaces. It has a Broadcom VideoCore GPU and a quad-core ARMv7 processor and 1 GB of memory.  For a few extra dollars you can attach a five-megapixel camera.  (In other words, it is almost as powerful as a cellphone.) The Pi2 device can run Windows 10 IoT Core or a distribution of Linux.  I installed Linux because it was dead easy.   To experiment with the camera, I needed the OpenCV computer vision tools.  Fortunately, Adrian Rosebrock has documented the complete installation process in remarkable detail.  (His book also provides many useful coding example that I used to build my experiments.)

Object Tracking

One of the most obvious sensing challenges one can tackle with a small Interconnected device with a camera is object tracking.   With OpenCV there are some very sophisticated ways to do this, but for some tasks it is trivial.   Outside my window there is a small harbor and I decided to track the movements of the boats.   This was easy because they are all pleasure boats so the vast majority of them are white.  By filtering the image to bring out white objects on a black background the OpenCV functions “findContours” and “minAreaRect” it requires only a few lines of code to draw bounding boxes around suspected boats. (Full source code for the examples here are in Github.) With the Pi device and camera sitting near the window.  I was able to capture the scene below in Figure 1.  Note that it took some scene specific editing.  Specifically, it was necessary to ignore contours that were in the sky.   As can be seen some objects that were not boats were also selected.    The next step is to filter based on the motion of the rectangles.  Those are the rectangles worth tracking. Unfortunately, being winter these pleasure craft haven’t moved for a while, so I don’t have a video to show you.


Figure 1.   Using OpenCV to find boats in a harbor

Face Recognition

By face recognition we mean identifying those elements in a picture that are human faces. This is a far lower bar than face identification (who is that person?), but it is still an interesting problem and potentially very useful.  For example, this could be used to approximate the number of people in a scene if you know what fraction of them are facing the camera.   Such an approximation may be of value if you only wanted to know the density of pedestrian traffic.  You could avoid privacy concerns because you would not need to save the captured images to non-volatile storage. It is possible to use OpenCV alone to recognize faces in a picture and never send the image outside the device.

If you do want to know more information about a group of people project Oxford can do much better.  Given an image of a group it can identify the faces looking in the general direction of the camera and for each face it can give a number of interesting features such as gender, estimated age and even a “smile” index.   This information may be of interest to a business, such as a movie theater owner who wants to know the gender makeup and average age of the of the patrons.  It could even tell by counting smiles if they enjoyed the movie.    I used project Oxford to do an analysis of two photos.   The first is our class of amazing students and staff from the 2014 MSR summer school in Moscow.   The vision service also returned a rectangle enclosing each face.   I use OpenCV to draw the rectangle with a pink border for the males and green for females if they were smiling.   If they were not smiling, they got a black rectangle.  As can be seen, the reporting was very accurate for this staged group photo.


Figure 2.   Project Oxford Face Recognition of MSR Summer School 2014 group photo taken at Yandex headquarters Moscow.  Pink = male, green = female, black = no smile

The system also allowed us to compute estimated average age: approximately 21 for females, 24 for males.  (Some “senior” professors in the photo skewed the average for males.).   Figure 3 below shows the result for a stock street scene.  Very few smiling faces, but that is not a surprise.   It also illustrates that the system is not very good at faces in partial or full profile.


Figure 3.   Many faces missing in this crowd, but one is smiling.

Text Recognition

Optical Character Recognition (OCR) has been around for use in document scanners for a long time but it is much harder to read text when it is embedded in a photo with other objects around it.    For example, programing the Pi using a call to oxford OCR and pointing the camera at the image in Figure 4 produced the output in the superimposed rectangle (superimposition done with PowerPoint … not OpenCV).


Figure 4.   Oxford OCR test from Pi with command line output in black rectangle.

As can be seen, this was an easy case and it made only one error.     However, it is not hard to push the OCR system beyond its limits based on distance, lighting and the angle of the image.

To demonstrate the power of doing some computation on the local device and some of it remotely in the cloud, we can take an scene that is too hard for oxford to understand directly and so some preprocessing with OpenCV.   In Figure 5 we have taken a scene with the same piece of paper with text and placed it far from the camera and at a funny angle.   This yielded no result with Oxford OCR directly.   But in this case we used the same filtering technique used in the boat tracking experiment and located the rectangle containing the paper.  Using another OpenCV transformation we transformed that rectangle into a 500 by 300 image with the rotation removed (as shown in the insert).   We sent that transformed image to Oxford and got a partial result (shown in the black rectangle).


Figure 5.  Transformed image and output showing the coordinates of the bounding rectangle and output from Oxford OCR.   The green outline in the picture was inserted by OpenCV drawing functions using the bounding rectangle.

This experiment is obviously contrived.  How often do you want to read text at a distance printed on a white rectangle?  Let’s look at one final example below.

Judging a Book by Its Cover

An amusing App that may be able to use the Oxford OCR would be to use it to find information about a book based on an image of the cover.   Of course, this is not a new idea.  Amazon has an app called “Flow” from their A9 Innovations subsidiary.  More on that later.    What  I did was to integrate Oxford OCR with a call to Bing.    It works as follows.  The Pi is shown the image of a book,  the OCR app looks for text on the image like the title or author.   Then that returned string is sent to Bing via a web service call.   The top five results are then put into an HTML document along with the image and served up by a tiny webserver on the PI.   The first results are shown below.


Figure 6.   Output of the book cover reader app webserver on the PI.  The first result is correct.

The interesting thing about this is that Bing (and I am sure Google as well) is powerful enough to take partial results from the OCR read and get the right book on the top hit.  The correct text is

The Fabric of Reality.   A leading scientist interweaves evolution, theoretical Physics and computer science to offer a new understanding of reality.”  (font changes are reproduced as shown on the book cover)

What the OCR systems was able to see was “THE FABRIC I scientist evolution, physics. Computer science a new understanding.”   The author’s name is occluded by a keyboard cable.

Unfortunately, the reliability at a distance was not great.  I next decided to try to enhance the image using the technique used in the “text on white rectangle” as illustrated in Figure 5.    Of course this needed a white book, so it is not really practical.  However, the results did improve the accuracy (for white books).   An example that the OCR-only version failed to recognize but that worked using the OpenCV enhanced version is shown in Figure 7.


Figure 7.   Using OpenCV with Oxford and Bing.

As can be seen the OCR saw “the black sivan nassim aicholas tal”, which is not very accurate. Even with the lower right corner of the book out of the image I would have expected it to do a bit better.   But Bing was easily able to figure it out.

I downloaded the Flow app and installed it on a little Android tablet.   It works very well up close but it could not handle the distance shots illustrated above.   Of course this is not a proper scientific comparison.    Furthermore, I have only illustrated a few trivial features of OpenCV and I have not touch the state-of-the-art object recognition work.  Current object recognition systems based on deep learning are amazingly accurate.  For example, MSR’s Project Adam was used to identify thousands of different types of object in images (including hundreds of breeds of dogs).   I expect that Amazon has the images of tens of thousands of book covers.   A great way to do the app above would be to train a deep network to recognize those objects in an image.  I suspect that Flow may have done something like this.

Final Thoughts

The examples above are all trivial in scope, but they are intended to illustrate what one can do with a very simple little device, a few weeks of programming and access to tools like Project Oxford.   We started with a look at IoT technologies and moved on to simple “apps” that use computer vision.   This leads us to think about the topic of augmented reality where our devices are able to tell us about everything and everyone around us.   The “everyone” part of this is where we have the most difficulty.   I once worked on a project called the “intelligent memory assistant”.   Inspired by a comment from Dave Patterson, the app would use a camera and the cloud to fill in the gaps in your memory so that when you met a person but could not remember their name or where they were from.  It would whisper into your ear and tell you “This is X and you knew him in 1990 …. “.  It is now possible to build this, but the privacy issues raise too many problems.   It is sometime referred to as the “creepy factor”. For example, people don’t always want to be “tagged” in a photo on Facebook.   On the other hand uses like identifying lost children or amnesiacs are not bad. And non-personal components of augmented reality are coming fast and will be here to stay.

PS Code for these examples is now in this github repo

An Encounter with Google’s TensorFlow

NOTE: This is a revised version of this blog that reflects much better ways to do some of the tensor algebra in the first example below.

Google has recently released some very interesting new tools to the open source community.   First came Kubernetes, their container microservice framework, and that was followed by two new programming systems based on dataflow concepts.   Dataflow is a very old idea that first appeared in the computer architecture community in the 1970 and 80s.   Dataflow was created as an alternative to the classical von-Neumann computer design.  It was hoped that it would have a performance advantage because it would exploit much greater levels of parallelism than thought possible with classical computers[1].  In dataflow systems computation can be visualized as a directed graph where the vertices of the graph are operators and data “flows” through the system along the edges of the graph.  As soon as data is available on all the input edges of an operator node the operation is carried out and new data is put on the output edges of the node.  While only a few actual data flow computers were built, the concept has been fundamental to distributed and parallel computing for a long time.  It shows up in applications like complex event processing, stream analytics and systems like the Microsoft AzureML programming model I described earlier.

Google’s newly released Cloud Dataflow is a programming system for scalable stream and batch computing.  I will return to Cloud Dataflow in another post, but for now, I will focus on the other dataflow system they released.  Called TensorFlow, it is a programming system designed to help researchers build deep neural networks[2] and it appears to be unrelated to Cloud Dataflow.  All the documentation and downloadable code for TensorFlow are on-line.  TensorFlow is also similar to Microsoft Research’s Computational Network Toolkit (CNTK) in several ways that I will describe later.

TensorFlow is designed allow the programmer to easily “script” a dataflow computation where the basic units of computing are very large multi-dimensional arrays.   The scripts are written in Python or C++ and it works very well with IPython/jupyter notebooks.   In the following pages I will give a very light introduction to TensorFlow programming and illustrate it by building a bare-bones k-means clustering algorithm.   I will also briefly describe one of their examples of a convolutional neural network.

TensorFlow can be installed and run on your laptop, but as we shall show below, it really shines on bigger more powerful hardware.  An interesting thing happened in the recent evolution of deep neural networks.   The most impressive early work on really large deep neural network was done on large cloud-scale clusters.  However, the type of parallelism needed for doing deep network computation was really very large array math, which is really better suited to execution on a GPU.  Or a bunch of GPUs on a massive memory machine or a cluster of massive memory machines with a bunch of GPUs each.  For example, the Microsoft CNTK has achieved some remarkable results on 8 GPU systems and it will soon be available on the Azure GPU Lab. (I also suspect that supercomputers such as the SDSC Comet with large memory, multi-GPU, multi-core nodes would be ideal.)

TensorFlow: a shallow introduction.

There is a really nice white paper by the folks at Google Research with far more details about the TensorFlow system architecture than I give here.  What follows is a shallow introduction and an experiment.

There are two main concepts in TensorFlow.   The first is the idea of computing on objects that are very large multi-dimensional arrays called tensors.   The second is the fact that the computation you build with tensors are compiled into graphs that are executed in a “dataflow” style.   We need to unpack both of these concepts.


Let’s start with tensors.  First, these are not your great-great grandfather’s tensors.   Those tensors were introduced by Carl Friedrich Gauss for differential geometry where they were needed to provide metrics that could be used to describe things like the curvature of surfaces.  Einstein “popularized” tensors in his general theory of relativity.  Those tensors have a very formal algebra of covariant and contravariant forms. Fortunately, we don’t have to go there to understand the use of tensors in TensorFlow’s machine learning applications.   In fact, if you understand Numpy arrays you are off to a good start and Numpy arrays are just really efficient multidimensional arrays.   TensorFlow can be programmed in Python or C++ and I will use the Python binding in the discussion below

In TensorFlow tensors are created and stored in container objects that are one of three types: variables, placeholders and constants.   Let’s start with constants.  As the name implies constant tensors are initialized once and never changed.   Here are two different ways to create constant arrays that we will use in an example below.  One is a tensor of size Nx2 filled with zeros and the other is a tensor of the same shape filled with the value 1.

import numpy as np
import tensorflow as tf
N = 10000
X = np.zeros(shape=(N,2))
Xv = tf.constant(X, name="points")
dones = tf.fill([N,2], np.float64(1.))

We have used a Numpy array of values to create the TensorFlow constant. Notice that we have given our tensor a “name”. Naming tensors is optional but it comes in very handy when debugging.
Variables are holders of multidimensional arrays that persist across sessions and may be modified and even saved to disk. The concept of a “session” is important in TensorFlow. You can think of it as a context where actual TensorFlow calculations take place. The first calculation involving a variable is to load it with initial values. For example, let’s create a 2 by 3 tensor that, when initialized contains the constant 1.0 in every element. Then let’s convert that back to a Numpy array and print it. To do that we need a session and we will call the variable initializer in that session. When working in the IPython notebook it is easiest to use an “InteractiveSession” so that we can easily edit and redo computations.

sess = tf.InteractiveSession()
myarray = tf.Variable(tf.constant(1.0, shape = [2,3]))
init =tf.initialize_all_variables()
mynumpy =myarray.eval()
[[ 1.  1.  1.]
 [ 1.  1.  1.]]

As shown above, the standard way to initialize variables is to initialize them all at once. The process of converting a tensor back to a Numpy array requires evaluating the tensor with the “eval” method. As we shall see this is an important operator that we will describe in more detail below.

The final tensor container is a “placeholder”.   Creating a placeholder object only requires specifying a type and some information about its shape.   We don’t initialize a placeholder like a variable because it’s initial values will be provided later in an eval()-like operation.  Here is a placeholder we will use later.

x = tf.placeholder(tf.float32, [None, 784], name="x")

Notice that in this case the placeholder x is two dimensional but the first dimension is left unbound.  This allows us to supply it with a value that is any number of rows of vectors of length 784.   As we shall see, this turns out to be very handy for training neural nets.


The real magic of TensorFlow is in the dataflow execution of computations.   A critical idea behind TensorFlow is to keep the slow world of Python away from the fast world of parallel tensor algebra as much as possible.  Python is used only to describe the dataflow graph of the computation.   TensorFlow has a large library of very useful tensor operators.   Let’s describe a computation we will use in the k-means example.   Suppose we have 8 points in the plane in a vector called Xv and 4 special points in an array called kPoints.   I would like to label each of the 8 points with the index of the special point nearest to it.   This should give me 4 clusters of points.   I now want to find the centroid of each of these clusters.  Assume we have a tensor called “blocked” where row i gives the distance from each of our 8 points to the ith special point.  For example, if “blocked” is as shown below then what we are looking for is the index of the smallest element in each column.tensor-table

To find the centroids I use this min index vector to select the elements of Xv in each cluster. We just add them up to and divide by the number of points in that cluster.    The TensorFlow code to do this is shown below.  TensorFlow has a large and handy library of tensor operators many of which are similar to Numpy counterparts.   For example, argmin computes the index of the smallest element in an array given the dimension over which to search.   Unsorted_segment_sum will compute a sum using another vector to define the segments.   The min index vector nicely partitions the Xv vector into the appropriate “segments” for the unsorted_segment_sum to work.  The same unsorted_segment_sum operator can be used to count the number of elements in each cluster by adding up 1s.

mins = tf.argmin(blocked, 0, name="mins")
sums = tf.unsorted_segment_sum(Xv, mins, 4)
totals = tf.unsorted_segment_sum(dones, mins, 4, name="sums")
centroids = tf.div(sums, totals, name = "newcents")

The key idea is this.  When this python code is executed it builds a data flow graph with three inputs (blocked, Xv, dones) and outputs a tensor called centroids as shown below[3].[4]

The computation does not start until we attempt to evaluate the result with a call to centroids.eval(). This sort of lazy evaluation means that we can string together as many tensor operators as needed that will all be executed outside the Python REPL.


Figure 1.  Computational flow graph

TensorFlow has a very sophisticated implementation.  The computing environment upon which it is running is a collection of computational devices.  These devices may be cpu cores, GPUs or other processing engines.   The TensorFlow compilation and runtime system maps subgraphs of the flow graph to these devises and manages the communication of tensor values between the devices.  The communication may be through shared memory buffers or send-receive message pairs inserted in the graph at strategically located points.   Just as the dataflow computer architects learned, dataflow by itself is not always efficient.  Sometimes control flow is needed to simplify execution.  The TensorFlow system also inserts needed control flow edges as part of its optimizations.   As was noted above, this dataflow style of computation is also at the heart of CNTK.   Another very important property of these dataflow graphs is that it is relatively easy to automatically compute derivatives by using the chain-rule working backwards through the graph.   This makes it possible to automate the construction of gradients that are important for finding optimal parameters for learning algorithms.   I won’t get into this here but it is very important.

A TensorFlow K-means clustering algorithm

With the kernel above we now have the basis for a simple k-means clustering algorithm shown in the code below.  Our initial centroid array is a 4 by 2 Numpy array we shall call kPoints.   What is missing is the construction of the distance array blocked.   Xv holds all the N points as a constant tensor.   My original version of this program used python code to construct blocked.  But there is an excellent improvement on computation of “mins” published in GitHub by Shawn Simister at.  Shawn’s version is better documented and about 20% faster when N is in the millions.  Simister’s computation does not need my blocked array and instead expands the Xv and centroids vector and used a reduction to get the distances.  Very nice (This is the revision to the blog post that I referred to above.)

N = 10000000
k = 4
#X is a numpy array initialized with N (x,y) points in the plane
Xv = tf.constant(X, name="points")
kPoints = [[2., 1.], [0., 2.], [-1., 0], [0, -1.]]
dones = tf.fill([N,2], np.float64(1.))

centroids = tf.Variable(kPoints, name="centroids")
oldcents = tf.Variable(kPoints)
initvals = tf.constant(kPoints, tf.float64)

for i in range(20):
    oldcents = centroids
    #This is the Simister mins computation
    expanded_vectors = tf.expand_dims(Xv, 0)
    expanded_centroids = tf.expand_dims(centroids, 1)
    distances = tf.reduce_sum( tf.square(
               tf.sub(expanded_vectors, expanded_centroids)), 2)
    mins = tf.argmin(distances, 0)
    #compute the new centroids as the mean of the points nearest
    sums = tf.unsorted_segment_sum(Xv, mins, k)
    totals = tf.unsorted_segment_sum(dones, mins, k, name="sums")
    centroids = tf.div(sums, totals, name = "newcents")
    #compute the distance the centroids have moved sense the last iteration
    dist = centroids-oldcents
    sqdist = tf.reduce_mean(dist*dist, name="accuracy")
    print np.sqrt(sqdist.eval())
    kPoints = centroids.eval()

However, this version is still very inefficient.  Notice that we construct a new execution graph for each iteration.   A better solution is to pull the graph construction out of the loop and construct it once and reuse it.   This is nicely illustrated in Simister’s version of K-means illustrated on his Github site.

It is worth asking how fast Tensor flow is compared with a standard Numpy version of the same algorithm. Unfortunately, I do not have a big machine with fancy GPUs, but I do have a virtual machine in the cloud with 16 processors. I wrote a version using Numpy and executed them both from an IPython notebook. The speed-up results are in Figure 2. Comparing these we see that simple Python Numpy is faster than TensorFlow for values of N less than about 20,000. But for very large N we see that TensorFlow can make excellent use of the extra cores available and exploits the parallelism in the tensor operators very well.


Figure 2. speed-up of TensorFlow on 16 cores over a simple Numpy implementation. The axis is log10(N)

I have placed the source for this notebook in Github. Also an improved version based on Simister’s formulation can be found there.

TensorFlow also has a very interesting web tool call TensorBoard that lets you look at various aspects of the logs of your execution. One of the more interesting TensorBoard displays is the dataflow graph for your computation. Figure 3 below shows the display generated for the k-means computational graph.


Figure 3.   TensorBoard graph displace of the k-means algorithm.

As previously mentioned this is far more complex than my diagram in Figure 1.   Without magnification this is almost impossible to read.  Figure 4 contains a close-up of the top of the graph where the variable newcents is created.   Because this is an iteration the dataflow actually contains multiple instances and by clicking on the bubble for the variable it will show you details as shown in the close-up.


Figure 4.   A close-up of part of the graph showing expanded view of the multiple instances of the computation of the variable newcents.

A brief look at a neural network example

The k-means example above is really just an exercise in tensor gymnastics. TensorFlow is really about building and training neural networks.   The TensorFlow documents have a number of great example, but to give you the basic idea I’ll give you a brief look at their convolutional deep net for the MNIST handwritten digit example.   I’ll try to keep this short because I will not be adding much new here. The example is the well-known “hello world” world of machine learning image recognition.   The setup is as follows.   You have thousands of 28 by 28 black and white images of hand written digits and the job of the system you build is to identify each.   The TensorFlow example uses a two-layer convolutional neural net followed by a large, densely connected layer and a readout layer.

If you are not familiar with convolutional neural nets, here is the basic idea.   Images are strings of bits but they also have a lot of local 2d structure such as edges or holes or other patterns.  What we are going to do is look at 5×5 windows to try to “find” these patterns.  To do this we will train the system to build a 5×5 template W array (and a scalar offset b) that will reduce each 5×5 window to a point in a new array conv by the formula


(the image is padded near the boundary points in the formula above so none of the indices are out of bounds)

We next modify the conv array by applying the “ReLU” function max(0,x) to each x in the conv array so it has no negative values.   The final step is to do “max pooling”.  This step simply computes the maximum value in a 2×2 block and assigns it to a smaller 14×14 array.   The most interesting part of the convolutional network is the fact that we do not use one 5×5 template but 32 of them in parallel producing 32 14×14 result “images” as illustrated in Figure 5 below.convolutional

Figure 5.   The first layer convolutional network.

When the network is fully trained each of the 32 5×5 templates in W is somehow different and each selects for a different set of features in the original image. One can think of the resulting stack of 32 14×14 arrays (called h_pool1) as a type of transform of the original image much as a Fourier transform can separate a signal in space and time and transform it into frequency space. This is not what is going on here but I find the analogy helpful.
We next apply a second convolutional layer to the h_pool1 tensor but this time we apply 64 sets of 5×5 filters to each the 32 h_pool1 layers (and adding up the results) to give us 64 new 14×14 arrays which we reduce with max pooling to 64 7×7 arrays called h_pool2.

Rather than provide the whole code, which is in the TensorFlow tutorial,  I will skip the training step and show you how you can load the variables from a previous training session and use them to make predictions from the test set. .   (The code below is modified versions of the Google code found at GitHub and subject to their Apache copyright License.)

Let’s start by creating some placeholders and variables.  We start with a few functions to initialize weights.   Next we create a placeholder for our image variable x which is assumed to be a list of 28×28=784 floating point vectors.  As described above, we don’t know how long this list is in advance. We also define all the weights and biases described above.

def weight_variable(shape, names):
  initial = tf.truncated_normal(shape, stddev=0.1)
  return tf.Variable(initial, name=names)

def bias_variable(shape, names):
  initial = tf.constant(0.1, shape=shape)
  return tf.Variable(initial, name=names)

x = tf.placeholder(tf.float32, [None, 784], name="x")

sess = tf.InteractiveSession()

W_conv1 = weight_variable([5, 5, 1, 32], "wconv")
b_conv1 = bias_variable([32], "bconv")
W_conv2 = weight_variable([5, 5, 32, 64], "wconv2")
b_conv2 = bias_variable([64], "bconv2")
W_fc1 = weight_variable([7 * 7 * 64, 1024], "wfc1")
b_fc1 = bias_variable([1024], "bfcl")
W_fc2 = weight_variable([1024, 10], "wfc2")
b_fc2 = bias_variable([10], "bfc2")

Next we will do the initialization by loading all the weight and bias variable that were saved in the training step. We have saved these values using TensorFlow’s save state method previously in a temp file.

saver = tf.train.Saver()
init =tf.initialize_all_variables()
saver.restore(sess, "/tmp/model.ckpt")

We can now construct our deep neural network with a few lines of code. We start with two functions to give us a bit of shorthand to define the 2D convolution and max_pooling operators. We first reshape the image vectors into the image array shape that is needed. The construction of the flow graph is now straight forward. The final result is the tensor we have called y_conv.

def conv2d(x, W):
  return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
  return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='SAME')

#first convolutional layer
x_image = tf.reshape(x, [-1,28,28,1])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)
#second convolutional layer
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)
#final layer
h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
y_conv=tf.nn.softmax(tf.matmul(h_fc1, W_fc2) + b_fc2)

Notice that we have not evaluated the flow graph even though we have the values for all of the weight and bias variables already loaded. We need a value for our placeholder x. Suppose tim is an image from the test set. To see if our trained network can recognize it all we need to do is supply it to the network and evaluate. To supply the value to the network we use the eval() function with a special “feed_dict” dictionary argument. This just lists the name of the placeholder and the value you wish to give it.

tim.shape = ((1,28*28))
y = y_conv.eval(feed_dict={x: tim})
label = np.argmax(y)

You can read the TensorFlow tutorial to learn about the network training step.   They use a special function that does the stochastic backpropagation that makes it all look very simple.

How does this thing work?

Developing an intuition for how the convolutional neural network actually works is a real challenge.  The fact that it works at all is amazing.  The convolutional steps provide averaging to help create a bit of location and scale invariance, but there is more going on here.   Note that given a 28*28 image the output of the second convolutional step is 64 7×7 array that represent feature activations generated by the weight templates.   It is tempting to look at these as images to see if one can detect anything.  Obviously the last fully connected layer can do a great job with these.   But it is easy to see what they look like.   If we apply h_pool2.eval(feed_dict={x: image}) we can look at the result as a set of 64 images.  Figure 6 does just that.  I picked two random 9 images, two 0s, two 7s and three 6s.   Each column in the figure represents depicts the first 9 of the 64 images generated by each.    If you stare at this long enough you can see some patterns there.  (Such as the diagonals in the 7s and the cup shape of the 4s.) But very little else.   On the other hand, taken together each (complete) column is a sufficient enough signature for the last layer of the network to identify the figure with over 99% accuracy.   That is impressive.


Figure 6.  Images of h_pool2 given various images of handwritten digits.

I have put the code to generate these images on Github along with the code to train the network and save the learned weights.  In both cases these are ipython notebooks.

A much better way to represent the data learned in the convolutional networks is to “de-convolve” the images back to the pixel layers.  In Visualizing and Understanding Convolutional Networks by Matthew Zeiler and Rob Fergus do this and the results are fascinating.

Final Notes

I intended this note to be a gentle introduction to the programming style of TensorFlow and not an introduction to deep neural networks, but I confess I got carried away towards the end.   I will follow up this post with another that look at recurrent networks and text processing provided I can think of something original to say that is not already in their tutorial.

I should also note that the construction of the deep network is, in some ways, similar to Theano, the other great Python package for building deep networks.  I have not looked at how Theano scales or how well it can be used for tasks beyond deep neural networks, so don’t take this blog as an endorsement of TensorFlow over Theano or Torch.

On a final note I must say that there is another interesting approach to image recognition problem.  Antonio Criminisi has led a team that has developed Deep Neural Decision Forests that combine ideas from decision forests and CNNs.

[1] This was before the explosion of massively parallel systems based on microprocessors changed the high-end computing world.

[2] If you are new to neural networks and deep learning, you may want to look at Michael Nielsen’s excellent new on-line book. There are many books that introduce deep neural networks, but Nielsen’s mathematical treatment is very clear and approachable.

[3] This is not a picture of the actual graph that is generated.   We will use another tool to visualize that later in this note.

[4] The observant reader will note that we have a possible divide by zero here if one of the special points is a serious outlier.   As I said, this is a bare-bones implementation.

Cloud SaaS: Machine Learning with Azure ML

The large public cloud vendors including Amazon, Microsoft and Google have invested heavily to provide Infrastructure as a Service (IaaS) to their customers.   The result of this intense competition has been a race to the bottom for pricing of the basic compute and storage services.   Great news for the customers.  We are also beginning to see a similar convergence around Platform as a Service as more and more basic tooling for building cloud apps becomes standard.   Clearly each vendor has some unique capabilities, but the real differentiation between vendors is taking place at the level of cloud software-as-a-service (SaaS) offerings.     The most interesting areas for science relevant services are machine learning, stream analytics and big data analysis tools.    Each of the three big cloud vendors have offering in this space and so do others like IBM, Salesforce and Cloudera.   The next few posts will be about some experience using these tools.   I will start with AzureML because I have access to it.

I decided to  redo some of my streaming scientific text analysis projects (described earlier: part 1 and part 2) using Microsoft’s new AzureML.   I’ll return to the full streaming topic using Azure stream analytics in the next post.

If you are not familiar with AzureML, the machine learning toolkit for Microsoft Azure, you should give it a try.   In fact, you can try it for free.  Go to   AzureML is based on a “drop-and-drag” component composition model where you can build a solution to a machine learning problem by dragging parts of the solution from a pallet of tools.   This post is not intended as a tutorial for AzureML. There are tons of good tutorials on line.    I would start with the ones on the home page.     This post is a description of what I was able to do with AzureML for the basic task of classifying scientific documents.  More specifically, we have a collection of RSS feed documents that describe new scientific results and research papers from various sources, but the best stuff for our purposes comes from the Cornell University Library ArXiv RSS feed.   Each item in the collection is a tuple consisting of the article title, the abstract and a classification into one of the basic science disciplines including Physics, Mathematics, Computer Science, Biology and Finance.   Here is a sample.

['A Fast Direct Sampling Algorithm for Equilateral Closed Polygons. (arXiv:1510.02466v1 [cond-mat.stat-mech])',
'Sampling equilateral closed polygons is of interest in the statistical study of ring polymers. Over the past 30 years, previous authors have proposed a variety of simple Markov chain algorithms (but have not been able to show that they converge to the correct probability distribution) and complicated direct samplers (which require extended-precision arithmetic to evaluate numerically unstable polynomials). We present a simple direct sampler which is fast and numerically stable. ',

The challenge is to use only the abstract to predict the classification.  (As you can see from this example a reasonable guess might be Physics, Math or Computer Science, so it is not that easy.)

A typical AzureML solution looks like the one below that has been configured to train a neural network to classify the items in our list of science abstracts.  It is easy to mistake this diagram for a data flow graph but it is really a workflow dependency diagram represented as a directed acyclic graph.   Each arrow represents a dependency of the output of one task as part of the input of the next.   Each box represents one of the analysis subtasks.   When you run the training experiment the subtasks that complete have a green checkmark.   It is possible to inspect the result of any subtask by clicking on the tail of the result arrow.  Doing this presents you with several possible choices that include saving the result, visualizing it in tabular form or, in some some cases, viewing it in a IPython (Jupyter) notebook.


Figure  1.  AzureML Studio workflow diagram for the Multiclass Neural network and the Python module for creating the studio version of the arxivdata data set.

To understand this workflow, it is best to start at the top which is where the data source comes into the picture.     In the case here we are going to take the data from a Azure blob storage public archive   The dataset is sciml_data_arxiv.p which is a Python pickle file.   A recent addition to AzureML that I was very pleased to see was the introduction of a way to build a new component from R or Python.  Hence it was easy to write a small Python preprocessing file that could read the data and clean it up a bit and present it to the rest of the AzureML studio.   The data interface between Python and AzureML is based on Pandas data frames.  The output of the Python module can be accessed on the output labeled 1.   We could have put this directly into the next stage in our workflow, but we can also save it to AzureML studio.   We have done that in this case and we used a copy of that dataset as the box “arxivdata”.    The data set has three columns and each row represents one document and it is a triple (classification, the abstract of the documents, the title of the document).   As we move through the workflow we will add columns and, for various tasks, we restrict attention to only a few columns

The second box down is “Feature Hashing”.    This box builds a vectorizer based on the vocabulary in the document collection.   This version comes from the Vowpal Wabbit library and its role is to convert each document into a numerical vector corresponding to the key words and phrases in the document collection.    This numeric representation is essential for the actual machine learning phase.   To create the vector, we tell the feature hasher to only look at the document text.   What happens on the output is that the vector of numeric values for the abstract text is now appended to the tuple for each document.   Our table now has a very large number of columns: class, the document text, the title,  vector[0], … vector[n] where n is the number of “features”

In the next box “Split Data” we split the resulting table into a training set and a test set.   In this case we have configured the Split Data box to put 75% into the training set and the remainder in the test set.

For the machine learning we need to select a machine learning algorithm,  and some columns of the data to use for the training.  To select the columns for the training we use a “project columns” task and we select the “class” and the feature vector components.  (we don’t need the document text or title.)  AzureML has a reasonably large number of the standard machine learning modules.   We found three that were good but by a small margin “Multiclass Neural Network” was the best performer.   Each machine learning module has various parameters that can be selected to tune the method.   For all the experiments described here, we just used the default parameter settings[i].   The “Train Model” component accepts as one input a binding to one of the ML methods (recall this is not a dataflow graph) and the other input is the projected training data.    The output of the Train Model task is not data per se but a trained model that may also be saved for later use.    This trained model can now be used to classify our test data and that is what we do with the “Score Model” component.   The score model appends another new column to our table called Scored Label which is the classification predicted by the trained model for that each row.

Finally can see how we did by using the “Evaluate Model” component which computes a confusion matrix.   Each row of the matrix tells up how the documents in that class were classified.   In this experiment the confusion matrix is shown in Figure 2 below.


Figure 2.  Confusion matrix for the ArXiv data set which includes some duplicate of bio and math documents.

There are several points worth noting here.   First bio and finance documents were recognized with high accuracy.   This is somewhat artificial because documents in those category were each repeated twice in the original data.   Hence after the splitting (by ¾ test, ¼ training)  a large fraction of the test set for these documents (about 75%) will be in both the training set and the test set hence they are easily recognized.  We have a more recent collection of ArXiv documents which do not include any of the training set items.  Figure 3 below shows the confusion matrix for this case.  It is clear than the classifier had a hard time distinguishing Physics, Math and Computer Science.    We have no doubt that we could achieve better results if fine-tuned the neural net parameters.


Figure 3.   Confusion matrix for the multiclass neural network classifier using the more recent ArXiv data

We will show a better classifier in the second half of this article.

[i] By not tuning the parameters of each ML algorithm we are doing them an injustice.  But it takes expert knowledge of what the algorithm does and lots of time to experiment to find the right settings.   I was surprised at how well the default parameters worked.

Creating a web service from our trained classifier

One of the most impressive features of AzureML is how easily it can convert a trained model like the one above into a function web service.    In fact, it is very cool.   One click on the pallet button for creating a web service transform the diagram in Figure 1 to the diagram in Figure 4.


Figure 4.   A web service that was automatically created from the experiment in Figure 1.

We can test this webservice from the studio or we can go ahead and deploy it to the web.   Once it has been deployed AzureML will even generate the C# or Python or R code you can use to deploy it.  In this case the code for Python is

import urllib2
import json
data =  {
        "Inputs": {
                    "ColumnNames": ["class","document","title"],
                    "Values": [["value","value","value"], ["value","value","value"], ]
                },        },
            "GlobalParameters": {
body = str.encode(json.dumps(data))
url = '
api_key = 'abc123' # Replace this with the API key for the web service
headers = {'Content-Type':'application/json',
           'Authorization':('Bearer '+ api_key)}
req = urllib2.Request(url, body, headers)
    response = urllib2.urlopen(req)
    result =
except urllib2.HTTPError, error:
    print("The request failed with status code: " + str(error.code))

The input is defined by the data template where you can supply one or more of the arxiv tuples.   A copy of an IPyhon notebook that invokes the webservice and computes the confusion matrix is linked here.

Creating a more interesting classifier.

The example so far does not fully illustrate the power of the AzureML studio.    I decided to try to build a classifier that uses three different machine learning algorithms all trained on the same data.   Then I would use a majority vote to select the wining choice.   I have argued in previous posts that picking a single choice for this science data is not a good idea because science is very multidisciplinary.   The example sited above illustrates this point.   So I trained two additional ML algorithms: a boosted decision tree and a two class support vector machine (converted to a multiclass algorithm using a “one vs all multiclass” module I found in the studio pallet.  I saved the trained models for each.   Then I started with the diagram in Figure 4 and I started adding things.   The result is in Figure 5


Figure 5.   A best of three classifier service created by modifying the service in Figure 4.

You will notice that this is almost like three copies of the service in Figure 4.  The difference is that I needed to reduce the output to a smaller set of tuples to give to a simple python module to do the majority voting.   This was done with column projections.   The leftmost project selects the “class” and “Scored Labels” columns (discarding the title document and the doc vector)  the second selects “class” and “Scored Labels” and the third selects only the “Scored Labels”.    Then using an “Add Column” component we append the last column to the output of the second project.   By doing this we now have two inputs to a Python Script module (which is limited to two dependences).    The Python code inside the script module is shown below.   In this version we assume that if all three scored labels disagree we should pick the first (from the multiclassNN classifier) as the main one and, arbitrarily pick the second scored Label as the “second choice”.   Otherwise if any two agree that becomes the first choice.

#   Param: a pandas.DataFrame
#   Param: a pandas.DataFrame
def azureml_main(dataframe1 = None, dataframe2 = None):
    tclass = dataframe1["class"]
    scored1 = dataframe1["Scored Labels"]
    scored2 = dataframe2["Scored Labels"]
    scored3 = dataframe2["Scored Labels (2)"]
    scored = []
    second = []     
    lclass = []
    for i in range(0, len(tclass)):
        if scored2[i] == scored3[i]:
data = {'class': lclass, 'Scored Labels': scored, 'second': second}
df = pd.DataFrame(data, columns=['class', 'Scored Labels', 'second'])

# Return value must be of a sequence of pandas.DataFrame
return df,

The webservice now returns three values.   The original class designation from ArXiv, our best-of-three choice and a second choice (which may be the same of best-of-three).    If we now look at the confusion matrix for the original arxivdata data (including the training examples) we get


Figure 6.   Best-of-three with original data.

Figure 7 shows the result when we use the dataset arxiv-11-1-15 that contains no documents from the training.


Figure 7.  Best of three with arxiv-11-1-15 data.

The improvement over the original method shown in Figure 3 is about 15%.   Of course we are now giving the classifier two chances to get the answer right.

As mentioned above we probably could do much better by tuning the ML parameters.   But the point of this post was to show you what is possible with a very modest effort with AzureML.    In the next post we will look at performance issues, scalability and streaming examples.

Science Gateways and Sustaining Science Software

I recently had the pleasure of attending the 10th Gateway Computing Environments (GCE) Workshop. The GCE program was created about 10 years ago as part of NSF TeraGrid project.   Nancy Wilkins-Diehr of the San Diego Supercomputing Center has been the leader of GCE from day one and she continues to do an amazing job with the project and organizing the workshops.  GCE has become a large and very productive collaboration in the U.S. and there is also now an International branch with meetings in Europe and Australia.

If you are unfamiliar with the science gateway concept a one paragraph tutorial is in order.   A science gateway is a web portal or app that is tailored to research needs of a specific scientific community. The gateway can allow its users access to advanced applications and data collections.   For example, the Cyberinfrastructure for Phylogenetic Research (CIPRES) gateway ( explores the evolutionary relationships among various biological species.   It allows its users to run dozens of computationally intensive phylogenetic tree inference tools (e.g., GARLI, RAxML, MrBayes) on NSF’s XSEDE supercomputers.   Another great example is NanoHub that was built at Purdue University to provide education and research tools for nanotechnology. In the genomics research world GenomeSpace is a gateway for a large collection of tools and it represents a broad community of developers and users.

I was around for the early days of GCE and I also attended the International event in Dublin in 2014. I have been amazed by the progress this community has made. Ten years ago the science gateway program in TeraGrid was a small project that was seen by the big computing centers as “mostly harmless”[1]. However it is now a big deal. NanoHub has evolved into and it supports many additional domains including cancer research, pharmaceutical manufacturing, earthquake mitigation, STEM education.   They have had 1.8 million users in 2014.   CIPRES is now one of XSEDE’s largest users consuming 18.7M compute core hours in 2014.   Of course there are many big individual users doing great science at scale on the NSF supercomputer centers, but CIPRES and some other gateways represent about one half of all XSEDE users. The bottom line is that the nation’s investment in supercomputing is paying off for a very wide spectrum of scientists.

The workshop had some excellent talks.   There was a fantastic Keynote by Alexandra Swanson from Oxford and the Zooniverse project. Zooniverse is a rather unusual gateway.   Rather than being supercomputer powered it is “people powered”.     It started with the Galaxy Zoo project but now supports just about any research project that needs large teams of volunteers to help sift through vast piles of (mostly) image data and help with classification and discovery. It is a bit like the Amazon mechanical Turk but much more science focused. Ali’s talk (slides here) really stressed the challenge of building and designing the portal technology need to allow a user to build a productive instance of a universe.

The slides for all the presentations are on the program page for the workshop. They were all good but two of them were very interesting to me. Pankaj Saha from Binghamton University gave a great talk about how they are now integrating Docker, Marathon and Mesos with the Apache Airavita project. Airavita is a framework for orchestrating the computational jobs and workflows of a science gateway onto supercomputers, local clusters and commercial clouds.   It is high component based and a natural fit for Docker, Marathon and Mesos.   The talk was only a snapshot of work in progress and I look forward to seeing how well they are able to exploit these tools to make Airavita extremely easy to deploy and manage.

Shava Smallen of the SDSC gave a fascinating talk about how machine learning can be used to predict failures and manage resources for the CIPRES gateway.   My experience with one science gateway many years ago was that various parts were always crashing. (At the time, I preferred to blame the users and not our software.)   This SDSC project is really interesting. I am convinced that this is a great application for ML.   Distributed system software like Zookeeper, Mesoshpere and Marathon along with extremely reliable messaging services like RabbitMQ have made huge strides in our ability to design robust systems.   But Supercomputing platforms are designed to push the edge in terms of performance and are not very friendly to long running systems like science gateways.   Doing more of this sort of “smart” monitoring and failure prediction is very important.

Sustainable Open Source Scientific Software

The other Keynote at the event was by Dan Katz from Chicago and NSF.   While his talk was not about science gateways he did raise an issue that is of great concern to the gateway community.   Specifically how do we maintain software when the grant funding that paid for its construction runs dry?   This question has been around for as long as scientists have been writing code.

I think we have to ask the following question. What makes open source software sustainable over long periods of time? In other words, who pays the salary of the programmers tasked with maintaining and improving it? I believe there are several answers to these questions.   Fundamentally when a piece of software becomes a critical component of the infrastructure of an entity or entities with deep pockets those entities will make sure that the software is sustained.   There are a number of big tech companies that depend upon various bits of open source and they pay their people to make sure it continues to work.   I know that Microsoft is an active contributor to several open source platforms. Companies like IBM, Facebook, Twitter, Amazon, and Google are all supporting open source projects. Often these same companies will take something that was developed internally and open source it because they see value in attracting other contributors. Good examples are Yahoo and Hadoop and Google and Kubernetes. Other sustaining entities are organizations like CERN, the National Institute of Health, the Department of Energy and NASA.   But in these cases the software being sustained is critical to their specific missions. The NSF has a greater challenge here.   Its budget is already loaded with supporting more critical infrastructure than it can afford if it is to maintain its mission to support cutting edge science across a vast spectrum of disciplines.

The other way open source software originating in universities is sustained is to spin-off a new start-up company. But to make this happen you need angel or venture funding.   That funding will not appear if there is no perceived value or business model. Granted that in today’s start-up frenzy value and business model are rather fuzzy things.   Often these companies will find markets or they will be bought up by larger companies that depend upon its product.   UC Berkeley has been extremely impressive in this regard. has spun out of the successful Spark data analysis system and is a spin-off form the Mesos project.   Both of these are very exciting and have a bright future.

The third way open source survives is the labor of love model.   Linux was maintained by dedicated cadre of professionals, many of whom contribute in the off-hours after their day jobs.   The Python community is another remarkable example and the best example of sustaining open source scientific software. Companies like Enthought and Continuum Analytics have emerged from the community to support Python for technical computing. Enthought supports, the great resource for scientific Python tools.   Continuum provides the peerless free Anaconda python package. And the Numfocus foundation has been created by the community to channel financial sponsorship to many of the most significant science projects of interest to the Python user community.

Looking at the success of the Python, Hadoop, Mesos and Spark one can see critical common threads that are missing in many research software projects.  First there must be a user community that is more than casual.   It must be passionate. Second it must be of value to more than these passionate scientists. It must help solve business problems.   And this is the challenge facing the majority of science software which is often too narrowly focused on a specific science discipline.   I have often seen groups of researchers that have the critical mass to make a difference, but they are not sufficiently collaborating.   How many different scientific workflow systems do we need? All too often the need for academic or professional promotion gets in the way of collaboration and the result is duplication.

I believe the Science Gateway community is doing the right thing. I see more consolidation of good ideas into common systems and that is great. Perhaps they should create a Numfocus-like foundation? And they need to solve some business problems. 🙂

[1] A description borrowed from the Hitchhikers Guide to the Galaxy. The full description for the planet Earth was “mostly harmless”.   This was the revised extended entry.   The earlier entry for the planet Earth was “harmless”.

Data and Code Available for Science Doc ML and microservice experiment

The data and code used in the azure python streaming data experiments that were described in the blog on microservice performance architecture and Processing Scholarly Event Streams in the Cloud. The code has been rewritten to be free of all azure dependencies with the exception of the use of Azure Tables for the final storage from the table web service. It is certainly possible to rewrite this to use another database.

There are four type of data

  1. The arxiv configuration files. They take the form of config_name.json where name can be all4 (the top level), bio (the arxiv q-bio objects), compsci (computer science), math, phy (Physics), finance (finance).
  2. The machine learning model files (as generated by described below)
  3. The raw daa from the streams. There are three of these. The sciml_data_arxiv is the original data set from arxiv. sciml_data_arxiv_new_9_28_15 is a recent snapshot of arxiv data not used in the training a portion of this was used for the training set. The sciml_data_scimags is the rss data from the various science mags.
  4. The output of the main (top level) classifer. This was used to push events directly to the message broker for use in the performance analysis. This takes the form dump_all_subtopic_name where name is one of q-bio, Physics, compsci, math or physics. (note these are not the same as the names on the config files.)

The data is stored in two places.

  1. The configuration data, the rss feed input data and model data is stored on a publicly readable oneDrive site. The url for this is (cut this link and paste it into your browser.)
  2. The oneDrive files are ok for download from the browser, but not as efficient for program level access. So the programs here read the files from an public, read-only account “” The code for reading the files is included in the source codes.

The code is now on gitHub at

it is not the prettiest code.   wouldn’t pass a code review at MS, but i am moving on to something else now.

Two New Research News Portals

I have always been a fan and avid reader of on-line news portals such as those from the New York Times and the Washington Post, but I am always looking for better sources of science news.   Of course there are several stalwarts, such as Nature on-line and Scientific American . Nature is the “go to” site for life science related news. SA is a property of Nature and, like Nature, only available on a paid subscription basis.   In the same high quality league is the AAAS on-line version of Science Magazine. PLOS, the Public Library of Science has a vast collection of research articles but they are not really a “news” site. However they do have an interesting set of blogs. The Society for Science & the Public (SSP) has sciencenews which has a strong science education mission.   Other sites that are more focused on bringing science to the people include Discovery and, to some extent, Wired.

There are a number of sites that are curated news aggregators, i.e. there is little original content, but rather a well curated selection of links to other useful sites.   One good example is Science Central.   One of the sites they point to is LiveScience which is an online publication of a media content company called Purch.

These sites are all valuable and they can definitely help elevate the low level of science knowledge among the general population (though I fear most US Presidential candidates are immune to scientific thought or discoveries). But what I have always wanted to see are more news sites related to my interests in computer science, high performance computing and applications.   So far the best have been the ACM and IEEE portals such as the ACM Communications site and for HPC news there is the venerable HPCwire. The Communications site give a great preview of the ACM print version and it also does a good job of news aggregation for the computer science community.   HPC wire is very focused on the supercomputing community supported by DOE and NSF.   It is a great way to keep up on who is doing what.   For example there is a nice piece by Paul Messina on code optimization steps being taken to get the most out of the new Aurora system.researchnews_sciencenode

Now we have two additional computer and computational science news sites that are certainly nice new resources.   One is from my old friends at Microsoft Research called Research News.   This site is still Beta but it is worth a look.   It is a very well done news aggregator that is focused on computer science and applications.   It also has a reasonable event calendar and well curated spotlight and feature selections. The editorial advisors are all from MSR and I thought I would see more MSR-related content, but it was not as much as I expected. (I did learn about Pranja, a Spark-like data analysis tool done in F# and I will to follow up more on that later.) I was disappointed to find no search function. Where is the Cortana and Bing tie-in?   I expect they are working on that one.

The other new site is   Science Node is a redesign and rethink of the iSGTW (International Science Grid This Week) and it is much improved.   (Full disclosure. Science Node is now largely led by other former colleagues at Indiana University, but I only discovered that later when I decided to write about it.) In many ways ScienceNode is a nice compliment to ResearchNews.   It is an aggregation site, but it is also nicely edited with some original content. For example there is a nice piece about the 3D digital sketching work by Julie Dorsey. They do have a search function, but more interesting is that they encourage community involvement by allowing reader to post links to blogs or to pitch a story.

In summary, I am excited to see ResearchNews and ScienceNode appear.   They are both great contributions and I look forward to seeing how they evolve.

Processing Scholarly Event Streams in the Cloud

Every day scientists publish research results. Most often the resulting technical papers are sent to the scientific journals and often the abstracts of these papers make their way onto the Internet as a stream of news items that one can subscribe to through RSS feeds[1]. A major source of high quality streamed science data is the Cornell University Library which is a collection of over one million open-access documents.   Other sources include the Public Library of Science (PLOS ONE), Science and Nature.   In addition when science reporters from the commercial press hear about new advances, they often publish articles about these discoveries that that also appear as part of the RSS stream. For example Science Daily, Scientific American, New Scientist, Science Magazine and the New York Times all have science news feeds.

Suppose you are doing research on some topic like “black holes and quantum gravity” or “the impact of DNA mutation on evolution” or “the geometry of algebraic surfaces”.   Today you can subscribe to some of these topics through the RSS feed system, but suppose you had a virtual assistant that can alert you daily to any newly published progress or news reports related to your specific scientific interests. You may also want to see science results from disciplines that are different from your standard science domains that are may be related to your interests.   What I would really like is a “Science Cortana” that can alert me with messages like “here are some interesting results from mathematics have been published today that relate to your science problem…” or “Yo! You know the problem you have been working on … well these people have solved it.”

Object classification using machine learning is a big topic these days.   Using the massive on-line data collection provided by user photos in the cloud and movies in YouTube, the folks at Google, Microsoft, Facebook and others have done some amazing work in scene recognition using deep neural networks and other advanced techniques. These systems can recognize dogs down to the breed … “this dog is a Dalmatian” … or street scenes with descriptions like “a man with a hat standing next to a fire truck”.   Given this progress, it should be an easy task to use conventional tools to be able to classify scientific paper abstracts down to the scientific sub-discipline level.   For example, determine if a paper is about cellular biology or genomics or bio-molecular chemistry and then file it away under that heading. In this post I describe some experience with building such an application.

This is really the first of a two-part article dealing with cloud microservices and scientific document analysis.   The work began as I was trying to understand how to build a distributed microservice platform for processing events from Internet sources like twitter, on-line instruments, and RSS feeds.   Stream analytics is another very big topic and I am not attempting to address all the related and significant issues such as consistency and event-time correlation (see a recent blog post by Tyler Akidau that covers some interesting ideas not addressed here.) There are many excellent tools for stream analytics (such as Spark Streaming and Amazon Kinesis and Azure Stream Analytics) and I am not proposing a serious alternative here. This and the following post really only address the problem of how can a microservice architecture be used for this task.

The application: classifying scientific documents by research discipline

It turns out that classifying scientific documents down to the level of academic sub-discipline is harder than it looks.   There are some interesting reasons that this is the case.   First, science has become remarkably interdisciplinary with new sub-disciplines appearing every few years while other topics become less active as the remaining unsolved scientific problems become scarce and hard.   Second a good paper may actually span several sub-disciplines such as cellular biology and genomics.   Furthermore articles in the popular press are written to avoid the deeply technical science language that can help classify them.   For example, a story about the search for life on the moons of Jupiter may be astrophysics, evolution, geophysics or robotics.   Or all of the above.   If the language is vague it may not be possible to tell.   Finally there is the issue of the size of the available data collections. There are about 200 science articles published each day. As an event stream this is clearly not an avalanche.   The work here is based on a harvest of several months of the science feeds so it consists of about 10,000 abstracts.   This is NOT big data.   It is tiny data.

To approach the machine learning part of this problem we need two starting points.

  1. A list of disciplines and subtopics/sub-disciplines we will use as classification targets
  2. A training set.   That is a set of documents that have already been classified and labeled.

For reasons described below the major disciplines and specific sub-disciplines are

Astrophysics and astronomy
General Relativity, Gravitation, Quantum Gravity
Condensed Matter Physics
High Energy Physics
Mathematical Physics
Nuclear Physics
Quantum Physics
Cell Behavior
Subcellular Organization
Computer Science
Artificial Intelligence & Machine Learning
Engineering & Math Software
CS and Game Theory
Data Structures, Algorithms & Theory
Systems, Programming Languages, Soft Eng
HCI & Graphics
Databases & Information Retrieval
Networks, Security and Soc Nets
Computational Finance
Statistical Finance
Portfolio Management
Risk Management
General Finance

Table 1.   Semi-ArXiv based topic and sub-discipline categories.  (ArXiv has even finer gained categories that are aggregated here.)

Unfortunately, for the many of our documents we have no discipline or subtopic labels, so they cannot be used for training.   However the ArXiv[2] collection is curated by both authors and topic experts, so each document has a label.   Consequently we have built our training algorithms to correspond to the discipline subtopics in ArXiv.   In some cases the number of discipline subtopics in ArXiv was so large it was impossible for our learning algorithm to distinguish between them. For example mathematics has 31 subtopics and computer science has 40. Unfortunately the size of the data sample in some subtopics was so tiny (less than a few dozen papers) that training was impossible. I decided to aggregate some related subtopics for the classification. (For mathematics I chose the four general areas that I had to pass as part of my Ph.D. qualification exam! A similar aggregation was used for computer science.)

ArXiv has another discipline category not represented here: Statistics. Unfortunately, the interdisciplinary problem was so strong here that most of Statistics looked like something else.   A large fraction of the statistics articles were in the category of machine learning, so I moved them to the large sub-discipline of CS called AI-ML.   The rest of statistics fell into the categories that looked like applied math and theory so they were moved there.

Using the ArXiv classification as training data worked out well but there was one major shortcoming. They do not have all major science disciplines covered.   For example there is no geoscience, ecology or psychology. (PLOS covers a much broader list of disciplines and we will use it to update this study soon.)   This lack of earth sciences became very noticeable when classifying papers from the popular science. I will return to this problem later.

The application is designed as three layers of microservices as shown in the diagram below (Figure 1). The first layer pulls recent docs from the RSS feeds and predicts which major category is the best fit for the document.   It then pushes this document record to second level services specific to that topic. In fact, if our classifier cannot agree on a single topic, it will pick the best two and send he document to both subtopic analyzers.  The second level service predicts which subtopics are the best fit and then pushes the document to a storage service where it is appended to the appropriate sub-discipline

[1] Technically RSS is not a “push” event technology. It needs another service to poll the sources and push the recent updates out as events.   That is what we do here.

[2] FYI: ArXiv is pronounced archive.   Very clever.


Figure 1.   Basic Microservice architecture for science document classifier (click to enlarge)

The output will be a webpage for each subtopic listing the most recent results in that sub-domain.

What follows is a rather long and detailed discussion of how one can use open source machine learning tools in Python to address this problem.   If you are not interested in these details but you may want to jump to the conclusions then go to the end of the post and read about the fun with non-labeled documents from the press and the concluding thoughts.   I will return to the details of the microservice architecture in the next post.

Building the Document Classifiers

The document analyzer is a simple adaptation of some standard text classifiers using a “Bag of Words” approach.   We build a list of all words in the ArXiv RSS feeds that we have harvested in the last few months. This collection is about 7100 ArXiv documents. The feed documents consist of a title, author list, abstract and URL for the full paper. The ArXiv topic label is appended to the title string.   Our training is done using only the abstract of the article and the topic label is used to grade the prediction.

The major topic predictor and the sub-discipline classifiers both use the same code base using standard Python libraries.   In fact we use five different methods in all.   It is common believe that the best classifier is either a deep neural net or the Random Forest method.  We will look at deep neural nets in another post, but here we focus on Random Forest. This method works by fitting  a large number of random binary decision trees to the training data.   Another interesting classifier in the Scikit-Learn Python library is the Gradient Tree Boosting algorithm. This method uses a bunch of fixed size trees (considered weak learners) but then applies a gradient boosting to improve their accuracy. In both cases it is the behavior of the ensemble of trees that is used for the classification.

For any Bag-of-words method you first must convert the words in the training set into a dictionary of sorts so that each document is a sparse vector in a vector space whose dimension is equal to the number of words in the dictionary.   In this case we use a “count vectorizer” that produces a matrix of word token counts.   We then apply a Term Frequency – Inverse Document Frequency (TFIDF) normalization to a sparse matrix of occurrence counts. This is a sparse matrix where each row is a unit vector corresponding to one of the training set documents.  The size of the row vector (the number of columns) is the size of the vocabulary. The norm of the row vector is 1.0.   In the code, shown below, we then create the random forest classifier and the gradient tree boosting classifier and train each with the training data.


Figure 2.  Sample Python code for building the random forest and gradient tree boosting classifiers.

Finally we make a prediction for the entire set of documents in the ArXiv collection (all_docs in the code above).   This is all pretty standard stuff.

The training set was a randomly selected subset of ArXiv data. Using a relatively small training set of no more than 400 of the documents from each category we got the following results. With five major categories that represents about 33% of all of the ArXiv documents.   This provided a correct classification for 80% of the data using only the Random Forest Classifier.   Unfortunately the distribution of documents among the various categories is not uniform. Physics and Math together constitute over half of the 5500 documents.   Using a larger training set (75% of the documents from each category) gave a stronger result. However, when we use both Random Forest (RF) and Gradient Boosting (GB) and consider the result correct if either one gets the answer right, we get a success rate 93%. Looking more closely at the numbers, the success rate by topic is shown below.   We have also listed the relative false positive rate (100*number of incorrect predictions/the total size of the category)

Major topic % RF or GB correct % relative false positive rate
Physics 97.5 3.2
Math 92.3 14.6
Bio 88.9 1.6
Compsci 86.8 9.7
Finance 90.8 2.8

Table 2.   Basic top-level classifier results with success percentage if one of the two methods are correct.   The relative false positive rate is
100*total incorrect predictions of a document belonging to domain-x/sizeof(domain-x)

It is also worth noting that RF and GB agree only about 80% of the time and RF is only slightly more accurate than GB.   The false positive rates tell another story.   For some reason Mathematics is over predicted as a topic.   But there are additional more sociological factors at work here.

The choice of the ArXiv label often reflects a decision by the author and tacit agreement by the expert committee.   For example the article titled

“Randomized migration processes between two epidemic centers.”

Is labeled in ArXiv as [q-bio.PE] which means quantitative biology, with the sub area “population and evolution”.   This paper is illustrates one of the challenges of this document classification problem.   Science has become extremely multidisciplinary and this paper is an excellent example.   They authors are professors in a school of engineering and a department of mathematics. If you look at the paper it appears to be more applied math then quantitative biology. This is not a criticism of the paper or the ArXiv evaluators.   In fact this paper is an excellent example of something a quantitative biologist may wish to see. The Random Forest classifier correctly identified this as a biology paper, but the gradient boosting method said it was math. Our best of three (describe below) method concluded it was either computer science or math.   (In addition to some rather sophisticated mathematics the paper contains algorithms and numerical results that are very computer science-like.)

All of this raises a question.   Given a science article abstract, is there a single “correct” topic prediction? Textbooks about machine learning use examples like recognizing handwritten numerals or recognizing types of irises where there is a definitive correct answer.   In our case the best we can do is to say “this document is similar to these others and most of them are in this category, but some are in this other category”.   Given that science is interdisciplinary (and becoming more so) that answer is fine.   The paper should go in both.

This point can be further illustrated by looking at the subtopic level where the distinction between sub-disciplines is even more subtle.   Physics is the most prolific science discipline in the ArXiv.   It has many sub-discipline topics.   For our purpose we have selected set listed in Table 1.   Our classifier produced the following results when restricted to Physics using a training set consisting of 75% of the items in each category but no more than 200 from each.   The overall score (either RF or GB correct) was 86%, but there were huge false positives rates for General Relativity (GR-QG) and High Energy Physics (HEP) where we restricted HEP to theoretical high energy physics (HEP-th).

Sub-discipline %correct 100*relative false positive rate
Astrophysics 88.5 6.24
General Relativity & Quantum Grav 89.4 28.33
Condensed Matter 74.7 0.0
High Energy Physics 77.8 21.4
Mathematical Physics 73.7 0.0
Nuclear Physics 66.7 0.0
Quantum 74.6 0.0

Table 3.  Classifier results for Physics based on RF or GB predicting the correct answer

In other words the system predicted General Relativity in places were that was not the correct category and High energy physics also incorrectly in others.   These two subtopics tended to over-predict. A closer look at the details showed many HEP papers were labeled as GR-QG and visa-versa.   There is a great way to see why.   Below are the “word cloud” diagrams for HEP, GR-QC and Astrophysics.   These diagrams show the relative frequency of words in the document collection by the size of the font.  Big words are “important” and little ones less so.   As you can see, the HEP and GR-QC word clouds are nearly identical and both are very distinct from Astrophysics.


Figure 3.  Word cloud for documents in General-Relativity  and Quantum Gravity

high_engFigure 4.  Word cloud for Theoretical High Energy Physics documents


Figure 5.  Word cloud for Astrophysics

(Restricting the High Energy Physics category to only Theoretical papers caused this problem and the results were not very valuable, so I added additional subcategories to the HEP area including various topics in atomic physics and experimental HEP. This change is reflected in the results below.)

A Clustering Based Classifier

Given the small size of the training set, the random forest method described above worked reasonably well for the labeled data. However, as we shall see later, it was not very good at the unlabeled data from the popular press.   So I tried one more experiment.

There are a variety of “unsupervised” classifiers based on clustering techniques.   One of the standards is the K-means method.   This method works by iterating over a space of points and dividing them into K subsets so that the members of each subset are closest to their “center” than they are to the center of any of the others clusters.   Another is called Latent Sematic Indexing (LSI) and it is based on a singular value decomposition of the document-term matrix.   And a third method is called Linear Discriminant Analysis (LDA) which build linear expressions of features that can be used to partition documents.

The way we use these three methods to build a trained classifier is a bit unconventional.   The basic idea is as follows.   Each of the three methods, K-means, LSI and LDA has a way to ask where a document fits in its clustering scheme.   In the case of K-means this is just asking which cluster is nearest to the point represented by the document.   We use the LSI and LDA implementations from the excellent gensim package. In the case of LSI and LDA, we can use a special index function which performs a similarity query.

As before we begin by converting documents into unit vectors in a very high dimension space.   We begin with the list of documents.

The list of words from these abstracts is then filtered to remove Standard English stop words as well as a list of about 30 stop words that are common across all science articles and so are of no value for classification.   This includes words like “data”, “information”, “journal”, “experiment”, “method”, “case” etc. Using the excellent pattern Python package from the Computational Linguistics & Psycholinguistics Research Center at the University of Antwerp, we extract all nouns and noun phrases as these are important in science.   For example phrases like “black hole”, “abelian group”, “single cell”, “neural net”, “dark matter” has more meaning that “hole”, “group”, “cell”, “net” and “matter” alone.   Including “bi-grams” like these is relatively standard practice in the document analysis literature.

For each document we build a list called “texts” in the code below. texts[i] is a list of all the words in the i-th document.   From this we build a dictionary and then transform the Text list into another list called Corpus. Each element in Corpus is a list of the words in the corresponding document represented by their index in the dictionary.   From this corpus we can build an instance of the LSI or LDA model.   We next transform the corpus into tfidf format and then into the format LDA or LSI need.   From these we create an index object that can be used to “look up” documents.


For example, suppose I have a new four-word document “dna genome evolution rna” and I want to find the abstracts in our collection that are most similar.
Index_lsi (or Index_lda) returns a list of floating point numbers where the i-th number is the similarity in lsi space to our new document.   To find the top five most similar documents we enumerate and sort the list. Looking at the head of the list we have


Document 534 is one with title “Recombinant transfer in the basic genome of E. coli” and is indeed a good match for our rather short query document.

The best of three classifier

So now we have three ways to find documents similar to any given document.   Here is how we can use it to build a crude classifier we call “best-of-three”.

  1. Assume you have n target categories.   At the top level we have n = five: Physics, Biology, CS, Math and Finance.
  2. Sort the training set of documents into n buckets where each bucket consist of those documents that correspond to one of the categories. So bucket 1 contains all training documents from physics and bucket 2 has all training documents from Biology, etc.
  3. Given a new document x we compute the top 5 (or so) best matches from each of the three methods (KM, LSI and LDA) as shown above.   We now have 15 (or so) potential matches.
  4. We now have 15 (or so) potential matches. Unfortunately each of the three methods uses a different metric to judge them and we need to provide a single best choice. We take a very simple approach. We compute the cosine distance between the vector representing the document x and each of the 15 candidates.   The closest document to x is the winner and we look to see in which bucket that document lives and the label of that bucket is our classification.

The Centroid classifier

Unit length vectors are points on a high dimensional sphere. The cosine distance between two vectors of unit length is simply the distance between those points.  Because the best-of-three method uses the cosine distance as the final arbiter in a decision process we can envision an even simpler (and very dumb) method.   Why not “summarize” each bucket of training set items with a single vector computed as the “centroid” of that set of training set documents. (By centroid here we mean the normalized sum of all the vectors in training set bucket k. )  We can then classify document x by simply asking which centroid is closest to x.   This turns out to be a rather weak classifier. But we can improve it slightly using he best-of-three method.   For each bucket k of documents with training label k, we can ask KM, LDA and LSI which other documents in the training set are close to those elements in k.   Suppose we pick 5 of the nearby documents for each element of the bucket and add these to an extended bucket for label k as follows.


We now have buckets that are larger than the original and the buckets from different labels will overlap (have non-trivial intersections).   Some physics documents may now also be in computer science and some math documents will now also be in biology, but this is not so bad.   If it is labeled as being in physics, but it looks like computer science then let it be there too.

The Results

We now compare the methods RandomForest (rf), Best-of-Three (best) and the new centroid classifier (cent) against each other for all the labeled data.   The results are complicated by two factors related to the selection of the training sets.   The balance between the numbers of papers in each subcategory is not very good.   At the top level, physics has thousands of papers and finance has only a few hundred. Similar imbalances exist in all of the individual disciplines.   Consequently if you take a random selection of X% of the labeled data as the training set, then the prolific disciplines will be heavily represented compared to the small disciplines.  The result is that the false positive rates for the heavy disciplines can be extremely high (greater than 10%  of all documents.)  On the other hand if you take a fixed number of papers from each discipline then the training set for the big disciplines will be very small. The compromise used here is that we take a fixed percent (75%) of the documents from each discipline up to a maximum number from each then we get a respectable balance, i.e. false positive rates below 10% . The table below is the result for the top level classification.

final-allTable 4.  Top-level results from 7107 labeled documents with a training set consisting of 63% (with a maximum of 1500 documents from each sub-discipline.)

(In Table 4 and the following tables we have calculated the “correct” identifications as the percent of items of domain x correctly identified as being x.  This is the same as “recall” in the ML literature. The “false pos” values is the percent of items incorrectly identified as being in category x.  If n is the total number of documents and m is the number identified as physics but are not physics then the table represents 100*n/m in the physics column.  This is not the same as the textbook definition of false positive rate. )

In the tables below we show the results for each of the disciplines using exactly the same algorithms (and code) to do the training and analysis.

final-phyTable 5.  Physics sub-domain consisting of 3115 documents and a training set of 53%.  (300 doc max per subcategory)


Table 6.  Math sub-domain consisting of 1561 documents and a training set size of 39% (200 doc max per subcategory)


Table 7.  Biology sub-domain consisting of 1561 documents and a 39% training set size (200 docs max size per subcategory)


Table 8.  Computer Science sub-domain consisting of 1367 documents and 56% training set size (200 doc max docs per subcategory)


Table 9.  Finance sub-domain consisting of 414 documents and a 64% training set size (50 documents max per subcategory)

 As can be seen from the numbers above that Random Forest continues to be the most accurate, but the Best-of-Three method is not bad and in some cases outperformed Random Forest.   And, because we assume that science is very interdisciplinary, we can use both methods as a way to push documents through our system.   In other words if bf and best agree that a document belong in category X, then we put it there.   But if they disagree and one says category X and the other says category Y, then we put the document in both.

Non-labeled documents

Now we turn to the non-labeled data coming from the RSS feeds from Science Daily, Scientific American, New Scientist, Science Magazine and the New York Times.   The reader may wonder why we bothered to describe the centroid method and list its performance in the table above when it is clearly inferior to the other methods on the labeled ArXiv data.   The surprise is that it beats the Random Forest and Best-of-Three on the popular press data.

I do not have a good explanation for this result other than the fact that the popular press data is very different from the clean scientific writing in science abstracts. Furthermore the science press feeds are often very short.   In fact they often look more like tweets.   For example, one from Science Daily reads

“Scientists have found that graphene oxide’s inherent defects give rise to a surprising mechanical property caused by an unusual mechanochemical reaction.”

The subject here is materials science and the Best-of-Three algorithm came the closest and called this physics. In other cases all the algorithms failed. For example from the science day feed this article

“While advances in technology have made multigene testing, or \’panel testing,\’ for genetic mutations that increase the risk of breast or other cancers an option, authors of a review say larger studies are needed in order to provide reliable risk estimates for counseling these patients.”

was determined to be “Finance” by all three algorithms.

And there were many posts that had no identifiable scientific content.   For example
“Casual relationships, bittersweet news about chocolate, artisanal lightbulbs and more (full text available to subscribers)”


“If social networks were countries, they’d be police states. To change that we may have to rebuild from the bottom up.


“Footsteps of gods, underground dragons or UFOs? Rachel Nuwer joins the fellowship of the rings out to solve the enigma in the grassy Namibian desert.”

This does not mean these stories had no scientific or educational value.   It only means that the abstract extracted from the RSS feed was of no value in making a relevance decision.

Another major problem was that our scientific categories are way too narrow. A major fraction (perhaps 70%) of the popular press news feeds are about earth science, ecology, geophysics, medicine, psychology and general health science.   For example, the following three have clear scientific content, but they do not match our categories.

“Consumption of sugary drinks increases risk factors for cardiovascular disease in a dose-dependent manner — the more you drink, the greater the risk. The study is the first to demonstrate such a direct, dose-dependent relationship.”

“Nearly a third of the world’s 37 largest aquifers are being drained faster than water can be returned to them, threatening regions that support two billion people, a recent study found.”

“While the risk of suicide by offenders in prison has been identified as a priority for action, understanding and preventing suicides among offenders after release has received far less attention.”

By extracting a random sample of 200 postings and removing those that either had no scientific content or no relationship to any of our main disciplines, the results were that Centroid was the best classifier of the remainder correctly identifying 56%.   Best together with Centroid the success rate rose to 63%.   Random forest only correctly identified 16% of the papers because it tended to identify many papers as mathematics if the abstract referred to relative quantities or had terms like “increased risk”.

Final Thoughts

The greatest weakness of the approaches I have outlined above is the fact that the methods do not provide a “confidence” metric for the selections being made. If the classifier has no confidence in its prediction, we may not want to consider the result to be of value.   Rather than putting the document is the wrong category it can be better to simply say “I don’t know”.   However it is possible to put a metric of “confidence” on Best and Centroid because each rank the final decisions based on the cosine “distance” (actually for the unit length vectors here, the cosine is really just the dot product of the two vectors).   The nearer the dot product is to 1.0 the closer two vectors are.

For the Centroid method we are really selecting the classification category based on the closest topic centroid to the document vector.   It turns out that for the majority of the public press documents the dot products with the centroids are all very near zero. In other words the document vector is nearly orthogonal to each of the topic centroids. This would suggest that we may have very little confidence in the classification of these document. If we simply label every document with a max dot product value less than 0.04 as “unclassifiable” we eliminate about 90% of the documents. This includes those documents with no clear scientific content and many of those that appear to be totally unrelated to our five theme areas.   However, if we now compute the true positive rate of the centroid classifier on the remainder we get 80% correct.   For Best or Centroid together we are now over 90% and RF is up to 35% (now slight above a random guess).

We can now ask the question what max dot product cutoff value will yield a true positive rate of 90% for the centroid method? The table below shows the true positive value and the fraction of the document collection that qualify for dot product values ranging from 0.04 to 0.15 (called the “confidence level” below.


Another way of saying this is that if the maximum of the dot products of the document vector with the topic centroids is above 0.05 the chance of a correct classification by the Centroid method is above 90%.   Of course the downside is that the fraction of the popular press abstracts that meet this criteria is less than 7%.

Next Steps

The two great weaknesses of this little study are the small data set size and the narrowness of the classification topics.   The total number of documents used to train the classifiers was very small (about 6,000).   With 60,000 or 600,000 we expect we can do much better.   To better classify the popular press articles we need topics like geoscience, ecology, climate modeling, psychology and public health. These topics make up at least 70% of the popular press articles. In a future version of this study we will include those topics.

Another approach worth considering in the future is an analysis based on the images of equations and diagrams. Scientists trained in a specialty can recognize classes of scientific equations such as those for fluid dynamics, electromagnetism, general relativity, quantum mechanics or chemical reaction diagrams or biochemical pathways, etc.   These images are part of the language of science.   Image recognition using deep neural networks would be an ideal way to address this classification problem.

In the next post I will describe the implementation of the system as a network of microservices in the cloud.   I will also release the code use here for the document training and I will release the data collection as soon as I find the best public place to store it.

Programming the Cloud with Microservices, Docker and Mesosphere


The cloud is designed to be a host for applications that are realized as scalable services. For example a web server or the backend of a mobile app. In this case the application accepts random connections from remote clients and, based on the client’s request, it does some computation and returns a response. If the number of clients that are concurrently requesting service grows too large for one server to handle, the system should automatically spawn new instances of the server to share the load. Another scenario is an application that processes events from remote sensors to inform a control system on how to respond.   For example, geo-sensors detecting ground motion tremors occurring in a significant pattern, then sound an earthquake warning.   In this case the cloud part of the application may involve multiple components: sensor signal decoders, pattern analysis integrators, database searches, and alarm system interfaces.   Another example is a large-scale data analytics system processing large amounts of text data. For example, a search index for the World Wide Web.

Each of these cloud applications illustrates one of three basic paradigms of programming the cloud. They are

Basic web server gateway. A single server that responds to requests from a user. Scaling is accomplished by replication. We will discuss this further in another section on Dockerized Science Gateways.

Bulk Synchronous Parallel methods. This is a very old and respected model for parallel programming characterized by running computation in a sequence of phases where each phase involves a single task that is executed in parallel on multiple servers. When the tasks terminate a barrier synchronization is used to permute the data so that the second phase of computation can begin.   For example, Spark commands from the Scala REPL or Python API operate on distributed data collections in this manner.   Hadoop Map Reduce is essentially a BSP program.


An asynchronous swarm of communicating processes distributed over a virtual network in the cloud.   The individual processes may be stateless, such as a simple web service or stateful such as found in the actor programming model.


An excellent example of a tool for programming asynchronous swarms via actors is Orleans from Microsoft Research   Orleans has been used internally at Microsoft on some very large scale projects including the game Halo Online and it is now available as open source.


Recently this asynchronous swarm style has been rebranded with a new philosophy under the name “Microservices’’. The problem addressed by microservices is that of how to design and build a large, heterogeneous app so that it will be secure, maintainable, fault tolerant and scalable? This is particularly important for very large, on-line services that need to support thousands of concurrent users. The app must remain up twenty-four hours a day but still maintained and upgraded. The software engineering approach to this problem is called “Dev Ops” and it integrates product delivery, quality testing, feature development, and maintenance releases in order to improve reliability and security and provide faster development and deployment cycles.   From the programmer’s perspective this is a “you built it, so now you run it” philosophy. Updates to parts of this system occur while the system is running so there is a very tight integration of the developers with the IT-pros managing the system.

The microservice solution to this challenge is to partition the app into small, independent service components communicating with simple, lightweight mechanisms. The microservice paradigm design rules dictate that each microservice must be able to be managed, replicated/scaled, upgraded and deployed independent of the others microservices. Each microservice must have a single function and operates in a “bounded context”, i.e. having very limited responsibility and limited dependence on other service. When possible one should reuse existing trusted services such as databases, caches, directories, etc. All microservices should be designed for constant failure and recovery. The communication mechanisms used are varied and include REST web service calls, RPC mechanisms like SWIFT and the Advance Message Queueing Protocol AMQP system that are common in IoT applications and are well supported in the commercial clouds.

The microservice philosophy has been adopted in various form by many companies including Netflix, Google, Microsoft, Spotify, Amazon and others.

Docker, Swarm, Kubernetes and Mesosphere

To build a substantial microservice application one needs to build on a platform that can manage large collections of distributed, communicating services. Thanks to the open source movement we now have an excellent collection of tools for building and managing microservices.   These are

  1. Containers. This revolutionary technology that allows us an alternative to deploying services as heavy-weight virtual machines.   “Containers” run on a host OS and use basic services such as kernel namespaces and resource isolation provided by that OS. Docker containers contain all the OS components and libraries needed by the app, various user-added files and apps and instructions for what process to run. Docker exploits the host OS namespaces for isolation, control groups for resource management.   It uses the Union File Systems so that containers are built as layers on existing FS. The company also provides a registry of dockerized tools and application that can be downloaded. We show an example of using Docker and the registry in another section.
  1. CoreOS. As noted the host OS for Docker need not be very heavy because each container will have its own OS features and libraries required by the application. CoreOS is a basic “stripped down” version of Linux that contains only those tools needed to support Docker containers.

The tools for managing clusters of microservices hosting containerized microservices are many.   The ones we have tested are

  1. Swarm. provides a tool for managing clusters of servers running Docker container instances. We tested Swarm on Azure using the client Docker-machine and it was relatively easy to bring up a cluster of machines and deploy apps.   Swarm shows great promise but it is still beta quality and we found the documentation still rather sparse.
  2. Kubernetes was recently released by Google and is based on the cluster management tools they use. We also found it relatively easy to deploy Kubernetes on Azure using another tool called Weave to support the networking.   Kubernetes has an important feature not present in the other microservice cluster management tools.   In Kubernetes every container lives inside a “pod”.   Pods can contain a single Docker container instance or several instances.   This is important because often a single microservice instance may always collaborate with another microservice instance of the same or different type. For example a Dockerized web server may need a private instance of a Redis cache or SQL database.   These are relatively standard docker components, so there is no reason to deploy them in your web server container.   Because each pod instance runs on a single server, the containers in the same pod share resources such as private directories. Communication within a server is going to be faster and more reliable than communication across the network.
  3. Mesosphere. One of the many products from the UC Berkeley AMP lab was a distributed computing operating system (dcos) called Mesos.   It has since been spun off as a startup called   Mesosphere installed easily on Azure and it has proven to be extremely reliable.   Because we use it so extensively we have an addition page about it here.
  4. Microsoft Azure Service Fabric. Microsoft recently released a development kit for a microservice orchestration framework that they have been using internally for a while. The SDK is easy to install into visual studio and it comes with an emulator so you can start building apps. I have not yet had a chance to try the full deployment.   Mark Russinovich has a nice blog about this and an online video of a talk he gave describing the system.

In the next post, we will describe our experience building a microservice application to do document classification using machine learning tools.   However if you want to get a feeling for how one can use Mesosphere and Python to build simple microservices, then we have a page where we show you all the gory details.  It is called “Fun with Mesosphere and Microservices“.