Observations about Serverless Computing

With a few examples from AWS Lambda, Azure Functions and OpenWhisk.

Cloud computing is going through an interesting evolution.  It has gone from a platform for deploying virtual machines to planet-scale systems with extensive collections of data storage, analysis and machine learning services.   Most recently we have seen the emergence of “cloud native” computing, which in its most basic form involves a design pattern of microservices where big applications are decomposed into hundreds of basic stateless components that run on  clusters managed by tools like Kubernetes, Mesos and Swarm.

Serverless computing is the next step in this evolution.  It addresses the following challenges.  Suppose I have a small computation that I want to run against some database at the end of each month. Or suppose I want to have the equivalent of a computational daemon that wakes up and executes a specific task only when certain conditions arise.  For example, when an event arrives in a particular stream or when a file in a storage container has been modified or when a timer goes off.  How do I automate these tasks without paying for a continuously running server?   Unfortunately, the traditional cloud computing infrastructure model would require me to allocate computing resources such as virtual machines or a microservice cluster and my daemon would be a continuously running process.   While I can scale my cluster of VMs up and down, I can’t scale it to zero without my daemon becoming unresponsive.  I only want to pay for my computing WHEN my computation is running.

This is not a totally new idea.  Paying only for the compute that we use goes back to early timesharing and persists with compute-hour “allocations” on supercomputers today.  And there are cloud services such as Azure Data Lake Analytics, Amazon Kinesis or the AWS API gateway, that charge you only for the computation that you use or data that you move and they do not require you to deploy server infrastructure to use them.

However, there is something deeper going on here and it has to do with triggers and another computational paradigm called “Function-as-a-Service” (FaaS). Unlike the serverless examples above, which depend upon me invoking a specific well-defined service, FaaS allows a cloud user to define their own function, “register” it with the cloud, and specify the exact events that will cause it to wake up and execute. As mentioned above, these event triggers can be tied to changes in the state of a storage account or database, events associated with queues or streams of data from IoT devices, or web API invocations coming from mobile apps.  Triggers can even be defined by steps in the execution of a workflow.   And, of course, the user only pays when and while the function is executing.


Figure 1.  Function, triggers and output concept. (from markoinsights.com)

There have been two conferences that have focused on the state of serverless computing. The paper “Status of Serverless Computing and Function-as-a-Service (FaaS) in Industry and Research” by Geoffrey Fox, Vatche Ishakian, Vinod Muthusamy and Aleksander Slominski provides an excellent overview of many of the ideas, concepts and questions that surround serverless computing that surfaced at these conferences. In that paper they refer to an IBM tutorial that defines serverless FaaS as
1. short-running, stateless computation
2. event-driven applications
3. scales up and down instantly and automatically
4. based on charge-by-use

Notice that there is a distinction between FaaS and serverless FaaS, and it has to do with item 4 above.   A good example of this is Google’s App Engine, which was arguably the first FaaS available from a commercial cloud.   In its current form App Engine can run in one of two modes.  In its standard mode, your applications run in a sandbox and you are charged only when the app is running.  In the “flexible” mode you deploy a container and then specify the compute infrastructure needed in terms of CPU power, memory and disk, and you are charged by the hour.  You could say that App Engine running in flexible mode is server-lite, but clearly not fully serverless, while standard mode is truly serverless.

What are the serverless FaaS choices?

There are a number of FaaS implementations.  Some of these are used for research while others are commercial products.  The Status report refers to many of these and the slides for the workshop are on-line.   A good example of the research work is OpenLambda from the University of Wisconsin, first introduced at HotCloud ’16.  Based on this experience the Wisconsin team described Pipsqueak, an experiment to reduce the deployment latencies caused by Python library initializations.   Ryan Chard described Ripple, which is an excellent example of distributing event trigger management from the source to the cloud. Ripple has been designed and used for several significant science applications including beamline science (ALS and APS).  Another related technology is if-this-then-that (IFTTT), a service for chaining together other services.

Two other open source projects raise an interesting question about what is behind the curtain of serverless. Funktion and Fission are both implementations of FaaS on top of Kubernetes. As we discuss serverless computing we must remember that there is a “server” somewhere. The basic infrastructure for serverless computing needs to run somewhere as a persistent service, and hence a microservice platform like Kubernetes is a reasonable choice. This relates to the economics of serverless computing and we return to that at the end of this report.

The most commonly referenced open source FaaS service is Apache OpenWhisk, which was developed by IBM and is available on their Bluemix cloud as a service. The other commercial services include Google Cloud Functions, Microsoft Azure Functions and Amazon Lambda. At the end of this article we will show some very simple examples of using some of these systems.

When can FaaS replace a mesh of microservices for building an app?

The Status paper also makes several other important observations. For example, they note that while serverless is great for the triggered examples described above, it is not good for long-running or stateful applications like databases, deep learning training, heavy stream analytics, Spark or Hadoop analysis and video streaming. In fact, many large-scale cloud-native applications that have thousands of concurrent users require continuous use of massive networks of microservices. These will not be based on serverless FaaS. However, there may be many cases where a user-facing application running in the cloud could be easily implemented with serverless FaaS rather than as a big microservice deployment. What we do not know is where the cross-over point from a serverless FaaS implementation of an app to a full Kubernetes-based massive microservice deployment lies. This relates to the economics of FaaS (discussed briefly at the end of this article). The Cloud Native Computing Foundation has a working group on serverless computing that is addressing this topic.

There is one interesting example of using serverless FaaS to do massively parallel computing and it is called pywren. The lovely paper “Occupy the Cloud: Distributed Computing for the 99%” by Eric Jonas, Qifan Pu, Shivaram Venkataraman, Ion Stoica and Benjamin Recht describes the concepts in pywren, which allow it to scale computation to thousands of concurrent function invocations, achieving 40 teraflops of compute performance. Pywren uses AWS Lambda in a very clever way: it serializes the computational functions, which are then passed to Lambda functions to execute. We will return to Pywren in another post.
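
In the meantime, here is a minimal sketch of the pywren programming model, patterned on the examples in the pywren documentation; it assumes AWS credentials and the pywren Lambda runtime have already been configured with the pywren setup tool.

import pywren

def addone(x):
    return x + 1

# each invocation of addone runs as a separate AWS Lambda execution
pwex = pywren.default_executor()
futures = pwex.map(addone, range(10))
results = [f.result() for f in futures]
print(results)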

Computing at the Edge?

Perhaps the hottest topic in the general area of cloud computing is what happens when the computing spills out of the cloud to the edge of the network. There are many reasons for this but most boil down to latency and bandwidth. A good example is the set of use cases that motivate the Ripple system described above. In order to generate a stream of events from sensors at the edge of the network, one needs very lightweight computing that can monitor them and generate the events. In many cases it is necessary to preprocess the data in order to send the message to the cloud where a function will be invoked to respond. In some cases, the response must return to the source sooner than a remote invocation can respond because of the latencies involved. The computing at the edge may need to execute the function there, and the actual communication with the cloud may be just a final log event or a trigger for some follow-up action.

Another possibility is that the functions can migrate to the places where they are needed. When you deploy the computing at the edge you also deploy a list of functions that must be invoked. When the edge process starts up it could cache some of the functions it needs locally. We expect there will be many variations on these ideas in the future.

A Tiny FaaS Tutorial

By their very nature FaaS systems are very simple to use: you write short, stateless functions and tie them to triggers. We will take a quick look at three of these: AWS Lambda, Microsoft Azure Functions and IBM Bluemix OpenWhisk.

AWS Lambda Functions

Amazon was the first to introduce serverless functions as a service and it is very well integrated into their ecosystem of other services. Called Lambda functions, they can be easily created in a number of standard programming languages. Lambda functions can be associated with a variety of trigger events including changes to the state of a storage account, web service invocations, stream events and even workflow events.

We will illustrate Lambda with a simple example of a function that responds to Kinesis Stream events; for each event it adds an item to a DynamoDB table. Here is the Python code for the function that accomplishes this task.

from __future__ import print_function
import base64
import ast
import boto3

def lambda_handler(event, context):
    dyndb = boto3.resource('dynamodb', region_name='us-west-2')
    table = dyndb.Table("lambdaTable")

    for record in event['Records']:
       #Kinesis data is base64 encoded so decode here
       payload = base64.b64decode(record["kinesis"]["data"])
       #the payload is a Python dict literal; literal_eval is a safer way to parse it than eval
       x = ast.literal_eval(str(payload))
       metadata_item = {'PartitionKey': x['PartitionKey'],
                        'RowKey': x['RowKey'], 'title': x['text']}
       table.put_item(Item=metadata_item)

There are several critical items that are not explicit here.   We need to invoke the AWS Identity and Access Management (IAM) system to delegate some permissions to our function, and we will need to grab copies of the Amazon Resource Names (ARNs) for this and other objects. First we create an IAM role that will allow access to Kinesis streams and DynamoDB. Using the IAM portal we created a role called “lambda-kinesis-execution-role” and attached two policies, “AmazonDynamoDBFullAccess” and “AWSLambdaKinesisExecutionRole”. We then made a copy of the Role ARN.
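
The same role can also be built programmatically. The sketch below is a rough boto3 equivalent of the portal steps; the trust policy and managed policy ARNs are our best reconstruction rather than output copied from a working script.

import json
import boto3

iam = boto3.client('iam')

# trust policy that lets the Lambda service assume the role
assume_role_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

role = iam.create_role(
    RoleName='lambda-kinesis-execution-role',
    AssumeRolePolicyDocument=json.dumps(assume_role_policy))

# attach the two managed policies we selected in the portal
iam.attach_role_policy(
    RoleName='lambda-kinesis-execution-role',
    PolicyArn='arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess')
iam.attach_role_policy(
    RoleName='lambda-kinesis-execution-role',
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole')

print(role['Role']['Arn'])   # the Role ARN we need below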

The next step is to install this function into the Lambda system. To do that we put the function above into a file called “ProcessKinesisRecords.py” and then zipped it. We then uploaded the zipped file to S3 (in the us-west-2 region) in the bucket “dbglambda”. With that we can create our function with the boto3 call from our laptop as:

lambdaclient = boto3.client('lambda')

response = lambdaclient.create_function(
    FunctionName='lambdahandler',
    Runtime='python2.7',
    Role='arn:aws:iam::066xx534:role/lambda-kinesis-execution-role',
    Handler='ProcessKinesisRecords.lambda_handler',
    Code={
        'S3Bucket': 'dbglambda',
        'S3Key': 'ProcessKinesisRecords.zip',
    }
    )

The final item we need is the link between this function and the Kinesis Stream service.
To do that we went to the portal for Kinesis and created a stream called “mylambdastream” and grabbed its ARN. Creating the binding is accomplished with the following.

response = lambdaclient.create_event_source_mapping(
    EventSourceArn=
        'arn:aws:kinesis:us-west-2:06xxx450734:stream/mylambdastream',
    FunctionName='lambdahandler',
    BatchSize=123,
    StartingPosition='TRIM_HORIZON',
)
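
For completeness, the stream itself (and its ARN) could also have been created from boto3 rather than the portal; a minimal sketch, assuming the us-west-2 region:

kinesis = boto3.client('kinesis', region_name='us-west-2')
kinesis.create_stream(StreamName='mylambdastream', ShardCount=1)
# once the stream is ACTIVE, its ARN appears in the stream description
desc = kinesis.describe_stream(StreamName='mylambdastream')
print(desc['StreamDescription']['StreamARN'])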

We can verify the properties of the function we have created by looking at the AWS Lambda portal pages. As shown below, we can verify that our lambdahandler function does indeed have our stream as its trigger.


Figure 2. AWS Lambda Function dashboard showing our trigger.

Finally we can invoke it by pushing an event to the Kinesis stream. This is shown below.

client = boto3.client('kinesis')
# the record is a Python dict literal; the Lambda function parses it on the other end
record = "{'PartitionKey':'part2', 'RowKey': '444', 'text':'good day'}"
resp = client.put_record(StreamName='mylambdastream',
    Data=bytearray(record),
    PartitionKey='a')

Checking the DynamoDB portal will verify that the function has picked up the message from Kinesis and deposited it in the database. The full details are in the notebook “simple-kinesis-lambda-sender.ipynb”.
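
Instead of the portal, a quick boto3 check works too; this sketch assumes the table’s key schema is (PartitionKey, RowKey), as the Lambda function above implies.

dyndb = boto3.resource('dynamodb', region_name='us-west-2')
table = dyndb.Table('lambdaTable')
resp = table.get_item(Key={'PartitionKey': 'part2', 'RowKey': '444'})
print(resp.get('Item'))   # should show the 'good day' record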

Azure Functions

The Azure Functions portal has a large number of basic templates we can use to build our function, as shown below.


Figure 3.  Azure Function Template samples

We have selected one of the Python examples that creates a simple web service. Bringing this up on the portal we see the code below.

import os
import json

# the runtime passes the request body in a file named by the 'req' environment
# variable and expects the response written to the file named by 'res'
postreqdata = json.loads(open(os.environ['req']).read())
response = open(os.environ['res'], 'w')
response.write("hello world to "+postreqdata['name'])
response.close()

To get this running, we now must go to the “integrate” page to provide one more detail: we set the authorization level to “anonymous” as shown below.


Figure 4.  Setting the authorization level for the trigger to “anonymous”.

The client is also simple and shown below.

import json
import requests

data = {}
data['name'] = 'goober'
json_data = json.dumps(data)
r = requests.post("https://pylook.azurewebsites.net/api/pylook", 
                   data=json_data)
print(r.status_code, r.reason)
print(r.text)

Using Azure Functions to pull data from a queue.

We now give another version of the function that reads messages from a queue and puts them in a table. There is no Python template for this one yet, so we will use JavaScript. To use this template we must first create a new storage account or use an existing one. Go to the storage account page and you will see the available storage services, as shown below.

Figure 5. Creating a new table and queue in a storage account.

We click on Table and create a new table and remember its name. Then we go to Queues and create a new one and remember its name. Looking in the storage explorer should show entries for both of these items. Clicking on the table name should give us a picture of an empty table.

Go back to the portal main page, click “+”, look for “Function App” and click create. There is a form to fill in like the one below.


Figure 6.  Creating a new function.

We give it a name and allow it to create a new resource group. For the storage we use the dropdown and look for the storage account name. (It is important that the storage account is in the same location as the function.) We click create and wait for the function to appear on the function portal page. You should see it in your resource groups. Follow that link and you will see that it is running.

Go to the functions tab and hit “+”. It will ask you to pick one of the templates. At the top where it says “language” select JavaScript and pick the one that is called QueueTrigger. This will load a basic template for the function. Now edit the template so that it looks like the following example.

Figure 7.  The edited JavaScript QueueTrigger function with the added output table binding.

The main difference between this and the template is that we have added an output table and instructions to push three items into the table. The function assumes that the items in the queue are of the form

{'PartitionKey': 'part1', 'RowKey': '73', 'content': 'some data'}

Next we need to tie the function’s queue binding to our queue and its table binding to our table. So click on “Integrate” on the left. You need to fill in the form so that it refers to your own queue and table, as illustrated below.


Figure 8.  The association between the storage account queue and table services and the variables in our function.  Here we have highlighted “show value” to verify that it has the right storage account.

You should see your storage account in the dropdown menu. Select it and then add the table name. You need to do the same for the AzureQueueStorage binding. Once this is done, your function is saved and the system is running, your function should be instantiated and invoked as soon as you send it queue items. For that we have a simple Python script in a Jupyter notebook, which you can get from https://SciEngCloud.github.io/py-functions-queue-driver.ipynb.
You will need to fill in your account key for the storage account, but then you should be able to step through the rest. The notebook runs a few tests and then sends 20 items to the queue. Using the Azure storage explorer we see the results in the table as shown below.


Figure 9.   The view of the table after running the Jupyter Notebook
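
For reference, the core of what that notebook does can be sketched with the (2017-era) azure-storage Python package; the account, key and queue names here are placeholders, and depending on the Functions runtime configuration the messages may need to be base64 encoded.

from azure.storage.queue import QueueService

queue_service = QueueService(account_name='your-storage-account',
                             account_key='your-account-key')
for i in range(20):
    # items in the format the JavaScript function expects
    msg = "{'PartitionKey': 'part1', 'RowKey': '" + str(i) + "', 'content': 'some data'}"
    queue_service.put_message('your-queue-name', msg)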

OpenWhisk and IBM Bluemix

OpenWhisk is the open source serverless function system developed by IBM that is also supported as a commercial service on their Bluemix cloud.  Like the others it is reasonably easy to use from their command-line tools, but the Bluemix portal provides an even easier solution.   Creating a function is as easy as selecting the runtime and downloading a template.  Figure 10 below illustrates a simple function derived from the template for a Python 3 function.


Figure 10.  An OpenWhisk function that decodes a dictionary input and returns a dictionary.

Notice that the parameter to the function is a Python dictionary (all OpenWhisk messages are actually JSON objects, which for Python are rendered as dictionaries).  While this template can be run as a web service in its present form, it is also possible to connect it to an external trigger.  To illustrate this we connected it to a trigger that monitors activity in a GitHub account.  Using the portal it was easy to create the trigger (called “mygithubtrigger”) and bind it to push and delete actions on a repository called dbgannon/whisk.   All this required was an access token, which was easily available by logging in to the GitHub portal.   In the case of GitHub triggers the event returns a massive JSON object that is a complete description of the repository.   In the action “myaction” we go two levels down in the dictionary, extract the official repository description and make that the response of the action to the event.
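
A rough sketch of what such an action can look like is below. OpenWhisk Python actions take a dictionary and return a dictionary; the 'repository' and 'description' fields here are assumptions based on the general shape of GitHub’s webhook payload rather than a copy of the actual “myaction” code.

def main(params):
    # the GitHub event arrives as a (large) dictionary
    repo = params.get('repository', {})
    return {'description': repo.get('description', 'no description found')}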

When a trigger fires you need a rule that binds the firing to an action.   We bound it to the trivial “myaction” example above.  The view from the portal of the rule is below.


Figure 11.   The rule view of our association of the trigger to the function

We next added a new file to the repository.  This activated the trigger and the rule invoked the action.   The portal has a nice monitor facility and the image is below.

Figure 12.  This is the “Monitor” view after the trigger has fired.

Finally, drilling down on the “myaction” log event we see the description of the GitHub repository we created.

Figure 13.  The output of the “myaction” function after a file was added to the GitHub repository.

Finally

The examples above are all very trivial.   The next thing to explore is how functions can be composed into workflows.   Each of the three systems has its own way of doing that and if we have time later we will show some examples of this capability.   We have also not discussed performance or cost, which is greatly dependent on the rate at which your triggers are firing and the amount of work in each function execution.

The economics of serverless computing are also very interesting.  As we pointed out earlier, for a cloud provider to offer a FaaS serverless capability it must be supported by actual infrastructure.   How can the provider afford this if it is only charging by the second?  There are two possible answers.   First, if your FaaS is serving very large numbers of functions per second then it will certainly pay for itself.   But there is another consideration.   Many of the big cloud providers are running vast microservice frameworks that support most, if not all, of their big internal and publicly available applications.  Running a FaaS system on top of that is as easy as running any other microservice based application.  One only needs to set the cost per second of function evaluation high enough to cover the cost of the share of the underlying platform the FaaS system is using.   For Amazon Lambda “The price depends on the amount of memory you allocate to your function. You are charged $0.00001667 for every GB-second used.”  This amounts to $0.06 per GB-hour.  So for a function that takes 8 GB of memory to execute that is $0.48 per hour, which is 4.8 times greater than the cost of an 8 GB m4.large EC2 instance.   Consequently a heavily used FaaS system is a financial win and a lightly used one may have little impact on your infrastructure.

Of course, single executions of a FaaS function are limited to five minutes, so you would need twelve concurrent 5-minute executions to reach an hour of execution time. Furthermore AWS gives you 1 million invocations (at 100ms) each month for free, or 400,000 GB-seconds per month free.  That is 111 GB-hours per month for free.   This is not a bad deal and it would seem to indicate that Lambda is not a big drain on their infrastructure yet.
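
A quick check of the arithmetic quoted above:

price_per_gb_second = 0.00001667
price_per_gb_hour = price_per_gb_second * 3600   # about $0.06
cost_8gb_hour = 8 * price_per_gb_hour            # about $0.48 per hour
free_gb_hours = 400000 / 3600.0                  # about 111 GB-hours free per month
print(price_per_gb_hour, cost_8gb_hour, free_gb_hours)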

Acknowledgement:  Thanks go to Ryan Chard for helpful comments and suggestions on a draft of this post.

Azure’s new CosmosDB Planet-Scale Database

The Microsoft Azure team has recently released CosmosDB, a new entry in the cloud data storage management marketplace. Cosmos is actually the name of a data storage system that has been used internally at Microsoft for many years. That original Cosmos has now morphed into Azure Data Lake Storage (ADLS) and Analytics. ADLS is focused on Hadoop/HDFS compatible scalable analytics and, at the present time, it is available in only two of the US data centers. CosmosDB is related to the original Cosmos in name only and it is available at all of the data centers.

There is an excellent technical description [https://azure.microsoft.com/en-us/blog/a-technical-overview-of-azure-cosmos-db], so we will hit only a few highlights here.  CosmosDB has four main API components and a number of interesting properties. From the user’s perspective these components are

  1. DocumentDB: “a NoSQL document database service designed from the ground up to natively support JSON and JavaScript directly inside the database engine.”
  2. MongoDB: CosmosDB implements the MongoDB API so that users of that popular NoSQL database can easily move to CosmosDB.
  3. Azure Table Storage: a simple key-value table mechanism that has been part of Azure from its earliest days.
  4. Gremlin Graph Query API: a graph traversal language based on functional programming concepts.

There is a common “resource model” that unifies all of these capabilities. A database consists of users with varying permissions and containers. A container holds one of the three content types: document collections, tables or graphs. Special resources like stored procedures, triggers and user-defined functions (UDFs) are also stored within the container. At the top level the user creates a CosmosDB database account and in doing so the user must pick one of these four APIs. In the examples we show here we focus on the DocumentDB API. In a later post we will look at Gremlin.

There are five important properties that every CosmosDB database has.

  1. Global distribution: your database is replicated in any of 30+ different regions and you can pick these from a map on the Azure portal.
  2. Elastic scale-out: you can scale the throughput of a container by programmatically provisioning it at a second or minute granularity. Throughput is measured in a currency unit called a Request Unit (RU).
  3. Guaranteed low latency: For a typical 1KB item, Azure Cosmos DB guarantees end-to-end latency of reads under 10ms and indexed writes under 15ms at the 99th percentile within the same Azure region.
  4. Five consistency models.
  5. A Comprehensive Service Level Agreement (SLA).

Global Distribution

CosmosDB has two forms of distribution. Each database is composed of one or more collections and every data collection is stored in a logical container that is distributed over one or more physical server partitions. And, of course, everything is replicated. Every item in a collection has a partition key and a unique ID. The partition key is hashed and the hash is used to assign that item to one of the physical servers associated with that container. This level of partitioning happens automatically when you create a collection and start filling it.
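
For collections that you want spread over multiple physical partitions, the partition key is declared when the collection is created. A minimal sketch with the pydocumentdb client is below; the path '/city0_name' is just an illustrative choice, and client and db stand for a DocumentClient and database handle like the ones created in the example later in this post.

partitioned_collection = {
    'id': 'gdelt',
    'partitionKey': {'paths': ['/city0_name'], 'kind': 'Hash'}
}
# collection = client.CreateCollection(db['_self'], partitioned_collection,
#                                      {'offerThroughput': 400})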

Global distribution is more interesting. When you first create a database it has an initial “write location” which refers to one of the 30 or so Azure regions around the globe. You can use the portal to say how your data is distributed to other regions. In the example we discuss at the end of this document our original region was “North Central US”. We used the “Replicate Data Globally” function, which gives us a map in the portal, where we selected three additional regions, “West Europe”, “South India” and “Brazil South”, as shown in Figure 1 below.


Figure 1. Selecting three additional locations for a database using a map tool.

With a small (170MB) database of documents it took only a few minutes to replicate the data to these locations. The data collections in the replicated sites are considered “read locations”, which means that a user in these locations will by default read data from the local replica. However if that remote user attempts a write the request is routed to the “write location”. To understand how long a remote reader takes to see an update from the remote write location we need to discuss the consistency models.

Elastic scale-out and The Cost of Computing

Every operation you do in CosmosDB requires bandwidth and computation. You can control the rate at which this “energy” is consumed for your database with Request Units (RUs), specified as RUs/sec and RUs/min. You specify the RUs/sec you are willing to “spend” on your database and CosmosDB will provision the resources to meet your throughput requirements.

Consistency Models

Consistency is one of the most difficult topics in distributed systems. In the case of distributed or replicated databases, the consistency model tells the user how changes in one copy of the database are reflected in the other copies. Insisting on very strong consistency may have an adverse impact on latencies and throughput, so the choice of consistency model can have a profound impact on application performance and cost.

One of the truly unique features of CosmosDB is that it gives the user a choice of five different consistency models. In order from the weakest to the strongest and in terms of RUs, the cheapest to most expensive they are:

  1. Eventual consistency. Eventual consistency is the weakest form of consistency wherein a client may get the values which are older than the ones it had seen before, over time. In the absence of any further writes, the replicas within the group will eventually converge.
  2. Consistent prefix. Consistent prefix level guarantees that reads never see out of order writes. If writes were performed in the order `A, B, C`, then a client sees either `A`, `A,B`, or `A,B,C`, but never out of order like `A,C` or `B,A,C`.
  3. Session consistency. When you connect to a CosmosDB database through its URL you are creating a session. Session consistency guarantees monotonic reads, monotonic writes, and read-your-own-writes (RYW) for the duration of the session. In the Python API you create a client object that encapsulates your connection to the database; its “consistency_level” parameter has the default value “Session” (a sketch of setting it appears after this list). Reads in session consistency take a bit longer than consistent prefix, which takes longer than eventual consistency.
  4. Bounded staleness. This model has two ways to insist on consistency. You can define a time interval such that beyond that interval of time from the present, the system will guarantee consistency. Alternatively you can specify an upper bound on the number of writes by which reads may lag; beyond that point consistency is guaranteed. Of course the smaller you make the window, the more computing you may consume and the more delay you may encounter.
  5. Strong consistency. The most expensive and most complete. It is linearizable in that reads always return the most recent writes. But it is limited to collections that are not geo-distributed.
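
As mentioned in item 3, the consistency level is selected when the client is created. A minimal sketch with the pydocumentdb client (endpoint and key are placeholders):

import pydocumentdb.documents as documents
import pydocumentdb.document_client as document_client

client = document_client.DocumentClient(
    'https://your-account.documents.azure.com',
    {'masterKey': 'your-key'},
    consistency_level=documents.ConsistencyLevel.Eventual)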

A word about the performance of these models. In the example at the end of this section we put the write region of the database in North America and replicated it in three other locations, one of them in Western Europe. We then created a virtual machine in Western Europe, connected it to the database and verified by the IP address that it was connecting to the local replica. We then fired changes at the North American region and read with each of the consistency models. We were unable to detect any out-of-order reads and the responses were extremely fast. This is obviously not a full scientific study, but it was clear we had a system that performed very well. We also attempted a write to the local Western Europe “read location”. We were able to detect that the API call immediately dropped the read-location connection and reconnected to the North American copy to do the write.

Comprehensive SLA

The service level agreement (SLA) for CosmosDB is actually a contract that Azure and you agree to when you create a database. It is indeed comprehensive. It covers guarantees for availability, throughput, consistency and latency giving you upper bounds for each measurable attribute of performance. As you are the one to specify how many RUs/sec you are willing to allocate, some of the temporal guarantees may be dependent upon this allocation not being exceeded. We will not go into it in detail here. Read it with your lawyer.

A look at Azure DocumentDB using Python and GDELT Data

To illustrate the basics of DocumentDB we will use a tiny part of the amazing GDELT event collection. This data collection is available on AWS S3 as well as Google’s BigQuery. The GDELT project (http://gdeltproject.org) is the brainchild of Kalev Leetaru of Georgetown University and it is a serious “Big Data” collection. GDELT is the result of mining hundreds of thousands of broadcast, print and online news sources from every corner of the world every day. What we will look at here is a microscopic window into the daily collection of news items. In fact, we will take a look at the news from one day: June 30, 2017.

AWS keeps the data on S3 and downloading it is dead easy. The command

$ aws s3 cp s3://gdelt-open-data/events/20170630.export.csv . 

will download the 20MB June 30, 2017 dataset as a CSV file. Each row of the file is a record of an “event” consisting of some 60 attributes that catalog a publication of a news item. Among the fields in the record are a unique identifier, a timestamp and the name and geolocation of two “actors” that are part of the event. In addition, there is a URL of a published news item about the event. The actors are derived from the event: actor1 is called an initiator of the event and actor2 is a recipient or victim of the event. These actors are usually represented by the names of the cities or countries associated with the actual actors.

For example the story “Qatar’s defense minister to visit Turkey amid base controversy” (http://jordantimes.com/news/local/qatar’s-defence-minister-visit-turkey-amid-base-controversy) appeared in the Jordan Times. It describes an event where the Qatar defense minister visited Turkey as Turkey resists pressure from Saudi Arabia, the United Arab Emirates, Egypt and Bahrain to close bases after those nations pressed sanctions against Qatar on June 5. This URL appears three times in the database for June 30. In one case, actor1 is Doha, Qatar and actor2 is Ankara, Turkey. In the second case actor1 is Riyadh, Saudi Arabia and actor2 is Turkey. In the third case actor1 and actor2 are both Riyadh. Though the identity of the actor cities is based on an automated analysis of the story, there may be three different stories to analyze. However, only one URL has been selected to represent all three.

For our purposes we will describe this as a single event that links the three cities, Doha, Ankara and Riyadh. Many researchers have used the GDELT data to do deep political and social analysis of the world we live in, but that goal is way beyond what we intend here. What we will try to do in this example is to use CosmosDB to graphically illustrate different ways we can represent the events of June 30 and the cities they impact.

Creating the DocumentDB from the CSV file.

We will begin by creating a simple document database where each document is the JSON record for one line of the CSV file. Actually we will only include the URL, the two actor cities and their geolocations. To begin we must connect your client program to the CosmosDB system.

We use Python and we assume that the DocumentDB Python SDK (pydocumentdb) has been installed.

#run this if documentdb is not installed
#!pip install pydocumentdb
import csv
import pydocumentdb;
import pydocumentdb.document_client as document_client

We next create the database and the collection. But first we need an Azure CosmosDB account, which is best created on the Azure portal. Once that is there you can retrieve the account key so that we can create the collection.
Our account is called bookdocdb, our database is called ‘db3’ and the new collection will be called ‘gdelt’.

config = { 
    'ENDPOINT': 'https://bookdocdb.documents.azure.com',
    'MASTERKEY': 'your db key here',
    'DOCUMENTDB_DATABASE': 'db3',
    'DOCUMENTDB_COLLECTION': 'gdelt'
}
#create the client handle that is used in all of the calls below
client = document_client.DocumentClient(config['ENDPOINT'],
                                        {'masterKey': config['MASTERKEY']})
#we create the database, but if it is already there CreateDatabase will fail.
#In that case, we can query for it.
#this is handy because we can have more than one collection in a given database.
try:
    db = client.CreateDatabase({ 'id': config['DOCUMENTDB_DATABASE'] }, )
except:
    db_id = config['DOCUMENTDB_DATABASE']
    db_query = "select * from r where r.id = '{0}'".format(db_id)
    db = list(client.QueryDatabases(db_query))[0]

Next we will create the collection. Again, we may already have the collection there, so we can add new items to an existing collection but we need a different method to retrieve the collection handle.

options = {
    'offerEnableRUPerMinuteThroughput': True,
    'offerVersion': "V2",
    'offerThroughput': 400
}
try:
    collection = client.CreateCollection(db['_self'], 
         { 'id': config['DOCUMENTDB_COLLECTION'] }, options)
except:
    collection = next((coll for coll in client.ReadCollections(db['_self']) \
         if coll['id']==config['DOCUMENTDB_COLLECTION']))

The next step is to read the CSV file and add the new documents to the database collection.
We are going to do this one line at a time and keep only the time stamps, actor cities and geolocations (latitude and longitude) and the associated URL. We will also keep a separate list of the URLs for later use.

url_list = []
with open('20170630.export.csv') as csvfile:
    readCSV = csv.reader(csvfile, delimiter='\t')
    for row in readCSV:
        url_list.append(row[57])
        document1 = client.CreateDocument(collection['_self'],
            { 
             'id': row[0],
             'time_stamp': row[1],
             'time_stamp2': row[4],
             'city0_lat': row[46],
             'city0_lon': row[47],
             'city0_name': row[43],
              'city1_lat': row[53],
              'city1_lon': row[54],
              'city1_name': row[50],
              'url': row[57] 
              }
            )

The next thing we will do is to look at the URLs that involve a large number of cities (because they are often more interesting). Because we have the geolocations of each city we can draw a map and link the cities associated with the URL.

To do this we need to remove all the duplicates in our URL list using a standard Python trick: convert the list to a set and then back to a list.

s = set(url_list)
url_list = list(s)

We next query the database collection to find the cities associated with each URL. The best way to do this in the cloud is a simple map-reduce operation where we map each record to a pair consisting of a URL and a list of cities. We then invoke a reduce-by-key function to reduce all pairs with the same key to a single pair with that key and all of the lists associated with that key concatenated.
If we were doing more than a single day’s worth of data, it would be worth bringing Spark into the picture and using it for the map-reduce step. Instead we will do it sequentially using a database query for each URL. The following function accomplishes that task.

def get_cities_for_url(url):
    db_query = {'query':"select * from r where r.url = '{0}'".format(url) }
    options = {}
    options['enableCrossPartitionQuery'] = True
    options['maxItemCount'] = 1000
    q = client.QueryDocuments(collection['_self'], db_query, options)
    results = list(q)
    cities = []
    for x in results:
        if x['city1_name'] != '':
            cities.append(x['city1_name']+" lat="+x['city1_lat'] + ' lon='+x['city1_lon'])
        if x['city0_name'] != '':
            cities.append(x['city0_name']+" lat="+x['city0_lat'] + ' lon='+x['city0_lon'])
    city_set = set(cities)
    cities = list(city_set)
    return cities
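
For larger data volumes, the same URL-to-cities grouping could be written as the map/reduce-by-key job described above using Spark. A rough sketch, assuming a SparkContext sc and the raw tab-separated GDELT file:

pairs = (sc.textFile('20170630.export.csv')
           .map(lambda line: line.split('\t'))
           .map(lambda row: (row[57], [row[43], row[50]]))   # (url, [actor cities])
           .reduceByKey(lambda a, b: a + b))
url_to_cities = pairs.collectAsMap()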

The get_cities_for_url function returns a list of strings where each string is a city and its geolocation. We have used the matplotlib Basemap tools to render the most interesting sets of cities, where we define “interesting” to be those URL sets with more than 5 cities. In this case we have only plotted a few hundred of the city clusters.


Figure 2.   Cities linked by shared stories.

While this is colorful, it is not very informative. We can tell there were some events in Australia that involved Europe and the US (this turned out to be a scandal involving an important person in the Catholic Church) and there were a number of stories that involve North Korea, the US and China. Something happened in the Northwest that involved cities on the east coast of the US. In general the information content is low.

To create a different view, let’s transform the collection to one that reflects the set of cities associated with each URL. To accomplish that we call the get_cities_for_url() function above and create a new document with only three attributes: an ID, the URL and a list of cities associated with the URL. The new collection is called “cities”, but it is really more like a dictionary that associates the URLs with lists of cities.  (The Jupyter Notebook used to build this collection and illustrate the example below is available here.)
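
A sketch of how the “cities” collection can be populated is below; it assumes a second collection handle, here called cities_collection, created the same way as the ‘gdelt’ collection above.

for i, url in enumerate(url_list):
    cities = get_cities_for_url(url)
    if len(cities) > 0:
        client.CreateDocument(cities_collection['_self'],
                              {'id': str(i), 'url': url, 'cities': cities})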

One thing we can do is to search for special words or phrases in the URLs and list the cities that are associated with them. To do this we can make use of another feature of DocumentDB and create a user defined function to do the string search. This is extremely easy and can be done from the Azure Portal. The functions are written in JavaScript. Go to the data explorer tab for the “cities” collection and select the “New User Defined Function” tab. In our case we create a function “findsubstring” that searches for a substring “sub” in a longer string “s”.

function  findsubstring(s, sub){
    t = s.indexOf(sub)
    return t
}

We can now call this function in a database query as follows.

def checkInDB(name):
    db_query = {'query':"select r.url, r.cities from r where udf.findsubstring(r.url,'{0}')>0".format(name) }
    options = {}
    options['enableCrossPartitionQuery'] = True
    options['maxItemCount'] = 10000
    q = client.QueryDocuments(collection['_self'], db_query, options)
    results = list(q)
    return results

Picking a name from the news, “Ivanka”, we can see what stories and cities mentioned “Ivanka” on that day. The call checkInDB(“Ivanka”) returns

[{'cities': ['Oregon, United States', 
'White House, District of Columbia, United States',
'Buhari, Kano, Nigeria',
'Canyonville Christian Academy, Oregon, United States ', 
'Chibok, Borno, Nigeria', 
'Virginia, United States'],
'url': 'http://www.ghanaweb.com/GhanaHomePage/world/Trump-and-Ivanka-host-two-Chibok-girls-at-the-White-House-553963'}]

which is a story about a reception for two girls who were rescued from Boko Haram in Africa. The list of associated cities includes Canyonville Christian Academy, a private boarding school that has offered to continue their education.

A More Interactive Abstract View of the Data.

Another interesting approach to looking at the data is to consider stories that link together smaller cities in some way. We are going to look for “local” story connections rather than big news items. More specifically, we will look at cities that appear in at least 3 but no more than 5 stories in the day’s news. This leaves out big cities like Washington DC and big global headlines. Next we will build a graph that we can use to explore the stories and cities that share these more regional stories. There are actually 245 cities that appear in our restricted set.

In order to make our graph we need a dictionary that maps cities into the list of stories that mention them. Call that dictionary “stories”. Our visualization has nodes that are cities and we will connect them by an edge if they appear in a story together. Let “linkcities” be the list of 245 cities described above. We can then compute the edges as follows.

edges = []
for i in range(0, len(linkcities)):
    for j in range(0, len(linkcities)):
        if i < j:
            iset = set(stories[linkcities[i]])
            jset = set(stories[linkcities[j]])
            kset = iset.intersection(jset)
            if len(kset) > 0:
                edges.append((i, j))

The complete code for this example is given in this Jupyter notebook. We use the Plotly package to build an interactive graph. The nodes are distributed approximately over the surface of a sphere in 3D. You can rotate the sphere and mouse over nodes; doing so shows the URLs for the stories associated with that town. The sphere appears as the image below.


Figure 3.  Interactive city graph

A better way to see this is to interact with it. You can do this by going to the bottom panel in this file [https://SciEngCloud.github.io/city-graph.html].

Conclusion

Our experience with CosmosDB has been limited to using the DocumentDB API, but we were impressed with the experience. In particular, the ease with which one can globally distribute the data was impressive, as was the selection of consistency protocols. As we said, we were unable to force the database to show temporary inconsistency even with the weakest protocol choice using very distant copies of the database. This should not be considered a fault; our tests were too easy.  The examples above do not begin to stretch all the capabilities of CosmosDB, but we hope they provide a reasonable introduction for the Python programmer.

One final comment. It seems CosmosDB was built on top of the Azure microservice container system. It would be very interesting to see more details of the implementation.


Containers for HPC. A look at Singularity.

Vanessa Sochat and I recently completed a little study of Singularity, the container system that brings the advantages of Docker-style encapsulation to HPC.  If you are interested, this is now a supplementary chapter to the book “Cloud Computing for Science and Engineering” that is being published by MIT Press.  You can see the chapter here: https://cloud4scieng.org/singularity-a-container-system-for-hpc-applications/  The chapter has a high level overview and a brief tutorial showing how to build a Singularity container for an MPI application and how to run it on a small cluster on AWS.

Manifold Learning and Deep Autoencoders in Science

One way to think about machine learning is to view it as building a model of a system based on samples of data that are artifacts of that system.  This view does not resonate very well when the problem at hand is identifying photos containing sail boats, but it is apt when ML is applied to scientific data.

The data from scientific experiments often takes the form of vectors in a very high dimensional space, and we are looking for an underlying organization of that data that reflects properties of our system.   Manifold learning is based on the assumption that the system you are trying to model generates data that lies on or near a lower dimensional surface in the higher dimensional coordinate space of the data.   Picture the surface of a sphere or a curve in 3-D.   If this manifold assumption about the data is true, it may be possible to “unfold” the surface so that a projection or other linear analysis makes the data easier to understand.

Autoencoders are deep neural networks that can be used to discover properties of the manifold of data that characterizes the system you are trying to model.  Illustrated below, autoencoders have an encoder phase and a decoder phase with a space of much lower dimension than the input in the middle.  You train an autoencoder to be the identity function; by studying the properties of the lower dimensional space in the middle, you get a better view of the data manifold.

Figure: An autoencoder, with an encoder and a decoder surrounding a low-dimensional middle layer.

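To make the idea concrete, here is a minimal dense autoencoder sketch written with Keras; it is an illustration only and not the code or framework used in the tutorial.

from keras.layers import Input, Dense
from keras.models import Model

inputs = Input(shape=(784,))                     # e.g. flattened 28x28 images
code = Dense(32, activation='relu')(inputs)      # low-dimensional representation
decoded = Dense(784, activation='sigmoid')(code)

autoencoder = Model(inputs, decoded)
autoencoder.compile(optimizer='adam', loss='mse')
# train it to reproduce its input, i.e. to approximate the identity function:
# autoencoder.fit(x_train, x_train, epochs=10, batch_size=128)
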
Recently, a lovely blog article, A look at deep learning for science by Prabhat, gave us an excellent overview of some uses of deep learning technology in science applications, and several of these were applications of autoencoders.  Partly inspired by that article and by our own experiments with some very interesting images of neuron cells collected by Maryana Alegro at UCSF, we put together a little tutorial on autoencoders in science.   In the article we discuss two types of autoencoders: denoising and variational.  The variational autoencoder we examine is applied to the cell images in an effort to create a model that can be used both to classify samples and to generate new samples by following a path along the surface of the manifold.

We have published this little study as a supplement to our chapter on machine learning in the book “Cloud Computing for Science and Engineering”.   The link to the article is here.

Cloud Computing for Science and Engineering

Ian Foster and I have just completed a final draft of a book that is designed to introduce the working scientist, engineer or student to cloud computing.  It surveys the technology that underpins the cloud, new approaches to technical problems enabled by the cloud, and the concepts required to integrate cloud services into scientific work.  Many of the blog posts that have appeared here have been reworked and, we hope, greatly improved and integrated into the text.  In addition the book contains introductions to the Globus data management and services infrastructure that has become widely used by the research community.

We have a website for the book, https://Cloud4SciEng.org, that contains draft chapters, Jupyter notebooks that illustrate most of the concepts and a collection of lecture slides for the tutorial at the IEEE International Conference on Cloud Engineering based on the material in the book.  This collection will grow over time.  The book website will also contain updates to the book material as the current cloud technology evolves.

The table of contents for the book is below.   We look forward to your feedback.

Table of Contents

Acknowledgments
Preface
1 Orienting in the cloud universe

Part I. Managing data in the cloud

2 Storage as a service
3 Using cloud storage services

Part II. Computing in the cloud

4 Computing as a service
5 Using and managing virtual machines
6 Using and managing containers
7 Scaling deployments

Part III. The cloud as platform

8 Data analytics in the cloud
9 Streaming data to the cloud
10 Machine learning in the cloud
11 The Globus research data management platform

Part IV. Building your own cloud

12 Building your own cloud with Eucalyptus (with Rich Wolski)
13 Building your own cloud with OpenStack (with Stig Telfer)
14 Building your own SaaS

Part V. Security and other topics

15 Security and privacy
16 History, critiques, futures
17 Afterword: A discovery cloud?

Big Data Congress 2017 – CFP

I wanted to help get the word out concerning the final call for papers for the 2017 IEEE Big Data Congress.   The deadline is February 28, 2017.  This meeting is part of an annual gang of meetings including the IEEE Cloud conference, the IEEE International Conference on Web Services and others.   The conferences are going to be in Honolulu, Hawaii June 25 – June 30, 2017.

The Big Data meeting will have four tracks: research, applications, short papers and special topics.  The special topics include vision papers that point out emerging challenges, papers that describe new data sets and benchmarks, experience reports and surveys.

The full call for papers is here: http://www.ieeebigdata.org/2017/cfp.html.

Should be a good meeting.


Cloud-Native Applications – call for papers

The IEEE Cloud Computing Journal is going to publish a special issue on the topic of Cloud-Native applications. This is an extremely interesting topic and it cuts to the heart of what makes the cloud a platform that is, in many ways, fundamentally different from what we have seen before.

What is “cloud-native”? That is what we want you to tell us. Roger Barga from Amazon, Neel Sundaresan from Microsoft and this blogger have been invited to be guest editors. But we want you to tell us soon. The deadline is March 1, 2017. The papers do not need to be long (3,000 to 5,000 words) and some of the topics possible include:

  • Frameworks to make it easier for industry to build cloud-native applications;
  • Educational approaches and community based organizations that can promote cloud-native design concepts;
  • The role of open source for building cloud-native applications;
  • VM and container orchestration systems for managing cloud-native designs;
  • Efficient mechanisms to make legacy applications cloud-native;
  • Comparing applications – one cloud-native and the other not – in terms of performance, security, reliability, maintainability, scalability, etc.;

And more.   Please go to https://www.computer.org/cloud-computing/2016/09/27/cloud-native-applications-call-for-papers/ to read more.


CNTK Revisited. A New Deep Learning Toolkit Release from Microsoft

In a pair of articles from last winter (first article, second article) we looked at Microsoft’s “Computational Network Toolkit” and compared it to Google’s TensorFlow.   Microsoft has now released a major upgrade of the software and rebranded it as part of the Microsoft Cognitive Toolkit.  This release is a major improvement over the initial release.  Because these older articles still get a fair amount of web traffic, we wanted to provide a proper update.

There are two major changes from the first release that you will see when you begin to look at the new release.   First, CNTK now has a very nice Python API and, second, the documentation and examples are excellent.   The core concepts are the same as in the initial release.   The original programming model was based on configuration scripts and that is still there, but it has been improved and renamed “BrainScript”.  BrainScript is still an excellent way to build custom networks, but we will focus on the Python API, which is very well documented.

Installing the software from the binary builds is very easy on both Ubuntu Linux and Windows.   The process is described on the CNTK GitHub site.    On a Linux machine, simply download the gzipped tar binary and execute the installer.

$wget https://cntk.ai/BinaryDrop/CNTK-2-0-beta2-0-Linux-64bit-CPU-Only.tar.gz
$tar -xf CNTK-2-0-beta2-0-Linux-64bit-CPU-Only.tar.gz
$cd cntk/Scripts/linux
$./install-cntk.sh

This will install everything including a new version of Continuum’s Anaconda Python distribution.  It will also create a directory called “repos”.   To start Jupyter in the correct conda environment do the following.

$source "your-path-to-cntk/activate-cntk"
$cd ~/repos/cntk/bindings/python/tutorials
$jupyter notebook 

A very similar set of commands will install CNTK on your Windows 10 box. (If you are running Jupyter on a virtual machine or in the cloud you will need additional arguments to the Jupyter notebook command such as “--ip 0.0.0.0 --no-browser”, and then you can point your host browser to the VM IP address and port 8888. Of course, if it is a remote VM you should add a password.) What you will see is an excellent set of tutorials as shown in Figure 1.


Figure 1.   CNTK tutorial Jupyter notebooks.

CNTK Python API

CNTK is a tool for building networks, and the Python and BrainScript bindings are very similar in this regard.   You use the Python program to construct a network of tensors and then train and test that network through special operations which take advantage of underlying parallelism in the hardware, such as multiple cores or multiple GPUs.   You can load data into the network through Python Numpy arrays or files.

The concept of constructing a computation graph for later execution is not new.   In fact, it is an established programming paradigm used in Spark, TensorFlow and Python Dask.   To illustrate this in CNTK consider the following code fragment that creates three input variables and constructs a trivial graph that does matrix-vector multiplication and vector addition.  We begin by creating three tensors that will hold the input values to the graph and then tie them to the matrix multiply operator and vector addition.

import numpy as np
import cntk
X = cntk.input_variable((1,2))
M = cntk.input_variable((2,3))
B = cntk.input_variable((1,3))
Y = cntk.times(X,M)+B

In this X is a 1×2 dimensional tensor, i.e. a vector of length 2, M is a matrix that is 2×3 and B is a vector of length 3. The expression Y=X*M+B yields a vector of length 3. However, no computation has taken place. We have only constructed a graph of the computation. To invoke the graph we input values for X, B and M and then apply the “eval” operator to Y. We use Numpy arrays to initialize the tensors and supply a dictionary of bindings to the eval operator as follows

x = [[ np.asarray([[40,50]]) ]]
m = [[ np.asarray([[1, 2, 3], [4, 5, 6]]) ]]
b = [[ np.asarray([1., 1., 1.])]]

print(Y.eval({X:x, M: m, B: b}))
array([[[[ 241.,  331.,  421.]]]], dtype=float32)

CNTK has several other important tensor containers but two important ones are

  • Constant(value=None, shape=None, dtype=None, device=None, name=''): a scalar, vector or other multi-dimensional tensor.
  • Parameter(shape=None, init=None, dtype=None, device=None, name=''): a variable whose value is modified during network training.

There are many more tensor operators and we are not going to go into them here.   However, one very important class is the set of operators that can be used to build multilevel neural networks.   Called the “Layers Library”, they form a critical part of CNTK.    One of the most basic is the Dense(dim) layer, which creates a fully connected layer of output dimension dim, as shown in Figure 2.


Figure 2.   A fully connected layer created by the Dense operator with an implicit 3×6 matrix and a 1×6 vector of parameters labeled here M and B.   The input dimension is taken from the input vector V.  The activation here is the default (none), but it could be set to ReLU or sigmoid or another function.
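
A minimal sketch of what Figure 2 corresponds to in code, using the CNTK layers library (module paths may differ slightly between the 2.0 beta releases):

import cntk
from cntk.layers import Dense

V = cntk.input_variable(3)     # input vector of length 3
layer = Dense(6)               # implicit 3x6 weight matrix M and length-6 bias B
Y = layer(V)                   # Y = V*M + B, with the default (no) activation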

There are many standard layer types including Convolution, MaxPooling, AveragePooling and LSTM. Layers can also be stacked with a very simple operator called “Sequential”. Two examples taken directly from the documentation follow. The first is a standard 4-level image recognition network based on convolutional layers.

with default_options(activation=relu):
    conv_net = Sequential ([
        # 3 layers of convolution and dimension reduction by pooling
        Convolution((5,5), 32, pad=True), MaxPooling((3,3), strides=(2,2)),
        Convolution((5,5), 32, pad=True), MaxPooling((3,3), strides=(2,2)),
        Convolution((5,5), 64, pad=True), MaxPooling((3,3), strides=(2,2)),
        # 2 dense layers for classification
        Dense(64),
        Dense(10, activation=None)
    ])

The other fun example is a slot tagger based on a recurrent LSTM network.

tagging_model = Sequential ([
    Embedding(150),         # embed into a 150-dimensional vector
    Recurrence(LSTM(300)),  # forward LSTM
    Dense(labelDim)         # word-wise classification
])

The Sequential operator can be thought of as a concatenation of the layers in the given sequence.   In the case of the slot tagger network, we see two additional important operators: Embedding and Recurrence.

Embedding is used for word embeddings where the inputs are sparse vectors of size equal to the word vocabulary (item i = 1 if the word is the i-th element of the vocabulary and 0 otherwise) and the embedding matrix is of size vocabulary-dimension by, in this case, 150.     The embedding matrix may be passed as a parameter or learned as part of training.

The Recurrence operator is used to wrap the LSTM so that its output is fed back in with the next input to the network.
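
To see how these pieces fit together with an input, here is a sketch of wiring the slot tagger to a sparse sequence input (the vocabulary size and label dimension are hypothetical, and cntk.sequence.input_variable is assumed from the CNTK 2.x API):

import cntk
from cntk.layers import Sequential, Embedding, Recurrence, LSTM, Dense
vocab_size, labelDim = 943, 129                   # hypothetical sizes for illustration
query = cntk.sequence.input_variable(vocab_size, is_sparse=True)  # one-hot word sequence
tagging_model = Sequential([
    Embedding(150),          # vocab_size x 150 embedding matrix, learned unless supplied
    Recurrence(LSTM(300)),   # feed each LSTM output forward to the next word
    Dense(labelDim)          # one label score vector per word
])
z = tagging_model(query)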

A Closer Look at One of the Tutorials

The paragraphs above are intended to give you the basic feel of what CNTK looks like with its new Python interface.  The best way to learn more is to study the excellent example tutorials.

CNTK 203: Reinforcement Learning Basics

CNTK version 1 had several excellent tutorials, but version 2 has Python notebook versions of these plus a few new ones.  One of the newest demos is an example of reinforcement learning.   This application of neural nets was first described in the paper Human-level control through deep reinforcement learning by the Google DeepMind group.  The idea has proven to be very successful in systems that learn to play games, and the topic has received a lot of attention, so we were happy to see this tutorial included in CNTK. The example is a very simple game that involves balancing a stick.   More specifically, it uses the cart-pole configuration from OpenAI.   As shown in Figure 3, the system state can be described by a 4-tuple: the position of the cart, its velocity, the angle of the pole and the angular velocity.   The idea of the game is simple.  You either push the cart to the left or the right and see if you can keep the stick vertical.   If you drift too far off course or the pole angle goes beyond 15 degrees, the game is over.   Your score is the total number of steps you take before failure. The full example is in the github repository and we are not going to go through all the code here.  The Jupyter notebook for this example is excellent, but if you are new to this topic you may find some additional explanation of value in case you decide to dig into it.

cntk-cart-pole

Figure 3. Cart-Pole game configuration.

The part of reinforcement learning used here is called a Deep Q-Network. It uses a neural network to predict the best move when the cart is in a given state. This is done by implicitly modeling a function Q(s,a), which is the optimal future reward for taking action a in state s, where r is the immediate reward. They approximate Q(s,a) using the "Bellman equation", which describes how to choose action a in a given state s to maximize the accumulated reward over time, based inductively on the same function applied to the following state s'.

cntk-bellmann

The parameter gamma is a discount factor that guarantees the recurrence converges. (Another excellent reference for this topic is the blog by Jaromír Janisch.) The CNTK team approached this problem as follows. There are three classes.

  • Class Brain.    This holds our neural net and trainer.  There are three methods
    • Create() which is called at initialization.   It creates the network.   There are two tensor parameters: observation, which is used to hold the input state and q_target which is a tensor used for training.   The network is nice and simple:
      l1 = Dense(64, activation=relu)
      l2 = Dense(2)
      unbound_model = Sequential([l1, l2])
      model = unbound_model(observation)
      

      The training is by the usual stochastic gradient descent based on a loss measure.

      loss = reduce_mean(square(model - q_target), axis=0)
      meas = reduce_mean(square(model - q_target), axis=0)
      learner = sgd(model.parameters, lr,    
             gradient_clipping_threshold_per_sample=10)
      trainer = Trainer(model, loss, meas, learner)
      
    • Train(x, y) which calls the trainer for batches of states x and predicted outcomes y, which we describe below.
    • Predict(s) which invokes the model for state ‘s’ and returns a pair of optimal rewards given a left or right move.
  • Class Memory. This holds a record of recent moves.   This is used by the system to create training batches.  There are two methods
    • Add(sample configuration) – adds a four-tuple consisting of a starting state, an action, a reward and a resulting state to the memory.
    • Sample(n) – returns a random sample of n configurations from the memory.
  • Class Agent which is the actor that picks the moves and uses the memory to train the network.  There are three methods here.
    • Act(state) returns a 0 or 1 (left move or right move) that will give the best reward for the given state.     At first it just makes random guesses, but as time passes it uses the Predict method of the Brain class to select the best move for the given state.
    • Observe(sample configuration) records a configuration in the memory and keeps track of the time step and another parameter used by act.
    • Replay() is the main function for doing the learning.    This is the hardest part of the tutorial to understand. It works by grabbing a random batch of memorized configurations from memory.   We use the current model to predict an optimal outcome and use that as the next step in training the model.  More specifically, for each tuple in the batch we want to turn it into a training sample so that the network behaves like the Bellman equation.  A tuple consists of the start state, the action, the reward and the following state.   We can apply our current model to predict the reward for the start state and also for the resulting state.  We can use this information to create a new reward tuple for the given action and start state that models the Bellman recurrence.   Our training example is the pair consisting of the start state and this newly predicted reward.  At first this is a pretty poor approximation, but amazingly over time it begins to converge. The pseudocode is shown below (a runnable sketch of this update follows the pseudocode).
      x = numpy.zeros((batchLen, 4)).astype(np.float32)
      y = numpy.zeros((batchLen, 2)).astype(np.float32)
      
      for i in range(batchLen):
          s, a, r, s_ = batch[i]
          # s = the original state (4 tuple)
          # a is the action that was taken
          # r is the reward that was given
          # s_ is the resulting state.
          # let t = the reward computed from current network for s 
          # and let r_ be the reward computed for state s_.
          # now modify t[a] = r + gamma * numpy.amax(r_)
          # this last step emulates the Bellman equation
          x[i] = s
          y[i] = t
      self.brain.train(x,y)		
      

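To make the comments above concrete, here is a minimal sketch (not the notebook's exact code) of how one memorized tuple can be turned into a training pair, assuming brain.predict(s) returns a NumPy array with the two Q-value estimates for state s:

import numpy as np

GAMMA = 0.99   # discount factor; the value used in the notebook may differ

def make_training_pair(brain, sample):
    # turn one memorized (s, a, r, s_) tuple into a training pair (x, y)
    s, a, r, s_ = sample
    t = brain.predict(s)                 # current Q-value estimates for state s
    if s_ is None:                       # terminal state: no future reward to add
        t[a] = r
    else:
        r_ = brain.predict(s_)           # Q-value estimates for the resulting state
        t[a] = r + GAMMA * np.amax(r_)   # Bellman target for the action that was taken
    return s, t
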
The final part of the program is now very simple. We have an environment object that returns a new state and a done flag for each action the agent takes. We simply run our agent until it falls out of bounds (the environment object returns done=True). If the step succeeded, we increment our score. The function to run the agent and to keep score is shown below.

def run(agent):
    s = env.reset()
    R = 0
    while True:
        a = agent.act(s)
        s_, r, done, info = env.step(a)
        if done: # terminal state
            s_ = None
        agent.observe((s, a, r, s_))
        agent.replay() # learn from the past
        s = s_
        R += r
        if done:
            return R

Each time we run “run” it learns a bit more.   After about 7000 runs it will take over 600 steps without failure.

The text above is no substitute for a careful study of the actual code in the notebook.  Also, as it is a notebook, you can have some fun experimenting with it.  We did.

Final Thoughts

CNTK is now as easy to use as any of the other deep learning toolkits.   While we have not benchmarked its performance, the CNTK team claims it is extremely fast and that it makes good use of multiple GPUs and even clusters of servers.    We are certain that the user community will enjoy using and contributing to its success.

Citation.

The team that first created CNTK should be cited.   I know there are likely many others that have contributed to the open source release in one way or another, but the following is the master citation.

Amit Agarwal, Eldar Akchurin, Chris Basoglu, Guoguo Chen, Scott Cyphers, Jasha Droppo, Adam Eversole, Brian Guenter, Mark Hillebrand, T. Ryan Hoens, Xuedong Huang, Zhiheng Huang, Vladimir Ivanov, Alexey Kamenev, Philipp Kranen, Oleksii Kuchaiev, Wolfgang Manousek, Avner May, Bhaskar Mitra, Olivier Nano, Gaizka Navarro, Alexey Orlov, Hari Parthasarathi, Baolin Peng, Marko Radmilac, Alexey Reznichenko, Frank Seide, Michael L. Seltzer, Malcolm Slaney, Andreas Stolcke, Huaming Wang, Yongqiang Wang, Kaisheng Yao, Dong Yu, Yu Zhang, Geoffrey Zweig (in alphabetical order), “An Introduction to Computational Networks and the Computational Network Toolkit“, Microsoft Technical Report MSR-TR-2014-112, 2014.

A Brief Look at Google’s Cloud Datalab

Google recently released a beta version of a new tool for data analysis using the cloud called Datalab.  In the following paragraphs we take a brief look at it through some very simple examples.  While there are many nice features of Datalab, the easiest way to describe it would be to say that it is a nice integration of the IPython Jupyter notebook system with Google’s BigQuery data warehouse.  It also integrates standard IPython libraries such as graphics and scikit-learn and Google’s own machine learning toolkit TensorFlow.

To use it you will need a Google cloud account.   The free account is sufficient if you are interested in just trying it out.   You may ask, why do I need a Google account when I can use Jupyter, IPython and TensorFlow on my own resources?    The answer is you can easily access BigQuery on non-trivial sized data collections directly from the notebook running on your laptop.  To get started go to the Datalab home page.   It will tell you that this is a beta version and give you two choices: you may either install the Datalab package locally on your machine or you may install it on a VM in the Google cloud.   We prefer the local version because it saves your notebooks locally.

 The Google public data sets that are hosted in the BigQuery warehouse are fun to explore.  They include

  • The names on all US social security cards for births after 1879.  (The table rows contain only the year of birth, state, first name, gender and the count, provided it is greater than 5.  No social security numbers.),
  • The New York City Taxi trips from 2009 to 2015,
  • All stories and comments from “Hacker News”,
  • The US Dept of Health weekly records of diseases reported from each city and state from 1888 to 2013,
  • The public data from the HathiTrust and the Internet Book Archive,
  • The global summary of the day (GSOD) weather from the National Oceanic and Atmospheric Administration, collected from 9000 weather stations between 1929 and 2016.

And more, including the 1000 Genomes database.

To run Datalab on your laptop you need to have Docker installed.   Once Docker is running and you have created a Google cloud account and created a project, you can launch Datalab with a simple docker command as illustrated in their quick-start guide.  When the container is up and running you can view it at http://localhost:8081.  What you see at first is shown in Figure 1.  Keep in mind that this is beta release software, so you can expect it to change or go away completely.

 datalab-first-view

Figure 1.  Datalab Top level view.

Notice the icon in the upper right corner consisting of a box with an arrow.   Clicking this allows you to log in to the Google cloud, effectively authorizing your container to run against your gcloud account.

The view you see is the initial notebook hierarchy.   Inside docs is a directory called notebooks that contains many great tutorials and samples.

A Few Simple Examples of Using Datalab

As mentioned above, one of the public data collections is the list of first names from social security registrations.   Using Datalab we can look at a sample of this data by using one of the built-in BigQuery functions, as shown in Figure 2.

datalab-names

Figure 2.   Sampling the names data.
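
Since Figure 2 is an image, here is roughly what that cell can look like in Python (a sketch: the module import and the public table id bigquery-public-data:usa_names.usa_1910_2013 are assumptions, and the column names follow the schema described above):

import datalab.bigquery as bq   # older releases exposed this module as gcp.bigquery

bq.Query("""
SELECT name, gender, state, year, number
FROM [bigquery-public-data:usa_names.usa_1910_2013]
LIMIT 10
""").to_dataframe()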


This page gives us enough information about the schema that we can now formulate a query.

In modern America there is a movement toward "post-gender" names.   Typical examples cited on the web are "Dakota", "Skyler" and "Tatum".   A very simple SQL query can be formulated to see how the gender breakdown for these names shows up in the data.  In Datalab, we can formulate the query as shown in Figure 3.

datalab-dakotaplus2

Figure 3.   Breakdown by gender of three “post-gender” names.
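
The query behind Figure 3 is along these lines (again a sketch that assumes the same table id and column names):

bq.Query("""
SELECT gender, SUM(number) AS total
FROM [bigquery-public-data:usa_names.usa_1910_2013]
WHERE name IN ('Dakota', 'Skyler', 'Tatum')
GROUP BY gender
""").to_dataframe()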

As we can see, this is very nearly gender balanced.  A closer inspection using each of the three names separately shows that "Skyler" tends to be 'F' and "Tatum" tends to be 'M'. On the other hand, "Dakota" does seem to be truly post-gender, with 1052 'F' and 1200 'M' occurrences.

We can also consider the name "Billy" which, in the US, is almost gender neutral.   (Billy Mitchell was a famous World War I general and also a contemporary jazz musician, both male. Billy Tipton and Billie Holiday were female musicians, though Holiday was actually named Billie and Tipton lived her life as a man, so perhaps they don't count.)   We can ask how often Billy was used as a name associated with gender 'F' in the database. It turns out it is most common in the southern US. We can group these by state, count them and show the top five.   The SQL command is easily inserted into the Datalab notebook as shown in Figure 4.

datalab-billy

Figure 4.   Search for Billy with gender ‘F’ and count and rank by state of birth.
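
The query in Figure 4 looks roughly like this (same assumed table id and columns):

bq.Query("""
SELECT state, SUM(number) AS total
FROM [bigquery-public-data:usa_names.usa_1910_2013]
WHERE name = 'Billy' AND gender = 'F'
GROUP BY state
ORDER BY total DESC
LIMIT 5
""").to_dataframe()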

Rubella in Washington and Indiana

A more interesting data collection is the Centers for Disease Control and Prevention dataset concerning diseases reported by state and city over a long period.   An interesting case is Rubella, which is a virus also known as the "German measles".   Through our vaccination programs it has been eliminated in the U.S. except for those people who catch it in other countries where it still exists.  But in the 1960s it was a major problem, with an estimated 12 million cases in the US and a significant number of newborn deaths and birth defects.   The vaccine was introduced in 1969 and by 1975 the disease was almost gone.   The SQL script shown below is a slightly modified version of one from the Google BigQuery examples.   It has been modified to look for occurrences of Rubella in two states, Washington and Indiana, over the years 1970 and 1971.

%%sql --module rubella
SELECT
  *
FROM (
  SELECT
    *, MIN(z___rank) OVER (PARTITION BY cdc_reports_epi_week) AS z___min_rank
  FROM (
    SELECT
      *, RANK() OVER (PARTITION BY cdc_reports_state ORDER BY cdc_reports_epi_week ) AS z___rank
    FROM (
      SELECT
        cdc_reports.epi_week AS cdc_reports_epi_week,
        cdc_reports.state AS cdc_reports_state,
        COALESCE(CAST(SUM((FLOAT(cdc_reports.cases))) AS FLOAT),0) 
         AS cdc_reports_total_cases
      FROM
        [lookerdata:cdc.project_tycho_reports] AS cdc_reports
      WHERE
        (cdc_reports.disease = 'RUBELLA')
        AND (FLOOR(cdc_reports.epi_week/100) = 1970 
          OR FLOOR(cdc_reports.epi_week/100) = 1971)
        AND (cdc_reports.state = 'IN'
          OR cdc_reports.state = 'WA')
      GROUP EACH BY
        1, 2) ww ) aa ) xx
WHERE
  z___min_rank <= 500
LIMIT
  30000

We can now invoke this query as part of a Python statement, capture its result as a Pandas dataframe and pull apart the time stamp fields and data values.

rubel = bq.Query(rubella).to_dataframe()
rubelIN = rubel[rubel['cdc_reports_state']=='IN'].sort_values(by=['cdc_reports_epi_week'])
rubelWA = rubel[rubel['cdc_reports_state']=='WA'].sort_values(by=['cdc_reports_epi_week'])
epiweekIN = rubelIN['cdc_reports_epi_week']
epiweekWA = rubelWA['cdc_reports_epi_week']
rubelINval = rubelIN['cdc_reports_total_cases']
rubelWAval = rubelWA['cdc_reports_total_cases']

At this point a small adjustment must be made to the time stamps.  The CDC reports times in epidemic weeks, and there are 52 weeks in a year.    So the time stamp for the first week of 1970 is 197000 and the time stamp for the last week is 197051; the next week is 197100.  To make these into timestamps that appear contiguous we need to make a small "time compression" as follows.

realweekI = np.empty([len(epiweekIN)])
realweekI[:] = epiweekIN[:]-197000      # drop the year prefix
realweekI[51:] = realweekI[51:]-48      # compress out the gap between the end of 1970 and the start of 1971

Doing the same thing with epiweekWA we now have the basis of something we can graph.  Figure 5 shows the progress of rubella in Washington and Indiana over two years.  Washington is the red line and Indiana is blue.   Note that the outbreaks occur about the same time in both states and that by late 1971 the disease is nearly gone.

datalab-rubella.png

Figure 5.   Progress of Rubella in Washington (red) and Indiana (blue) from 1970 through 1971.

Continuing the plot over 1972 and 1973 shows there are flare-ups of the disease each year, but their maximum size diminishes rapidly.

(Datalab has some very nice plotting functions, but we could not figure out how to do a double plot, so we used the matplotlib library with the "fivethirtyeight" style.)
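
For reference, the double plot was produced with matplotlib along these lines (a sketch; realweekW is the compressed week array for Washington, computed the same way as realweekI above):

import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')
plt.plot(realweekI, rubelINval, color='blue', label='Indiana')
plt.plot(realweekW, rubelWAval, color='red', label='Washington')
plt.xlabel('epidemic week (1970-1971, compressed)')
plt.ylabel('reported Rubella cases')
plt.legend()
plt.show()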


A Look at the Weather


From the National Oceanic and Atmospheric Administration we have the global summary of the day (GSOD) weather from 9000 weather stations between 1929 and 2016.   While not all of these stations were operating during that entire period, there is still a wealth of weather data here.   To illustrate it, we can use another variation on one of Google's examples.  Let's find the hottest spots in the state of Washington for 2015.   This was a particularly warm year that brought unusual droughts and fires to the state. The following query will list the hottest spots in the state for the year.

%%sql
SELECT
  max, (max-32)*5/9 celsius, mo, da, state, stn, name
FROM (
  SELECT
    max, mo, da, state, stn, name
  FROM
    [bigquery-public-data:noaa_gsod.gsod2015] a
  JOIN
    [bigquery-public-data:noaa_gsod.stations] b
  ON
    a.stn=b.usaf
    AND a.wban=b.wban
  WHERE
    state="WA"
    AND max < 500 )
ORDER BY
  max DESC
LIMIT
  10

The data set 'gsod2015' is the table of data for the year 2015.  To get a list that also shows the name of the station we need to do a join with the 'stations' table over the corresponding station identifiers.  We order the results in descending order of the warmest recordings.    The resulting table is shown in Figure 6 for the top 10.

datalab-hotstations

Figure 6.   The top 10 hottest spots in Washington State for 2015

The results are what we would expect.   Walla Walla, Moses Lake and the Tri-Cities are in the eastern part of the state, and summer was very hot there in 2015.   But Skagit RGNL is in the Skagit Valley near Puget Sound.   Why was it 111 degrees F there in September?   If it was that hot there, what was the weather like at nearby locations?   To find out which stations were nearby we can look at the stations on a map.   The query is simple, but it took some trial and error.

%%sql --module stationsx
DEFINE QUERY locations
  SELECT FLOAT(lat/1000.0) AS lat, FLOAT(lon/1000.0) as lon, name
  FROM [bigquery-public-data:noaa_gsod.stations]
  WHERE state="WA" AND name != "SPOKANE NEXRAD"

It seems that the latitude and longitude for the Spokane NEXRAD station are incorrect and resolve to some point in Mongolia.  By removing it we get a good picture of the nearby stations as shown in Figure 7.

datalab-hotstations.png

Figure 7.   Location of weather stations in western Washington using the Bigquery chart map function.

 This is an interactive map, so we can get the names of the nearby stations.   There is one only a few miles away  called PADILLA BAY RESERVE and the next closest is BELLINGHAM INTL.   We can now compare the weather for 2015 at these three locations.

 To get the weather for each of these we need the station ID.   We can do that with a simple query.

%%sql
SELECT
  usaf, name
FROM [bigquery-public-data:noaa_gsod.stations] 
WHERE
    name="BELLINGHAM INTL" OR name="PADILLA BAY RESERVE" OR name = "SKAGIT RGNL"

Once we have our three station IDs we can use the following to build a parameterized BigQuery expression.

qry = "SELECT max AS temperature, \
       TIMESTAMP(STRING(year) + '-' + STRING(mo) + \
       '-' + STRING(da)) AS timestamp \
FROM [bigquery-public-data:noaa_gsod.gsod2015] \
WHERE stn = '%s' and max < 500 \
ORDER BY year DESC, mo DESC, da DESC"

stationlist = ['720272','727930', '727976']

dflist = [bq.Query(qry % station).to_dataframe() for station in stationlist]

 We can now render an image of the weather for our three stations as shown in Figure 8.

datalab-3stations-final.png

Figure  8.  Max daily temperatures for Skagit (blue), Padilla Bay (red) and Bellingham (yellow)
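
The plot in Figure 8 can be produced from dflist with matplotlib roughly as follows (a sketch; the mapping of the station ids in stationlist to station names, and hence the label order, is an assumption):

import matplotlib.pyplot as plt
labels = ['SKAGIT RGNL', 'PADILLA BAY RESERVE', 'BELLINGHAM INTL']
colors = ['blue', 'red', 'yellow']
for df, label, color in zip(dflist, labels, colors):
    plt.plot(df['timestamp'], df['temperature'], color=color, label=label)
plt.ylabel('max daily temperature (F)')
plt.legend()
plt.show()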

We can clearly see the anomaly for Skagit in September, and it is also easy to spot another problem in March, where the instruments seem not to have been recording.   Other than that, the readings align closely.

Conclusions

There are many features of Datalab that we have not demonstrated here.   The documentation gives an example of using Datalab with TensorFlow, and the charting capabilities are more extensive than demonstrated here.  (The Google maps example was not reproducible outside the demo in the samples; we modified that demo to run the code here.)  It is also easy to upload your own data to the warehouse and analyze it with Datalab.

Using Datalab is almost addictive.  For every one of the data collections we demonstrated here there were many more questions we wanted to explore.  For example, where and when did the name "Dakota" start being used, and how did its use spread?   Did the occurrence of Rubella outbreaks correspond to specific weather events?  Can we automate the process of detecting non-functioning weather instruments over the years where records exist?  These are all relatively standard data mining tasks, but the combination of BigQuery and IPython in the notebook format makes it fun.

It should be noted that Datalab is certainly not the first use of the IPython notebook as a front-end to cloud hosted analysis tools.  The IPython notebook has been used frequently with Spark, as we have previously described.  Those interested in an excellent overview of data science using Python should look at "Python Data Science Handbook" by Jake VanderPlas, which makes extensive use of IPython notebooks.     There are a variety of articles about using Jupyter on AWS and Azure for data analytics.  A good one is by Cathy Ye about deep learning using Jupyter in the cloud, where she gives detailed instructions for how to install Jupyter on AWS and deploy Caffe there.


Kubernetes and the Google Cloud Container Service: Fun with Pods of Celery.

In a previous post I talked about using Mesosphere on Azure for scaling up many-task parallel jobs, and I promised to return to Kubernetes when I figured out how to bring it up.   Google just made it all very simple with their new Google Cloud Container Service.   And, thanks to their good tutorials, I learned about a very elegant way to do remote procedure calls using another open source tool called Celery.

So let me set the stage with a variation on an example I have used in the past.   Suppose we have 10000 scientific documents that are stored in the cloud.   I would like to use a simple machine learning method to classify each of these by topic.    I would like to do this as quickly as possible and, because the analysis of each document is independent of the others, I can try to process as many as possible in parallel.  This is the basic "many task" parallel model and one of the most common uses of the cloud for scientific computing purposes.      To do this we will use the Celery distributed task queue to take a list of our documents and send each one to a work queue, where the tasks will be parceled out to workers that will do the analysis and respond.

The Google Cloud Container Service and a few words about Kubernetes.

Before getting into the use of Celery and the analysis program, let’s describe the Google Cloud Container Service and a bit about Kubernetes.   Getting started is incredibly easy.   Google has a small free trial account which is sufficient to do the experiments described.  Go to http://cloud.google.com and sign in or create an account.  This will take you to the “console” portal. The first thing you need to do is to create a project. In doing so it will be assigned an id which is a string of the form “silicon-works-136723”.    There is a drop down menu on the left end of the blue banner at the top of the page.  (Look for three horizontal bars.) This allows you to select the type of service you want to work on.     Select the “Container Engine”.  On the “container clusters” page there is a link that will allow you to create a cluster.   With the free account you cannot make a very big cluster.   You are limited to about 4 dual core servers.   If you fill in the form and submit it, you will soon have a new cluster.  There is a special icon of the form “>_” in the blue banner.  Clicking on that icon will create an instance of a “Cloud Shell” that will be automatically authenticated to your account.   The page you will see should resemble Figure 1 below.   The next thing you need to do is to authenticate your cloud shell with your new cluster.   By selecting your container and clicking on the “connect” button to the right you will get the code to paste into the cloud shell.  The result should now look exactly like Figure 1.

google-container-engine

Figure 1.   Creating a Google cloud cluster and connecting the cloud shell to it.

Interacting with Kubernetes, which is now running on our small cluster, is through command lines which can be entered into the cloud shell.   Kubernetes has a different, and somewhat more interesting, architecture than other container management tools.   The basic unit of scheduling in Kubernetes is the pod. A pod consists of a set of one or more Docker-style containers together with a set of resources that are shared by the containers in that pod.  When launched, a pod resides on a single server or VM.   This has several advantages for the containers in that pod.   For example, because the containers in a pod are all running on the same VM, they all share the same IP and port space, so the containers can find each other through conventional means like "localhost".   They can also share storage volumes that are local to the pod.

To start, let's consider a simple single-container pod to run the Jupyter notebook.  There is a standard Docker container that contains Jupyter and the SciPy software stack.  Using the Kubernetes control command kubectl we can launch Jupyter and expose its port 8888 with the following statement.

$ kubectl run jupyter --image=jupyter/scipy-notebook --port=8888

To see that it is up and running we can issue the command "kubectl get pods", which will return the status of all of our running pods.   Though we have launched Jupyter, it is still not visible from outside the cluster.   To make it visible we will associate a load balancer with the pod.   This will expose port 8888 to the open Internet.

$ kubectl expose deployment jupyter --type=LoadBalancer

Once that has been run you can get the IP address for jupyter from the “LoadBalancer Ingress:” field of the service description when you run the following.   If it doesn’t appear, try again.

$ kubectl describe services jupyter

Once you have verified that it is working at that address on port 8888, you should shut it down immediately because, as you can see, there is no security with this deployment.  Deleting a deployment is easy.

$ kubectl delete deployment jupyter

There is another point that one must be aware of when building containers that need to directly interact with the Google cloud APIs.  To make this work you will need application default credentials in your container.   For example, if your container is going to interact with the storage services you will need this.   To get the default application credentials follow the instructions here.  We will say a few more words about this below.

The Analysis Example in detail.

Now we describe Celery and how to use Celery and Kubernetes in the many-task scenario described above.

To use Celery we start with our analysis program.   We have previously described the analysis algorithms in detail in another post, so we won't duplicate that here.  Let's start by assuming we have a function predict(doc) that takes a document as a string argument and returns a string containing the result from our trained machine learning classifiers.  Our categories are "Physics", "Math", "Bio", "Computer Science" and "Finance", and the result from each classifier is simply the category that that classifier determines to be the most likely correct answer.

Celery is a distributed remote procedure call system for Python programs.   The Celery view of the world is you have a set of worker processes running on remote machines and a client process that is invoking functions that are executed on the remote machines.   The workers and the clients all coordinate through a message broker running somewhere else on the network.

Here we use a RabbitMQ service that is running on a Linux VM on the NSF JetStream cloud as illustrated in Figure 2.

kubernetes-jetstream-setup

Figure 2.  Experimental Configuration with Celery workers running on Kubernetes in the Google Cloud Container Service,  the RabbitMQ broker running in a VM on the NSF Jetstream Cloud and a client program running as a notebook on a laptop.

The code block below illustrates the basic Celery worker template.    Celery is initialized with a constructor that takes the name of the project and a link to the broker service which can be something like a Redis cache or MongoDB.   The main Celery magic is invoked with a special Python “decorator” associated with the Celery object as shown in the predictor.py file below.

from celery import Celery
app = Celery('predictor', backend='amqp')

#Now initialize and load all the data structures that will be constant
#and reused for each analysis.  In our case this will include
#all the machine learning models that were trained on the data
#previously. And create a main worker function to invoke the models.
def invokeMLModels(statement):
    ....
    return analysis

#define the functions we will call remotely here
@app.task
def predict(statement):
    prediction = invokeMLModels(statement)
    return [prediction]

What this decorator accomplishes is to wrap the function in a manner that it can be invoked by a remote client.   To make this work we need to create a Celery worker from our predictor.py file with the command below, which registers a worker instance as a listener on the RabbitMQ queue.

>celery worker -A predictor -b 'amqp://guest@brokerIPaddr'

Creating a client program for our worker is very simple.   It is similar to the worker template except that our version of the predict() function does nothing, because we are going to invoke it with the special Celery apply_async() method that will push the argument to the broker queue and return control immediately to the client.   The object that is returned from this call is similar to what is sometimes called a "future" or a "promise" in the programming language literature.  It is a placeholder for the returned value.   Once we attempt to evaluate the get() method on this object, our client will wait until a reply is returned from the remote worker that picked up the task.

from celery import Celery
app = Celery('predictor', broker='amqp://guest@brokerIPaddr', backend='amqp')

@app.task
def predict(statement):
    return ["stub call"]

res = predict.apply_async(["this is a science document ..."])
print res.get()

Now if we have 10000 documents to analyze we can send them in sequence to the queue as follows.

#load all the science abstracts into a list
documents = load_all_science_abstracts()

res = []
for doc in documents:
   res.append(predict.apply_async([doc]))

#now wait for them all to be done
predictions = [result.get() for result in res]
#now do an analysis of the predictions

Here we push each analysis task into the queue and save the async returned objects in a list.  Then we create a new list by waiting for each prediction value to be returned.   Our client can run anywhere there is Internet access.  For example, this one was debugged on a Jupyter instance running on a laptop.  All you need to do is "pip install celery" and run Jupyter.

There is much more to say about Celery and the interested reader should look at the Celery Project site for the definitive guide. Let us now turn to using this with Kubernetes.  We must first create a container to hold the analysis code and all the model data.   For that we will need a Dockerfile and a shell script to correctly launch Celery once the container is deployed.   For those actually interested in trying this, all the files and data are in OneDrive here.   The Dockerfile shown below has more than we need for this experiment.

# Version 0.1.0
FROM ipython/scipystack
MAINTAINER yourdockername "youremail"
RUN easy_install celery
RUN pip install -U Sphinx
RUN pip install Gcloud
RUN easy_install pattern
RUN easy_install nltk
RUN easy_install gensim
COPY bookproject-key.json /
COPY models /
COPY config /
COPY sciml_data_arxiv.p /
COPY predictor.py /
COPY script.sh /
ENTRYPOINT ["bash", "/script.sh"]

To build the image we first put all the machine learning configuration files in a directory called config and all the learned model files in a directory called models.  At the same level we have the predictor.py source.   For reasons we will explain later we also include the full test data set, sciml_data_arxiv.p. The Docker build starts with the ipython/scipystack container.   We then use easy_install to install Celery as well as the packages used by the ML analyzers: pattern, nltk and gensim.   Though we are not going to use the Gcloud APIs here, we include them with a pip install; to make that work we need an updated copy of Sphinx.  To make the APIs work we would need our default client authentication keys.   They are stored in a json file called bookproject-key.json that was obtained from the Gcloud portal as described previously.   Finally we copy all of the files and directories to the root path '/'.  Note that the copy from a directory copies all the contained files to the path '/' and not to a new directory.   The ENTRYPOINT runs our script, which is shown below.

cp /predictor.py .
export C_FORCE_ROOT='true'
export GOOGLE_APPLICATION_CREDENTIALS='/bookproject-key.json'
echo $C_FORCE_ROOT
celery worker -A predictor -b $1

Bash will run our script in a temp directory, so we need to copy our predictor.py file to that directory.  Because our bash is running as root, we need to convince Celery that it is o.k. to run that way, hence we export C_FORCE_ROOT as true.  Next, if we were using the Gcloud APIs we would need to export the application credentials.   Finally we invoke Celery, but this time we use the -b flag to indicate that we are going to provide the IP address of the RabbitMQ amqp broker as a parameter, and we remove it from the explicit reference in the predictor.py file.  When run, the predictor will look for all the model and configuration data in '/'.   We can now build the docker image with the command

>docker build -t "yourdockername/predictor" .

And we can test the container on our laptop with

>docker run -i -t "yourdockername/predictor" 'amqp://guest@rabbitserverIP'

Using “-i -t” allows you to see any error output from the container.   Once it seems to be working we can now push the image to the docker hub.   (to use our version directly, just pull dbgannon/predictor)

We can now return to our Google cloud shell and pull a version of the container there.   If we want to launch the predictor container on the cluster, we can do it one at a time with the "kubectl run" command.  However, Kubernetes has a better way to do this using a job configuration file where we can specify the number of pod instances we want to create.  In the file below, which we will call predict-job.json, we specify a job name, the container image in the Docker hub, and the parameter to pass to the container's shell script.   We also specify the number of pods to create.   In this case that is 6, as given by the "parallelism" parameter.

apiVersion: batch/v1
kind: Job
metadata:
  name: predict-job
spec:
  parallelism: 6
  template:
    metadata:
      name: job-wq
    spec:
      containers:
      - name: c
        image: dbgannon/predictor
        args: ["amqp://guest@ipaddress_of_rabbitmq_server"]
      restartPolicy: OnFailure

One command in the cloud shell will now launch six pods each running our predictor container.

$kubectl create -f predict-job.json 

Some Basic Performance Observations.

When using a many-task system based on a distributed worker model, there are always three primary questions about the performance of the system.

  1. What is the impact of wide-area distribution on the performance?
  2. How does performance scale with the number of worker containers that are deployed? More specifically, if we have N workers, how does the system speed up as N increases?    Is there a point of diminishing returns?
  3. Is there a significant per-task overhead that the system imposes?  In other words, if the total workload is T and it is possible to divide that workload into k tasks each of size T/k, what is the best value of k that will maximize performance?

Measuring the behavior of a Celery application as a function of the number of workers is complicated by a number of factors.   The first concern we had was the impact of widely distributing the computing resources on the overall performance.   Our message broker (RabbitMQ) was running in a virtual machine in Indiana on the JetStream cloud.   Our client was running a Jupyter notebook on a laptop, and the workers were primarily in the Midwest Google datacenter, with a few on other machines in the lab.   We compared this to a deployment where all the workers, the message broker and the client notebook were running together in the Google datacenter.   Much to our surprise there was little difference in performance between the two deployments.   There are two ways to view this result.   One is to say that the overhead of wide-area distribution was not significant.   The other is to say that the overhead of wide-area distribution was negligible compared to other performance problems.

A second factor that has an impact on performance as a function of the number of workers is the fact that a single Celery worker may have multiple threads that are responding to asynchronous function calls.   While we monitored the execution we noticed that the number of active threads in one worker could change over time. This made performance somewhat erratic.  Celery’s policy is that it will never have more threads than the number of available cores, so to limit the thread variability we ran workers in container pods on VMs with only one core.

Concerning the question of the granularity of the work partitioning, we configured the program so that a number of documents could be processed in one invocation, and this number could be set remotely.    Taking a set of 1000 documents and a fixed set of workers, we divided the document set into blocks of size K, where K ranged from 1 to 100.   In general, larger blocks were better because the number of Celery invocations was smaller, but the difference was not great.   Another factor involved Celery's scheduling for deciding which worker gets the next invocation.   For large blocks this was not the most efficient because it left holes in the execution schedule: workers were occasionally idle while another was over-scheduled.   For very small blocks these holes tended to be small.   We found that a value of K=2 gave reasonably consistent performance.
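
For example, with a hypothetical predict_block task that accepts a list of documents, the client side of that partitioning looks roughly like this:

K = 2   # the block size that gave the most consistent performance
blocks = [documents[i:i+K] for i in range(0, len(documents), K)]
res = [predict_block.apply_async([blk]) for blk in blocks]
predictions = [r.get() for r in res]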

Finally to test scalability we used three different programs.

  1. The document topic predictor described above where each invocation classified two documents.
  2. A simple worker program that does no computation but just sleeps for 10 seconds before returning a “hello world” string.
  3. A worker that computes part of the Euler sequence sum(1/i**2, i=1..n) where n = 10**9.   Each worker computes a block of 10**7 terms of the sequence and the 100 partial results are added together to get the final result (which approximates pi**2/6 to about 7 decimal places). A minimal sketch of such a worker follows this list.
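
As an illustration, here is a minimal sketch of what the Euler-sum worker might look like (not the exact code used in the experiments; the broker address is supplied with the -b flag as before):

from celery import Celery
app = Celery('eulersum', backend='amqp')

@app.task
def partial_sum(start, count):
    # sum 'count' consecutive terms of 1/i**2 starting at i = start
    return sum(1.0/(i*i) for i in range(start, start + count))

On the client side the 100 blocks are dispatched and combined with:

block = 10**7
res = [partial_sum.apply_async((i*block + 1, block)) for i in range(100)]
total = sum(r.get() for r in res)    # approximately pi**2/6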

The document predictor is very computationally intensive and uses some rather large data matrices for the trained machine learning models.   The size of these arrays is about 150 megabytes in total.  While this does fit in memory, the computation is going to involve a great deal of processor cache flushing and there may be memory paging effects.   The example that computes the Euler sum requires no data other than the starting index and the size of the block to sum.   It is pure computation and it will have no cache flushing or memory paging effects, but it will keep the CPU very busy.   The "sleep" example leaves the memory and the CPU completely idle.

We ran all three with one to seven workers (6 workers using 6 cores from the small Google demo account and one on another remote machine).    To compare the results, we computed the time for each program on one worker and plotted the speed-up ratio for 2, 3, 4, 5, 6 and 7 workers.   The results are shown in the graphs below.

predictor-euler-sleep

Figure 3.  Performance as speed-up for each of the three applications with up to 7 workers.

As can be seen, the sleeper scales linearly in the number of workers.   In fact, when executed on multi-core machines it is almost super-linear because of the extra threads that can be used.  (It is very easy for a large number of threads to sleep in parallel.)   On the other hand, the predictor and the Euler examples reached a maximum speed-up with around five workers; adding more worker pods to the servers did not show improvement because these applications are already very compute intensive.   This was a surprise, as we expected all three experiments to scale well beyond seven workers.   When looking for the cause of this limited performance, we considered the possibility that the RabbitMQ broker was a bottleneck, but our previous experience with it has allowed us to scale applications to dozens of concurrent readers and writers.   We are also convinced that the Google Container Engine performed extremely well and was not the source of any of these performance limitations. We suspect (but could not prove) that the Celery work distribution and result gathering mechanisms have overheads that limit scalability as the number of available workers grows.

Conclusion

Google has made it very easy to deploy containerized applications using Kubernetes on their cloud container service.  Kubernetes has some excellent architectural features that allow multiple containers to be co-located on a single server within a pod.   We did not have time here to demonstrate this, but their documentation gives some excellent examples.

Celery is an extremely elegant way to do remote procedure calls in Python.  One only needs to define the function and annotate it with a Celery object.   It can then be remotely invoked with an asynchronous call that returns control to the caller.  A future-like object is returned.  By calling a special method on the returned object the caller will pause until the remote call completes and the value is provided to the caller.

Our experiments demonstrated that Celery has limited scalability if it is used without modification and with the RabbitMQ message broker.   However, Celery has many parameters, and it may be that the right combination of settings will improve our results.  We will report any improved results we discover in a later version of this document.