
The State of the Cloud for Science – 2018

Introduction

This post is based on a talk I prepared for the Scientific Cloud Computing Workshop at HPDC 2018.

Two years ago, Ian Foster and I started writing  “Cloud Computing for Science and Engineering”.   That book covers fundamental cloud tools and computational models, but there are some topics we alluded to but did not explore fully because they were still on the horizon.  In other cases, we were simply clueless about changes that were coming. Data center design and cloud services have evolved in some amazing ways in just two years and many of these changes represent opportunities for using cloud technology in scientific research.

Whose cloud?

Any  discussion of cloud computing in science leads to the question of definition.  What defines a cloud for science?   For example, the European Open Science Cloud (EOSC) is a European-wide virtual environment for data sharing and collaboration.  That project will involve multiple data archives, research labs and HPC centers, commercial service providers and EU agencies and funding.  It is truly an important project.  However, my focus here is on the fundamental technologies that are driving hardware and software innovation, and these tend to come from a combination of academic, open source and commercial providers.   The most ubiquitous commercial clouds are:

  • Amazon Web Services (AWS) – 40% of all commercial cloud resources on the planet,
  • Microsoft Azure – about 50% of AWS but growing,
  • Google Cloud – a solid and growing third place,
  • IBM Bluemix – growing very fast and, by some measures, now bigger than Google.

There are many more, smaller or more specialized providers: Salesforce, DigitalOcean, Rackspace, 1&1, UpCloud, CityCloud, CloudSigma, CloudWatt, Aruba, CloudFerro, Orange, OVH, T-Systems.

There are a number of smaller cloud systems that have been deployed for scientific research.  They  include Aristotle, Bionimbus, Chameleon, RedCloud, indigo-datacloud, EU-Brazil Cloud,  and the NSF JetStream.  The advantage of these research clouds is that they can be optimized for use by a specific user community in ways not possible in a commercial cloud.  For example, Chameleon is funded by the US NSF to support basic computer systems research at the foundational level which is not possible when the foundation is proprietary.

Are these clouds of any value to Science?

When the first commercial clouds were introduced in 2008 the scientific community took interest and asked if there was value there.  In 2011 the official answer to this question seemed to be  “no”.  Two papers (see endnote 1) described research experiments designed to address this question.   The conclusion of both papers was that these systems were no match for traditional supercomputers for running MPI-based simulation and modeling.   And, in 2010, they were correct.   Early cloud data centers were racks of off-the-shelf PCs, and the networks had terrible bisection bandwidth and long latencies.   They were no match for a proper HPC cluster or supercomputer.

Over the last few years, others have recognized a different set of roles for the cloud in science that go beyond traditional supercomputer simulation.   The biology community was quick to adopt cloud computing, especially when it is necessary to do large scale analysis on thousands of independent data samples.  These applications range from metagenomics to protein folding.   Each of these computations can fit on a single server, so network bandwidth is not an issue and, using the scale of the cloud, it is easy to launch thousands of them simultaneously.   Hosting and sharing large public scientific data collections is another important application.   Google, AWS, Microsoft and others host large collections, and they are also providing new ways to host services to explore this data.

However, there are at least three additional areas where the cloud is a great platform for science.

Analysis of streaming data

Microsoft’s AI for Earth project (Figure 1) applies streaming data from sources such as satellites to land cover analysis, sensors on and above farm land to improve agriculture outcomes, and crowd-sourced data to help understand biodiversity.


Figure 1.  Applications of streaming include land cover analysis, using sensor data for improving agriculture and biodiversity.   From  https://www.microsoft.com/en-us/aiforearth

The internet of things is exploding with sensor data as are on-line experiments of all types.  This data will be aggregated in edge computing networks that do initial analysis with results fed  to the cloud systems for further analysis.   Urban Informatics is a topic that has emerged as critical to the survival of our cities and the people who live in them.  Sensors of all types are being deployed in cities to help understand traffic flow, microclimates, local pollution and energy waste.  Taken together this sensor data can paint a portrait of the city that planners can use to guide its future.  Streaming data and edge computing is a topic that will involve the growing capabilities and architecture of the cloud.  We will return to this later in this document.

Interactive big data exploration

Being able to explore and interact with data is a critical component of science.   If it fits on our laptop we can use tools like Matlab, Excel or Mathematica to conduct small computational experiments and visualize the results.  If the data is too big it must be stored on something bigger than that laptop.   Traditional supercomputers are up to the task of doing the analysis, but because they are designed around batch computing they are not well suited to interactive use.   The cloud exists to host services that can be used by thousands of simultaneous users.   In addition, there is a new generation of interactive data analysis tools that are cloud-ready for use on very large data collections.  This collection of tools includes Spark and Python Dask.   In both cases these tools can be driven by the open-source Jupyter notebook, which provides a powerful, interactive compute and visualization tool.  The commercial providers have adapted Jupyter and its interactive computational model into their offerings.   Google has Cloud Datalab (Figure 2), Amazon uses Jupyter with its SageMaker machine learning platform and Microsoft provides a special data science virtual machine that runs JupyterHub so that teams of users can collaborate.


Figure 2.  Google’s Cloud Datalab allows SQL-like queries to be combined with Python code and visualization in a Jupyter-based web interface. https://cloud.google.com/datalab/

Being able to interact with data at scale is part of the power of the cloud.   As this capability is combined with advanced cloud hosted machine learning tools and other services, some very promising possibilities arise.

The quest for AI and an intelligent assistant for research

The commercial clouds were originally built to  host web search engines.   Improving those search engines led to a greater engagement of the tech companies with machine learning.   That work led to deep learning which enabled machine language translation,  remarkably strong spoken language recognition and generation and image analysis with object recognition.  Many of these  capabilities rival humans in accuracy and speed.  AI is now the holy grail for the tech industry.

One outcome of this has been the proliferation of voice-driven digital assistants such as Amazon’s Echo, Microsoft’s Cortana, Apple’s Siri and Google Assistant.   When first introduced these were novelties, but as they have improved, their ability to give us local information, do web searches and keep our calendars has grown considerably.   I believe there is an opportunity for science here.

Ask the question “what would it take to make Alexa or Cortana help with my research?”   The following use cases come to mind.

  1. Provide a fast and accurate search of the scientific literature for a given specific scientific concept or topic and not just a keyword or specific author. Then ask who is working on or has worked on this topic?  Is there public data in the cloud related to experiments involving this topic?  Translate and transcribe related audio and video.
  2. Understand and track the context of your projects.
  3. Help formulate, launch and monitor data analysis workflows. Then coordinate and catalog results.  If state-space search is involved, automatically narrow the search based on promising findings.
  4. Coordinate meetings and results from collaborators.

If I ask Alexa to do any of this now, she would politely say “Sorry.  I can’t help you with that.”  But with the current rate of change in cloud AI tools, ten years seems like a reasonable timeframe.


Figure 3.  Siri’s science geek grandson.

Technical Revolutions in the Cloud

Two of the three scenarios above are here now or very close.   The third is some ways off.   There have been three major changes in cloud technology in the past five years, and some aspects of these changes are true game-changers for the industry.    The first, and most obvious, is the change in scale of cloud deployments. The two leaders, AWS and Azure, are planetary in scale.  This is illustrated in Figure 4 below.


Figure 4.  A 2016 map of cloud coverage from Atomia.com.  https://www.atomia.com/2016/11/24/comparing-the-geographical-coverage-of-aws-azure-and-google-cloud/ There is some inaccuracy here because AWS and Azure define regional data centers differently, so counting the dots is not a good comparison.   In addition, data centers are now under construction in South Africa and the Middle East.

This map does not include all the data centers run by the smaller cloud providers.

Cloud Services

A major departure from the early days of the cloud, where scientists focused on storage and servers, has been an explosion in pay-by-the-hour cloud hosted services.  In addition to basic IaaS the types of services available now are:

  • App services: Basic web hosting, mobile app backend
  • Streaming data: IoT data streams, web log streams, instruments
  • Security services: user authentication, delegation of authorization, privacy, etc.
  • Analytics: database, BI, app optimization, stream analytics
  • Integrative: networking, management services, automation

In addition, the hottest new services are AI and machine learning services for mapping, image classification, voice-to-text, text-to-voice and text semantic analysis.   Tools to build and train voice-activated bots are also now widely available.   We will take a look at two examples.

A planet scale database

The Azure Cosmos DB is a database platform that is globally distributed.   Of course, distributing data across international boundaries is a sensitive topic, so the Cosmos platform allows the database creator to pick the exact locations where copies will reside.   When you create an instance of the database you use a map of Azure data centers and select the locations as shown in Figure 5.


Figure 5.  Cosmos DB.  A database created in central US and replicated in Europe, South India and Brazil.

The database supports four data models: document, key-value, graph and column-family tables.  In addition, there are five different consistency models the user can select: eventual, consistent prefix, session, bounded staleness and strong consistency, all with a 99.9% guarantee of less than 15ms latency.  My own experiments validated many of these claims.
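As an illustration of how simple the data-plane programming is, here is a minimal sketch using the current azure-cosmos Python SDK (not necessarily the API that was available in 2018); the endpoint, key and names are placeholders, and the replica regions themselves are chosen when the account is created, as in Figure 5.

from azure.cosmos import CosmosClient, PartitionKey

# Hypothetical account endpoint and key.
client = CosmosClient("https://myaccount.documents.azure.com:443/", credential="<account-key>")
db = client.create_database_if_not_exists("sciencedb")
container = db.create_container_if_not_exists(
    id="observations", partition_key=PartitionKey(path="/instrument"))

# Writes go to the nearest writable region; reads obey the chosen consistency level.
container.upsert_item({"id": "obs-001", "instrument": "telescope-1", "value": 42.0})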

Cloud AI Services

The commercial clouds are in a race to see who can provide the most interesting and useful AI services on their cloud platform.   This work began in the research laboratories of universities and companies over the past 25 years, but the big breakthroughs came when deep learning models trained on massive data collections began to reach levels of human accuracy.  For some time now, the public cloud companies have provided custom virtual machines that make it easy for technically sophisticated customers to use state-of-the-art ML and neural network tools like TensorFlow, CNTK and others.  But the real competition is now to provide services for building smart applications that can be used by developers lacking advanced training in machine learning and AI. We now have speech recognition, language translation and image recognition capabilities that can be easily integrated into web and mobile applications.

We gave this a try with services that use a technique called Transfer Learning to make it possible to re-train a deep neural network to recognize objects from a narrow category using a very small training set.   We chose images of galaxies and used the services of IBM Watson, Azure and Amazon.   Figure 6 illustrates the results from IBM’s tool.  The results were surprisingly good.


Figure 6.  IBM’s Watson recognizing previously unseen images of three different galaxies.  The details of this study are here: https://esciencegroup.com/2018/02/16/cloud-services-for-transfer-learning-on-deep-neural-networks/
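The cloud tools hide the details, but the underlying idea is easy to sketch.  The following is a generic Keras transfer-learning outline, not the services' own code; the three-class galaxy dataset and training call are placeholders.

import tensorflow as tf

# Reuse a network pre-trained on ImageNet and retrain only a small new head.
base = tf.keras.applications.ResNet50(weights="imagenet", include_top=False,
                                      pooling="avg", input_shape=(224, 224, 3))
base.trainable = False                              # freeze the pre-trained features

model = tf.keras.Sequential([
    base,
    tf.keras.layers.Dense(3, activation="softmax")  # e.g. three galaxy classes
])
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
# model.fit(small_galaxy_dataset, epochs=5)         # a small labeled set is enough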

The Revolution in Cloud Service Design

Making all of these services work, perform reliably and scale to thousands of concurrent users forced a revolution in cloud software design.    In order to support these applications, the tech companies needed a way to design them so that they could be scaled rapidly and updated easily.   They settled on a design pattern based on the idea of breaking the applications into small stateless components with well-defined interfaces.   Statelessness means that a component can be replaced easily if it crashes or needs to be upgraded.   Of course, not everything can be stateless, so state is saved in cloud-hosted databases.   Each component is a “microservice” and it can be built from containers or functions.  This design pattern is now referred to as “cloud native” design.   Applications built and managed this way include Netflix, Amazon, Facebook, Twitter, Google Docs, Azure CosmosDB, Azure Event Hub, Cortana and Uber.

Figure 7.  Conceptual view of microservices as stateless services communicating with each other and saving needed state in distributed databases or tables.

To manage applications that require dozens to hundreds of concurrently running microservices, you need a software foundation, or container orchestration system, to monitor the services and schedule them on available resources.  Several candidates emerged and are in use. Siri, for example, is composed of thousands of microservices running on the Apache Mesos system.   Recently cloud providers have settled on a de facto standard container orchestrator called Kubernetes, built by Google and released as open source.   It is now extremely easy for any customer to use Kubernetes on many cloud deployments to launch and manage cloud native applications.

Serverless Functions

The next step in the cloud software evolution was the introduction of “serverless functions”.   The original idea of cloud computing involved launching and managing a virtual machine.  However, suppose you want to have a cloud-based application whose sole job is to wait for an event to trigger some action.  For example, monitor a file directory and wait for a change such as the addition of a new file.  When that happens, you want to send email to a set of users alerting them of the change.   If this is a rare event, you don’t want to have to pay for an idle virtual machine that is polling some service looking for a change.  Amazon was the first to introduce the concept of a function-as-a-service.   With AWS Lambda, you only need to describe the function in terms of the trigger event and the output actions it takes when the event appears.  As illustrated in Figure 8, there are many possible triggers and corresponding output channels.


Figure 8.   From Amazon AWS.   The triggers and outputs of a lambda function.
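To make the file-arrival example above concrete, here is a hedged sketch of what such a function might look like as a Python Lambda handler, assuming an S3 trigger and SES for mail; the addresses are placeholders.

import boto3

ses = boto3.client("ses")

def lambda_handler(event, context):
    # The S3 trigger delivers the bucket and object key of the new file.
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = record["object"]["key"]
    # Alert a (hard-coded, illustrative) list of users.
    ses.send_email(
        Source="alerts@example.org",
        Destination={"ToAddresses": ["team@example.org"]},
        Message={
            "Subject": {"Data": "New file in " + bucket},
            "Body": {"Text": {"Data": "Object " + key + " was just added."}},
        },
    )
    return {"status": "notified", "object": key}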

In addition to AWS Lambda, Azure Functions, Google Cloud Functions and IBM OpenWhisk are similar systems.  OpenWhisk is now open source.  Another open source solution is Kubeless, which allows you to deploy a Lambda-like system on top of your Kubernetes cluster.   These serverless systems let you scale up and down extremely rapidly and automatically.   You can have hundreds of instances responding to events at once.  And the cost is based on charge-by-use models.  AWS has also introduced AWS Fargate, which allows any containerized application to run in serverless mode.

The Edge and the Fog

The frontier of cloud computing is now at the edge of the network.  This has long been the home of content distribution systems, where content can be cached and accessed quickly, but now that the Internet-of-Things (IoT) is upon us, it is increasingly important to do some computing at the edge.  For example, if you have a thousand tiny sensors in a sensitive environment or farm and you need to control water from sprinklers or detect severe weather conditions, it is necessary to gather the data, do some analysis and signal an action.   If the sensors are all sending WiFi messages they may be routable to the cloud, but a more common solution is to provide local computing that can do some event preprocessing and response while forwarding summary data to the cloud.  That local computing is called the Edge, or, if it is a distributed system of edge servers, the Fog.

If serverless functions are designed to respond to signals, that suggests it should be possible to extend them to run in the edge servers rather than the cloud.  AWS was the first to do this with a tool called Greengrass that provides a special runtime system that allows us to push or migrate Lambda functions or microservices from the data center to the edge.   More recently Microsoft has introduced Azure IoT Edge, which is built on open container technologies.  Using an instance of the open source Virtual Kubelet deployed on the edge devices, we can schedule our Kubernetes containers to run on the edge.  You can think of a Kubelet as the part of Kubernetes that runs on a single node. This enables Kubernetes clusters to span the cloud and the edge as illustrated in Figure 9.


Figure 9.  A sketch of migrating containerized functions to the edge.   Our IoT devices can communicate with locally deployed microservices, and those microservices can communicate with cloud-based services.  The edge function containers can also be updated and replaced remotely like any other microservice.

The Evolution of the Data Center

As mentioned at the beginning of this paper, in the early days (around 2005) cloud data centers were built from very simple servers, and the networks were designed for outgoing internet traffic rather than the bisection bandwidth needed for parallel computing.   However, by 2008 interest in performance began to grow.  Special InfiniBand sub-networks were being installed at some data centers.  The conventional dual-core servers were being replaced by systems with up to 48 cores and multiple GPU accelerators.  By 2011 most of the commercial clouds and a few research clouds had replaced traditional networking with software-defined networking.  To address the demand of some of its customers, in 2017 Microsoft added Cray® XC™ and Cray CS™ supercomputers to a few data centers and then acquired the company Cycle Computing.

From 2016 we have seen progress focused on performance and parallelism.   The driver of this activity has been AI and, more specifically, the deep neural networks (DNNs) driving all the new services.  There are many types of DNNs, but two of the most common are convolutional networks, which look like a linear sequence of special filters, and recurrent networks which, as the name implies, are networks with a feedback component.   And there are two phases to neural network design and use.  The first is the training phase, which often requires massive parallelism and time, but it is usually an off-line activity.  The second phase is called inference, and it refers to the activity of evaluating the trained network on classification candidates.  In both convolutional and recurrent networks, inference boils down to doing a very large number of matrix-vector and matrix-matrix multiplies, where the coefficients of the matrices are the trained model and the vectors represent the classification candidates.   To deliver the performance at scale that was needed by the AI services it was essential to do these operations fast.  While GPUs were good, more speed was needed.
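Stripped of the hardware details, inference is just linear algebra.  A toy numpy sketch with made-up shapes and random stand-in weights:

import numpy as np

W1 = np.random.random((128, 64))   # stand-ins for trained weight matrices
W2 = np.random.random((10, 128))
x = np.random.random(64)           # one classification candidate

h = np.maximum(W1 @ x, 0)          # matrix-vector multiply plus a ReLU
scores = W2 @ h                    # another matrix-vector multiply gives class scores
prediction = int(np.argmax(scores))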

Google’s Tensor Processing Unit

Google introduced the Tensor Processing Unit (TPU) in 2015 and the TPU2 in 2017.


Figure 10.  Above, a Google data center.  https://www.nextplatform.com/2017/05/22/hood-googles-tpu2-machine-learning-clusters/  Below, the architecture of the Google Tensor Processing Unit (TPU). From “In-Datacenter Performance Analysis of a Tensor Processing Unit”, Norman P. Jouppi et al., ISCA 2017, https://ai.google/research/pubs/pub46078

Figure 10 illustrates several racks of TPU-equipped servers and the functional diagram of the TPU.  One of the key components is the 8-bit matrix multiply unit capable of delivering 92 TeraOps/second (TOPS).   (It should be noted that DNNs can be trained well with less than IEEE-standard floating point, and floating-point formats with a small mantissa are popular.)  The multiply unit uses a systolic algorithm like those proposed for VLSI chips in the 1980s.

Microsoft’s Brainwave

In 2011 a small team in Microsoft Research led by Doug Burger began looking at the use of FPGAs to accelerate the search result ranking produced by Bing.   Over several iterations they arrived at a remarkable design that allowed them to put the FPGA between the network and the NIC, so that the FPGAs could be configured into a separate plane of computation that can be managed and used independently from the CPU (see Figure 11).   Used in this way, groups of FPGAs can be configured into a subnetwork to handle tasks such as database queries and the inference stage of deep learning, in addition to Bing query optimization.


Figure 11.  The Brainwave architecture.


Figure 12. The Brainwave software stack for mapping a DNN to one or more FPGAs. From BrainWave_HOTCHIPS2017.pptx, Eric Chung et al., https://vneetop.wordpress.com/2017/10/28/accelerating-persistent-neural-networks-at-datacenter-scale/

The team also built a software stack that could really make this possibility a reality.   What makes much of this possible is that the models for DNNs are all based on flow graphs which describe sequences of tensor operations.  As shown in Figure 12 above, the flow graphs can be compiled to a graph internal representation that can be split and partitioned across one or more FPGAs.  They refer to the result as a hardware microservice.   Just recently Mary Wall [see endnote 2] wrote a nice blog about the team's work on using this idea to do deep learning inference on land cover maps.  Each compiled inference hardware microservice is mapped to a single FPGA, but they used 800 of the inference instances in parallel with 80 VMs to process 20 terabytes of aerial imagery into land cover data for the entire United States.   It took only about 10 minutes for a total cost of $42 [see endnote 3]. Mary Wall’s code is in the blog and available on GitHub.

Conclusion

Cloud data centers are getting high performance networks (with latencies of only a few microseconds in the case of Azure Brainwave) and immense computing capacity such as the tensor processing capability of Google’s TPU.  At the same time designers of supercomputers are having to deal with more failure resilience and complexity in the design of the first exascale supercomputers.  For the next generation exascale systems the nodes will be variations on a theme of multicore and GPU-style accelerators.

Observed from a distance, one might conclude that the architectures of cloud data centers and the next generation of supercomputers are converging.  However, it is important to keep in mind that the two are designed for different purposes. The cloud is optimized for fast response for services supporting many concurrent, globally distributed clients. Supercomputers are optimized for exceptionally fast execution of programs on behalf of a small number of concurrent users.   However, an exascale system may be so large that parts of it can run many smaller parallel jobs at once.  Projects like Singularity provide a solution for running containerized applications on supercomputers in a manner similar to the way microservices are run on a cloud.

Possible Futures

The continuum: edge+cloud+supercomputer

There are interesting studies showing how supercomputers are very good at training very large, deep neural networks.  Specifically, NERSC scientists have shown the importance of this capability in many science applications [4]. However, if you need to perform inference on models that are streamed from the edge you need the type of edge+cloud strategy described here.   It is not hard to imagine scenarios where vast numbers of instrument streams are handled by the edge and fed to inference models on the cloud, while those models are being continuously improved on a back-end supercomputer.

A data garden

In the near future, the most important contribution clouds can make to science is to provide access to important public data collections.  There is already a reasonable start.   AWS has an open data registry that has 57 data sets covering topics ranging from astronomy to genomics.   Microsoft Research has a Data Science for Research portal with a curated collection of datasets relating to human computer interaction, data mining, geospatial, natural language processing and more.  Google Cloud has a large collection of public genomics datasets.  The US NIH has launched three new cloud data and analytics projects: the Cancer Genomics Cloud led by the Institute for Systems Biology on Google's cloud, FireCloud from the Broad Institute, also on Google's cloud, and the Cancer Genomics Cloud (CGC) powered by Seven Bridges.   These NIH facilities also provide analytics frameworks designed to help researchers access and effectively use the resources.

I am often asked about research challenges in cloud computing that students may wish to undertake.   There are many.  The fact that the IEEE cloud computing conference being held in San Francisco in July received nearly 300 submissions shows that the field is extremely active.   I find the following topics very interesting.

  1. Find new ways to extract knowledge from the growing cloud data garden.   This is a big challenge because the data is so heterogeneous and discovery of the right tool to use to explore it requires expert knowledge.  Can we capture that community knowledge so that non-experts can find their way?  What are the right tools to facilitate collaborative data exploration?
  2. There are enormous opportunities for systems research in the edge-to-cloud-to-supercomputer path.  How does one create a system to manage and optimize workflows of activities that span this continuum?  Is there a good programming model for describing computations involving the edge and the cloud?  Can a program be automatically decomposed into the parts that are best run on the edge and the parts on cloud?  Can such a decomposition be dynamically adjusted to account for load, bandwidth constraints, etc.?
  3. Concerning the intelligent assistant for research, there are a number of reasonable projects short of building the entire thing.  Some may be low-hanging fruit, and some may be very hard.  For example, ArXiv, Wikipedia, Google search and Bing are great for discovery, but in different ways.   Handling complex queries like “what is the role of quantum entanglement in the design of a quantum computer?” should lead to a summary of the answer with links.   There is a lot of research on summarization and there are a lot of sources of data.  Another type of query is “How can I access data on genetic indicators related to ALS?”  Google will go in the right direction, but it takes more digging to find data.

These are rather broad topics, but progress on even the smallest part may be fun.

[1] L. Ramakrishnan, P. T. Zbiegel, S. Campbell, R. Bradshaw, R. S. Canon, S. Coghlan, I. Sakrejda, N. Desai, T. Declerck, and A. Liu. Magellan: Experiences from a science cloud. In 2nd International Workshop on Scientific Cloud Computing, pages 49–58, ACM, 2011.

P. Mehrotra, J. Djomehri, S. Heistand, R. Hood, H. Jin, A. Lazanoff, S. Saini, and R. Biswas. Performance evaluation of Amazon EC2 for NASA HPC applications. In 3rd Workshop on Scientific Cloud Computing, pages 41–50, ACM, 2012.

[2] https://blogs.technet.microsoft.com/machinelearning/2018/05/29/how-to-use-fpgas-for-deep-learning-inference-to-perform-land-cover-mapping-on-terabytes-of-aerial-images/ a blog by Mary Wall, Microsoft

[3] https://blogs.microsoft.com/green/2018/05/23/achievement-unlocked-nearly-200-million-images-into-a-national-land-cover-map-in-about-10-minutes/  from Lucas Joppa – Chief Environmental Scientist, Microsoft

[4] https://www.hpcwire.com/2018/03/19/deep-learning-at-15-pflops-enables-training-for-extreme-weather-identification-at-scale/

Parallel Programming in the Cloud with Python Dask

I am always looking for better ways to write parallel programs.  In chapter 7 of our book “Cloud Computing for Science and Engineering” we looked at various scalable parallel programming models that are used in the cloud.   We broke these down into five models:

  1. HPC-style “Single Program Multiple Data” (SPMD), in which a single program communicates data with copies of itself running in parallel across a cluster of machines;
  2. many task parallelism, which uses many nearly identical workers processing independent data sets;
  3. map-reduce and bulk synchronous parallelism, in which computation is applied in parallel to parts of a data set and intermediate results of a final solution are shared at well-defined synchronization points;
  4. graph dataflow, which transforms a task workflow graph into sets of parallel operators communicating according to the workflow data dependencies; and
  5. agents and microservices, in which a set of small stateless services process incoming data messages and generate messages for other microservices to consume.

While some applications that run in the cloud are very similar to the batch style of HPC workloads, parallel computing in the cloud is often driven by different classes of application requirements.  More specifically, many cloud applications require massive parallelism to respond to external events in real time.  This includes thousands of users that are using apps that are back-ended by cloud compute and data.   It also includes applications that are analyzing streams of data from remote sensors and other instruments.   Rather than running in batch mode with a start and end, these applications tend to run continuously.

A second class of workload is interactive data analysis.   In these cases, a user is exploring a large collection of cloud-resident data.   The parallelism is required because of the size of the data: it is too big to download, and even if you could download it, the analysis would be too slow for interactive use.

We have powerful programming tools that can be used for each of the parallel computing models described above, but we don’t have a single programming tool that supports them all.   In our book we have used Python to illustrate many of the features and services available in the commercial clouds.  We have taken this approach because Python and Jupyter are so widely used in the science and data analytics community.  In 2014 the folks at Continuum (now just called Anaconda, Inc.) and several others released a Python tool called Dask which supports a form of parallelism similar to at least three of the five models described above.  The design objective for Dask is really to support parallel data analytics and exploration on data that is too big to keep in memory.   Dask was not on our radar when we wrote the drafts for our book, but it is certainly worth discussing now.

Dask in Action

This is not intended as a full Dask tutorial.   The best tutorial material is the on-line YouTube videos of talks by Matthew Rocklin from Anaconda.   The official tutorials from Anaconda are also available.  In the examples we will discuss here we used three different Dask deployments.  The most trivial (and the most reliable) deployment was a laptop installation.  This worked on a Windows 10 PC and a Mac without problem.  As Dask is installed with the most recent release of Anaconda, simply update your Anaconda deployment, bring up a Jupyter notebook and “import dask”.    We also used the same deployment on a massive Ubuntu Linux VM on a 48-core server on AWS.  Finally, we deployed Dask on Kubernetes clusters on Azure and AWS.

Our goal here is to show how Dask can be used to illustrate several of the cloud programming models described above.    We begin with many task parallelism, then explore bulk synchronous parallelism and a version of graph parallelism, and finally computing on streams.  We say a few words about SPMD computing at the end, but the role Dask plays there is very limited.

Many Task Parallelism and Distributed Parallel Data Structures

Data parallel computing is an old and important concept in parallel computing.  It describes a programming style where a single operation is applied to collections of data as a single parallel step. A number of important computer architectures supported data parallelism by providing machine instructions that can be applied to entire vectors or arrays of data in parallel.  Called Single Instruction, Multiple Data (SIMD) computers, these machines were the first supercomputers and included the Illiac IV and the early Cray vector machines.  And the idea lives on as the core functionality of modern GPUs.   In the case of clusters of computers without a single instruction stream, we traditionally get data parallelism by distributing data structures over the memories of the nodes in the cluster and then coordinating the application of the operation with a thread on each node in parallel.   This is an old idea and it is central to Hadoop, Spark and many other parallel data analysis tools.   Python already has a good numerical array library called numpy, but it only supports sequential operations on arrays in the memory of a single node.

Dask Concepts

Dask computations are carried out in two phases.   In the first phase the computation is rendered into a graph where the nodes are actual computations and the arcs represent data movements.   In the second phase the graph is scheduled to run on a set of resources.  This is illustrated below.  We will return to the details in this picture later.


Figure 1.  Basic Dask operations: compile graph and then schedule on cluster

There are three different sets of “resources” that can be used.   One is a set of threads on the host machine.   Another is a set of processes, and the third is a cluster of machines.   In the case of threads and local processes the scheduling is done by the “single machine scheduler”.   In the case of a cluster it is called the distributed scheduler.  Each scheduler consumes a task graph and executes it on the corresponding host or cluster.   In our experiments we used a 48-core VM on AWS for the single machine scheduler. In the cluster case the preferred host is a set of containers managed by Kubernetes.   We deployed two Kubernetes clusters:  a three-node cluster on Azure and a six-node cluster on AWS.
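For reference, selecting among these resources takes only a line or two; the scheduler address below is illustrative.

import dask
from dask.distributed import Client

# Threads or processes on a single machine:
dask.config.set(scheduler="threads")          # or scheduler="processes"

# Or connect to a distributed scheduler running on a cluster:
client = Client("tcp://dask-scheduler:8786")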

Dask Arrays, Frames and Bags

Python programmers are used to numpy arrays, so Dask distributes arrays while maintaining as much of the semantics of numpy as possible.  To illustrate this idea consider the following numpy computation that creates a random 4 by 4 array, then zeros out all elements less than 0.5 and computes the sum of the array with its transpose.

import numpy as np

x = np.random.random((4,4))
x[x<0.5] = 0
y = x+x.T

We can use Dask to make a distributed version of the same matrix and perform the same computations in parallel.

import dask.array as da
x = da.random.random(size=(4,4), chunks=(4,1))
x[x<0.5] = 0
y = x+x.T

The important new detail here is that we give explicit instructions on how we want the array to be distributed by specifying the shape of the chunks on each node.   In this case we have said we want each “chunk” to be a 4×1 slice of the 4×4 array.   We could have partitioned it into square blocks of size 2×2.   Dask takes care of managing each chunk and the needed communication between the processes that handle each chunk.   The individual chunks are managed on each thread/process/worker as numpy arrays.

As stated above, there are two parts to a dask computation.   The first phase is the construction of a graph representing the computation involving each chunk. We can actually take a look at the graph.   For example, in the computation above we can use the “visualize()” method as follows.

y = x+x.T
y.visualize()


Figure 2.   Sample Dask Graph for x+x.T

The nodes represent data or operations and the lines are data movements from one node to another.  As can be seen this is a rather communication-intensive graph.   This is because the transpose operation requires that elements in the rows (which are distributed) be moved to columns on the appropriate node to do the addition.  The way we chunk the array can have a huge impact on the complexity of the distributed computation.  For example, 2×2 chunking makes this one very easy.   There are 4 chunks and doing the transpose involves only a simple swap of the “off diagonal” chunks.   In this case the graph is much simpler (and easier to read!)
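For reference, the 2×2 version only changes the chunks argument:

x = da.random.random(size=(4,4), chunks=(2,2))
x[x<0.5] = 0
y = x + x.T
y.visualize()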


Figure 3.  Task graph for x+x.T with 2×2 chunking of data

The second step for Dask is to send the graph to the scheduler to schedule the subtasks and execute them on the available resources. That step is accomplished with a call to the compute method.

y.compute()

Dask arrays support almost all the standard numpy array operations except those that involve complex communications such as sorting.

In addition to numpy-style arrays, Dask also has a feature called Dask dataframes, which are distributed versions of Pandas dataframes.   In this case each Dask dataframe is partitioned by blocks of rows where each block is an actual Pandas dataframe.  In other words, Dask dataframe operators are wrappers around the corresponding Pandas operators in the same way that Dask array operators are wrappers around the corresponding numpy array operators.    The parallel work is done primarily by the local Pandas and numpy operators working simultaneously on the local blocks, and this is followed by the necessary data movement and computation required to knit the partial results together.  For example, suppose we have a dataframe, df, where each row is a record consisting of a name and a value and we would like to compute the sum of the values associated with each name.   We assume that names are repeated, so we need to group all records with the same name and then apply a sum operator.  We set this up on a system with three workers.  To see this computational graph we write the following.

df.groupby(['names']).sum().visualize()


Figure 4.  Dataframe groupby reduction
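For concreteness, here is a hedged sketch of how such a dataframe might be built and the reduction executed; the names and values are made up.

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"names": ["alice", "bob", "alice", "carol", "bob", "alice"],
                    "value": [1, 2, 3, 4, 5, 6]})
df = dd.from_pandas(pdf, npartitions=3)   # three partitions to match the three workers

df.groupby(['names']).sum().compute()     # sums the values for each name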

As stated earlier, one of the motivations of Dask is the ability to work with data collections that are far too large to load on to your local machine.   For example, consider the problem of loading the New York City taxi data for an entire year.    It won’t fit on my laptop.   The data covers 245 million passenger rides and contains a wealth of information about each ride.  Though we can’t load this into our laptop, we can ask Dask to load it from a remote repository into our cloud and automatically partition it using the read_csv function on the distributed dataframe object as shown below.


Figure 5.  Processing Yellow Cab data for New York City
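The code in the screenshot is not reproduced here; a hedged reconstruction of the loading step looks roughly like this (the S3 path follows the public Dask demo data and is illustrative, as are the column names).

import dask.dataframe as dd

df = dd.read_csv("s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-*.csv",
                 parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
                 storage_options={"anon": True})
df = df.persist()    # pull the partitions into the cluster's distributed memory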

The persist method moves the dataframe into memory as a persistent object that can be reused without being recomputed.  (Note:  the read_csv method did not work on our Kubernetes clusters because of a missing module, s3fs, in the Dask container, but it did work on our massive shared memory VM which has 200 GB of memory.)

Having loaded the data we can now follow the dask demo example and compute the best hour to be a taxi driver based on the fraction of tip received for the ride.


Figure 6.  New York City cab data analysis.
As you can see, it is best to be a taxi driver about 4 in the morning.
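The analysis in the screenshot is not reproduced either; a hedged reconstruction, using the standard yellow cab column names, is roughly:

df2 = df[(df.tip_amount > 0) & (df.fare_amount > 0)]
df2["tip_fraction"] = df2.tip_amount / df2.fare_amount
hour = df2.tpep_pickup_datetime.dt.hour
tip_by_hour = df2.groupby(hour).tip_fraction.mean().compute()
tip_by_hour.plot(figsize=(10, 4))    # requires matplotlib; peaks around 4 am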

A more general distributed data structure is the Dask Bag, which can hold items of less structured type than arrays and dataframes.   A nice example, http://dask.pydata.org/en/latest/examples/bag-word-count-hdfs.html, illustrates using Dask bags to explore the Enron public email archive.
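A hedged word-count sketch in the spirit of that example (the S3 path is illustrative):

import dask.bag as db

b = db.read_text("s3://my-bucket/enron/*.txt")
top10 = (b.map(str.split)
          .flatten()
          .frequencies()                  # bag of (word, count) pairs
          .topk(10, key=lambda x: x[1])   # ten most frequent words
          .compute())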

Dask Futures and Delayed

One of the more interesting Dask operators is one that implements a version of the old programming language concept of a future.   A related concept is that of lazy evaluation, and this is implemented with the dask.delayed function.   If you invoke a function with the delayed operator it simply builds the graph but does not execute it.  Futures are different.    A future is a promise to deliver the result of a computation later.  The future computation begins executing, but the calling thread is handed a future object which can be passed around as a proxy for the result before the computation is finished.

The following example is a slightly modified version of one of the demo programs.   Suppose you have four functions

def foo(x):
    return x + 1          # stand-ins: the versions we timed each do about 1 second of math
def bar(x):
    return 2 * x
def linear(x, y):
    return x + y
def three(x, y, z):
    return x + y + z

We will use the distributed scheduler to illustrate this example. We first must create a client for the scheduler. Running this on our Azure Kubernetes cluster we get the following.

 
from dask.distributed import Client
c = Client()
c

[Screenshot: the Dask distributed Client connected to the scheduler on the Azure Kubernetes cluster]

To illustrate the delayed interface, let us build a graph that composes our example functions

from dask import visualize, delayed
i = 3
x = delayed(foo)(i)
y = delayed(bar)(x)
z = delayed(linear)(x, y)
q = delayed(three)(x, y, z)
q.visualize(rankdir='LR')

In this example q is now a placeholder for the graph of a delayed computation.   As with the Dask array examples, we can visualize the graph (plotting it from left to right).


Figure 7.  Graph of a delayed computation.

A call to compute will evaluate our graph.   Note that we have implemented the  four functions each with about 1 second of useless computational math (computing the sum of a geometric series) so that we can measure some execution times.   Invoking compute on our delayed computation gives us

[Screenshot: q.compute() returns the result in about 4.2 seconds]

which shows us that there is no parallelism exploited here because the graph has serial dependences.

To create a future, we “submit” the function and its argument to the scheduler client.  This immediately returns a reference to the future value and starts the computation.  When you need the result of the computation, the future has a method “result()” that can be invoked, causing the calling thread to wait until the computation is done.

Now let us consider the case where we need to evaluate this graph on 200 different values and then sum the results.   We can use futures to kick off a computation for each instance, wait for them to finish and sum the results.   Again, following the example in the Dask demos, we ran the following on our Azure Kubernetes cluster:
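The notebook output is shown in the screenshot below; a hedged reconstruction of the pattern we used is:

def evaluate(n):
    # the same chain of calls as the delayed graph above
    x = foo(n)
    y = bar(x)
    z = linear(x, y)
    return three(x, y, z)

futures = [c.submit(evaluate, n) for n in range(200)]   # all 200 start immediately
total = sum(c.gather(futures))                          # wait for and combine the results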

[Screenshot: timing of the 200-futures computation on the Azure Kubernetes cluster]

Ignore the result of the computation (it is correct). The important result is the time. Calculating the time to run this sequentially (200*4.19 = 838 seconds) and dividing by the parallel execution time we get a parallel speed-up of about 2, which is not very impressive. Running the same computation on the AWS Kubernetes cluster we get a speed-up of 4. The Azure cluster has 6 cores and the AWS cluster has 12, so it is not surprising that it is twice as fast. The disappointment is that the speed-ups are not closer to 6 and 12 respectively.


Results with AWS Kubernetes Cluster

However, the results are much more impressive on our 48 core AWS virtual machine.


Results with AWS 48-core VM

In this case we see a speed-up of 24.   The difference is the fact that the scheduling is using shared memory and threads.

Dask futures are a very powerful tool when used correctly.   In the example above, we spawned off 200 computations in less than a second.   If the work in the individual tasks is large, that execution time can mask much of the overhead of scheduler communication and the speed-ups can be much greater.

Dask Streams

Streamz is a library closely associated with Dask that implements a basic streaming interface and allows you to compose graphs for stream processing.   We will just give the basic concepts here.   For a full tour look at https://streamz.readthedocs.io.   Streamz graphs have sources,  operators and sinks.   We can start by defining some simple functions as we did for the futures case:

def inc(x):
    return x+13
def double(x):
    return 2*x
def fxy(x):  # expects a tuple
    return x[0]+ x[1]
def add(x,y):
    return x+y

from streamz import Stream
source = Stream()

The next step will be to create a stream object and compose our graph.   We will describe the input to the stream later.   We use four special stream operators here.    Map is how we attach a function to the stream.   We can also merge two streams with a zip operator.   Zip waits until there is an available object on each stream and then creates a tuple that combines both into one object.   Our function fxy(x) above takes a tuple and adds its elements.   We can direct the output of a stream to a file, database, console output or another stream with the sink operator.  As shown below, our graph has two sink operators.


Figure 8.  Streamz stream processing pipeline.

Visualizing the graph makes this clear.   Notice there is also an accumulate operator.   This allows state flowing through the stream to be captured and retained.   In this case we use it to create a running total.  To push  something into the stream we can use the emit() operator as shown below.
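The exact notebook code is in the screenshot below; a hedged reconstruction of a pipeline with the shape described above (two sinks, a zip and an accumulate) is:

from streamz import Stream

source = Stream()
a = source.map(inc)                # apply inc to each event
b = source.map(double)             # apply double to each event
both = a.zip(b)                    # pair up (inc(x), double(x))
summed = both.map(fxy)             # add the elements of each pair
summed.sink(print)                 # first sink: print each combined value
totals = summed.accumulate(add)    # running total of the combined values
totals.sink(print)                 # second sink: print the running total

for i in range(5):
    source.emit(i)                 # push five events through the pipeline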

[Screenshot: emitting values into the stream and the printed results]

The emit() operator is not the only way to send data into a stream. You can create the stream so that it takes events from Kafka, or reads lines from a file, or monitors a file system directory looking for new items. To illustrate that, we created another stream to look at the home directory of our Kubernetes cluster on Azure. Then we started this file monitor. The names of the files that are there are printed. Next, we added another file “xx” and it picked it up. Next, we invoked the stream from above and then added another file “xxx”.

[Screenshot: the file-monitor stream picking up the newly added files]

Handling Streams of Big Tasks

Of the five types of parallel programming, Dask covers two and a half:  many task parallelism, map-reduce and bulk synchronous parallelism, and part of graph dataflow.   Persistent microservices are not part of the picture.   However, Dask and Streamz can be used together to handle one of the use cases for microservices.  For example, suppose you have a stream of tasks and you need to do some processing on each task, but the arrival rate of tasks exceeds the rate at which you can process them.   We treated this case with microservices while processing image recognition with MXNet and the resnet-152 deep learning model (see this article).  One can use the Streamz sink operation to invoke a future to spawn the task on the Kubernetes cluster.   As the tasks finish, the results can be pushed to other processes for further work or to a table or other storage as illustrated below.


Figure 9.  Extracting parallelism from a stream.

In the picture we have a stream called Source which gathers the events from external sources.  We then map it to a function f() for initial processing. The result of that step is sent to a function called spawn_work, which creates a future around a function that does some deep processing and sends a final result to an AWS DynamoDB table.   (The function putintable(n) below shows an example.  It works by invoking a slow computation, then creating the appropriate DynamoDB item and putting it in the table “dasktale”.)

def putintable(n):
    import boto3
    e = doexp(n*1000000)   # doexp: the geometric-series busy-work function (not shown)
    dyndb = boto3.resource('dynamodb', … , region_name='us-west-2')  # credentials elided
    item = {'daskstream': 'str'+str(n), 'data': str(n), 'value': str(e)}
    table = dyndb.Table("dasktale")
    table.put_item(Item=item)
    return e

def spawn_work(n):
    x = cl.submit(putintable, n)   # cl is the dask.distributed Client

This example worked very well. Using futures allowed the input stream to work at full speed by exploiting the parallelism. (The only problem is that boto3 needs to be installed on all the kubernetes cluster processes. Using the 48 core shared memory machine worked perfectly.)
Dask also has a queue mechanism so that results from futures can be pushed to a queue and another thread can pull these results out. We tried this as well, but the results were somewhat unreliable.

Conclusion

There are many more stream, futures, dataframe and bag operators that are described in the documents.   While it is not clear if this stream processing tool will be robust enough to replace any of the other systems currently available, it is certainly a great, easy-to-use teaching tool.   In fact, this statement can be made about the entire collection of Dask-related tools.   I would not hesitate to use it in an undergraduate course on parallel programming.   And I believe that Dask dataframe technology is very well suited to the challenge of big data analytics, as is Spark.

The example above that uses futures to extract parallelism from a stream is interesting because it is completely adaptive. However, it is essential to be able to launch arbitrary application containers from futures to make the system more widely applicable.   Some interesting initial work has been done on this at the San Diego Supercomputer Center using Singularity to launch jobs on their resources using Dask.   In addition, the UK Met Office is doing interesting things with autoscaling Dask clusters.   Dask and Streamz are still young.   I expect them to continue to evolve and mature in the year ahead.

Algorithmia™: A Cloud Marketplace for Algorithms and Deep Learning

 

One area of great frustration encountered by application developers involves the challenge of integrating new algorithms into a code base.  There are many reasons for this.   For example, the algorithm may be described in a journal article where many details of the implementation are omitted or it is available only in a programming language different from the one being used.  The code may have software dependencies that are hard to resolve.  The new algorithm may also have hardware dependencies, such as reliance on a GPU to get performance and you may not have access to this hardware.  On the other hand, if you are the author of a great new algorithm you may be disappointed that your new invention is not being used for these very same reasons.     

About 18 months ago a company called Algorithmia™  was founded in Seattle that provides an elegant solution to these problems.  They provide a very simple multi-language API that can be used to invoke any of their catalog of 3,500 different cloud-based algorithms. While we may be getting tired of reading about X-as-a-Service for different versions of X, there is one binding for X that has been around for a while in various forms and, as much as it pains me to do so, it begs to be called Algorithms as a Service.   And AaaS is just one of the things Algorithmia provides.      

AaaS is indeed not a new idea.  Jack Dongarra and his ICL team at the University of Tennessee created NetSolve/GridSolve in 2003 to provide scientists and engineers with access to state-of-the-art numerical algorithms running on a distributed network of high performance computers.   As cool as NetSolve is, Algorithmia goes several steps beyond this concept.

One of Algorithmia’s cofounders and its CEO, Diego Oppenheimer, has a deep background in building business intelligence tools.   While working on that he developed an appreciation of the power of being able to call out to powerful algorithms from inside a user-facing application.  This capability allows the application to have access to deeper knowledge and more powerful computational resources than are available on the user’s device.  A key insight from this experience is that algorithms must be discoverable and invokable from any user application runtime.   These ideas are all central to Algorithmia.  In the following paragraphs we will look at Algorithmia’s marketplace,  explore building a new algorithm and discuss a bit of the system's microservice architecture.

Algorithmia is a marketplace.  

There are over 50,000 developers that use Algorithmia services and the platform encourages these developers to contribute new algorithms to the collection.   Invoking an algorithm is dead simple and it can be done from any programming language that can formulate a JSON doc and send a REST message.   We will provide some detailed illustrations at the end of this document.  

To use it, you need to set up an account.   Doing so will get you a starter award of 5000 or so “credits”.   When you invoke an algorithm, credits are deducted from your account.   Typically, there is a “royalty” cost of about 10 credits and then the cost is usually around one credit per second of execution.   A fun example from their deep learning collection is an image colorizer.   Input is a PNG file of a black and white image and the returned value is a link to the output colorized image.  We took a color image from a visit to Red Square a few years ago.   We converted it to a grayscale image and gave that to the colorizer.  The result is illustrated below.  The original is on the left, the grayscale in the middle and the colorized image on the right.   While it is not as good as the best hand-colored photos, it is not too bad.     It lost the amazing color of St. Basil’s Cathedral, which is not too surprising, but it was great with the sky and the skin tones of the people in the foreground.   (It seemed to think the bricks of the square would look better with some grass color.)

[Image: the original, grayscale and colorized versions of the Red Square photo]

The Python code to upload the grayscale image and invoke the service was incredibly simple.

import Algorithmia
client = Algorithmia.client('your_user_key_from_account_creation')
input = bytearray(open("path_to_grayscale.png", "rb").read())
result = (client.algo("deeplearning/ColorfulImageColorization/1.1.6")
          .pipe(input).result)
path_to_local_copy_of_result_image = client.file(result['output']).getFile()

The cost in credits was 154.   The exchange rate for credits is 1$ = 10,000 credits (approximately) so this invocation would have cost about 1.5 cents.  

This algorithm is from their extensive machine learning and AI collection.  A related algorithm is one that computes the salience of objects in an image.  Salience is the degree to which an object in the image attracts the attention of the viewer’s eye.   The algorithm is called SalNet and it is based on ideas from the paper “Shallow and Deep Convolutional Networks for Saliency Prediction” by Pan et al. (see arXiv:1603.00845v1).

As with the colorizer, SalNet is easy to invoke.

input = { "image": "data://.algo/deeplearning/SalNet/perm/an-uploaded-image.png" }
result2 = client.algo("deeplearning/SalNet/0.2.0").pipe(input).result

Note that in this case we have loaded the image from one that we uploaded to Algorithmia’s data cloud.  In fact, it is the same grayscale image of red square.  As you can see below, the algorithm picks out the woman in the foreground and also notices the church in the background. 

Salience computation can be very helpful in identifying and labeling objects in an image.   Image tagging is also something that Algorithmia supports.   Running the same image through their tagger returned the observations that the image was “safe”, that there were multiple boys and multiple girls, sky and clouds, and that it seemed to be near a palace.

[Image: saliency map for the Red Square photo]

There are many other AI related image algorithms such as nudity detection, character recognition, face detection and a very impressive car make and model recognition algorithm.   A quick look at https://algorithmia.com/use-cases will show many other fascinating use cases. 

Another very cool capability of Algorithmia is its ability to host your trained machine learning model.  Suppose you have a model you have built with MXNet, TensorFlow, scikit-learn, CNTK or any of the other popular ML frameworks; you can upload your model to Algorithmia so that it can be available as a service.   This is explained here. We will explore this capability in a later post.

While the main emphasis and attraction of the Algorithmia collection is machine learning and AI, there are many more algorithm categories represented there.  For example, there is an excellent collection of utilities for managing data and making certain programming tasks extremely easy, such as extracting text from web pages, Wikipedia search tools, and computing the timezone and elevation from latitude and longitude coordinates.

There is also a large collection of time series analysis algorithms.   These include forecasting, outlier detection, Fourier filters, auto-correlation computation and many more.

Algorithmia is a cloud of microservices

In an excellent talk at the 2017 Geekwire cloud summit, Oppenheimer described some key elements of Algorithmia’s architecture.  In this talk he makes the critically important observation that the two phases of machine learning, training and prediction, require very different execution environments when used in production.   Training is often done on a dedicated system, consuming many hours of compute and as much memory as is available.   The result of training is a model codified as data.   Prediction (also called inference) uses the model to make predictions or inferences about a sample case.   Prediction can be done on the same hardware platform that was used for the training, but if the model is to be used to make predictions concerning thousands of cases for thousands of concurrent users, one needs a completely different design.

Their approach to the scale problem for predictions (and for any high-demand algorithm in their collection) is based on serverless microservices.    They use a Kubernetes microservice foundation with algorithms deployed in Docker containers.  Requests from remote client applications are load balanced across API servers, which dispatch requests to container instances for the requested function.  The challenge is making the latency from request to reply very low.  If a container for an algorithm is already in system memory, it requires very little time to spawn a new instance on Kubernetes.  Another technique they use is to dynamically load algorithms into running containers.  (We don’t know the exact mechanism Algorithmia uses here, but we expect it is exploiting these facts.)

They have made some very interesting optimizations.   For example, if the data used in the computation is stored in one of their cloud regions, the Docker instance will be instantiated nearby.   Just as important, if an algorithm invokes another algorithm, they will attempt to co-locate the two containers and reduce the inter-process latency.  Composability of algorithms is one of their guiding concepts.

Turning your own algorithm into a microservice

The process of turning your own algorithm into a microservice is remarkably simple.   From the Algorithmia portal there is a “+” symbol in the upper right-hand corner.   This gives you a dialog box to fill out.   You provide the name of your algorithm, the programming language you are using (from a long list; sorry, no Fortran or Julia, but there are lots of alternatives), and several other choices including your source license policy, whether your function invokes other Algorithmia functions, and whether your function invokes things on the open internet.

Answering these questions causes Algorithmia to create a nice GitHub repo for your function.   Your next step is to install the Algorithmia command line interface and then clone your function’s GitHub repo.  Once you have done that you can edit the function so that it does what you want.   The basic skeleton is already there for you in the “src” directory.   Here is the basic skeleton in Python rendered as a hello world function.

import Algorithmia
# API calls will begin at the apply() method, 
# with the request body passed as 'input'
# For more details, see algorithmia.com/developers/algorithm- 
# development/languages
def apply(input):
    return "hello {}".format(input)

You can edit the function directly from an editor built into the Algorithmia portal or, now that you have a clone of the repo, you can use your own tools to transform this skeleton into your algorithm.   If you have done this work on your clone you need to use the Git commands to push your code back to the master branch.

We tried this with a small experiment.   We built a function called KeyPhrases that takes English language text as input and breaks it down into subjects (s), actions (a), which are like verb clauses, and objects (o).   The algorithm is not very useful or sophisticated.   In fact, it uses another Algorithmia microservice called Parsey McParseface, which was originally released by Google (see https://arxiv.org/pdf/1603.06042v1.pdf).   This is truly a deep parser that builds a very sophisticated tree.  For example, the figure below illustrates the tree for a parse of the sentence

Einstein’s general theory of relativity explains gravity in terms of the curvature of spacetime.


Parsey McParseface tree output.

Our function KeyPhrases walks the tree and groups the terms into subjects (s), objects (o) and actions (a) and returns a JSON document with the original string and the list of phrases.  It also breaks out separate subphrases with “/” marks.  In this case it returns

{"phrases":[
       "s: Einstein's general theory /of relativity ",
       "a: explains ",
       "s: gravity /in terms /of the curvature /of spacetime. "
        ],
  "text":"Einstein's general theory of relativity explains gravity in terms of the curvature of spacetime."
}

A more complex example is

Facebook Incs chief security officer warned that the fake news problem is more complicated to solve than the public thinks.

The phrase output is

['s: Facebook Incs chief security officer ',
 'a: warned ',
 'o: that the fake news problem ',
 'a: is more ',
 'o: complicated and dangerous /to solve /than the public thinks ']

This is clearly not as rich in detail as the Parsey output, but it does extract some useful key phrases. 
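For readers curious how the composition works, here is a minimal, hypothetical sketch of the core of KeyPhrases.  The Parsey McParseface algorithm path, the shape of its returned parse and the group_phrases helper are all assumptions made for illustration; the published KeyPhrases repo contains the real code.

import Algorithmia

# Hypothetical sketch: compose another Algorithmia microservice (the Parsey
# McParseface parser) inside our own algorithm.  The algorithm path and the
# structure of the returned parse are assumptions.
client = Algorithmia.client('your_api_key')  # key handling differs when the
                                             # code runs on the platform itself

def group_phrases(parse):
    # Placeholder for the tree walk that groups terms into subject (s),
    # action (a) and object (o) phrases; here we just return the raw parse
    # so the sketch runs end to end.
    return [str(parse)]

def apply(input):
    parse = client.algo("deeplearning/ParseyMcParseface/1.0.0").pipe(input).result
    return {"phrases": group_phrases(parse), "text": input}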

To complete the creation of the microservice for this algorithm one need only issue the git commands

$ git add src/KeyPhrases.py
$ git commit -m "added src mods"
$ git push origin master

The last push causes a compile step to happen and the microservice is now created.   Algorithmia also provides an easy template to add documentation and instructions about how to invoke your function.  From the Algorithmia editor there is a function that allows you to “publish” your algorithm.   Once we pushed that button, the KeyPhrases example was placed in their library.   You can see it here: https://algorithmia.com/algorithms/dbgannon/KeyPhrases (If you use it, remember it has not been tested very well, so it may break.)

Algorithmia as an enterprise platform

The Algorithmia serverless microservice platform is robust enough that they offer it as an enterprise product.   This allows enterprises to host their own version on one of the public clouds, on their own clusters, or across multiple clouds in a hybrid system.    Their own internally used algorithms can then be hosted and invoked by their in-house analytics tools and pipelines in a totally scalable way.   This enterprise version comes with a management dashboard and monitoring tools.

Conclusions

Algorithmia is a fascinating company with very interesting products.   It is extremely easy to sign up for a free account and it is fun to use.   The team was extremely helpful when we had questions.  A Jupyter Notebook with some of the examples mentioned above will be posted very soon.   We found experimenting with the various algorithms from an interactive notebook was a pleasure.   Creating the hosted version of the KeyPhrases algorithm took less than an hour after the original python code was debugged.   In our next experiment we will explore hosting deep learning models with Algorithmia.

Cloud-Native Applications – call for papers

The IEEE Cloud Computing Journal is going to publish a special issue on the topic of Cloud-Native applications. This is an extremely interesting topic and it cuts to the heart of what makes the cloud a platform that is, in many ways, fundamentally different from what we have seen before.

What is “cloud-native”? That is what we want you to tell us. Roger Barga from Amazon, Neel Sundaresan from Microsoft and this blogger have been invited to be guest editors. But we want you to tell us soon. The deadline is March 1, 2017. The papers do not need to be long (3,000 to 5,000 words) and some of the possible topics include:

  • Frameworks to make it easier for industry to build cloud-native applications;
  • Educational approaches and community based organizations that can promote cloud-native design concepts;
  • The role of open source for building cloud-native applications;
  • VM and container orchestration systems for managing cloud-native designs;
  • Efficient mechanisms to make legacy applications cloud-native;
  • Comparing applications – one cloud-native and the other not – in terms of performance, security, reliability, maintainability, scalability, etc.;

And more.   Please go to https://www.computer.org/cloud-computing/2016/09/27/cloud-native-applications-call-for-papers/  to read more.

 

A Brief Look at Google’s Cloud Datalab

Google recently released a beta version of a new tool for data analysis using the cloud called Datalab.  In the following paragraphs we take a brief look at it through some very simple examples.  While there are many nice features of Datalab, the easiest way to describe it would be to say that it is a nice integration of the IPython Jupyter notebook system with Google’s BigQuery data warehouse.  It also integrates standard IPython libraries such as graphics and scikit-learn and Google’s own machine learning toolkit TensorFlow.

To use it you will need a Google cloud account.   The free account is sufficient if you are interested in just trying it out.   You may ask, why do I need a Google account when I can use Jupyter, IPython and TensorFlow on my own resources?    The answer is you can easily access BigQuery on non-trivial sized data collections directly from the notebook running on your laptop.  To get started go to the Datalab home page.   It will tell you that this is a beta version and give you two choices: you may either install the Datalab package locally on your machine or you may install it on a VM in the Google cloud.   We prefer the local version because it saves your notebooks locally.

 The Google public data sets that are hosted in the BigQuery warehouse are fun to explore.  They include

  • The names on all US social security cards for births after 1879.  (The table rows contain only the year of birth, state, first name, gender and the count of occurrences, provided the count is greater than 5.  No social security numbers.),
  • The New York City Taxi trips from 2009 to 2015,
  • All stories and comments from “Hacker News”,
  • The US Dept of Health weekly records of diseases reported from each city and state from 1888 to 2013,
  • The public data from the HathiTrust and the Internet Book Archive,
  • The Global Summary of the Day (GSOD) weather from the National Oceanic and Atmospheric Administration, collected from 9000 weather stations between 1929 and 2016.

And more, including the 1000 genome database.

To run Datalab on your laptop you need to have Docker installed.   Once Docker is running and you have created a Google cloud account and a project, you can launch Datalab with a simple Docker command as illustrated in their quick-start guide.  When the container is up and running you can view it at http://localhost:8081.  What you see at first is shown in Figure 1.  Keep in mind that this is beta release software, so you can expect it will change or go away completely.


Figure 1.  Datalab Top level view.

Notice the icon in the upper right corner consisting of a box with an arrow.   Clicking this allows you to log in to the Google cloud, effectively giving authorization for your container to run against your Google cloud account.

The view you see is the initial notebook hierarchy.   Inside docs is a directory called notebooks that contains many great tutorials and samples.

A Few Simple Examples of Using Datalab

As mentioned above, one of the public data collections is the list of first names from social security registrations.   Using Datalab we can look at a sample of this data by using one of the built-in BigQuery functions as shown in Figure 2.


Figure 2.   Sampling the names data.

 

This page gives us enough information about the schema that we can now formulate a query.

In modern America there is a movement to “post-gender” names.   Typical examples cited on the web are “Dakota”, “Skyler” and  “Tatum”.   A very simple SQL query can be formulated to see how the gender breakdown for these names shows up in the data.  In Datalab, we can formulate the query as shown in Figure 3.
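A minimal sketch of such a query, written as a Python call to Datalab’s BigQuery module, is shown below.  The public names table, its schema and the module import are assumptions that may differ in your Datalab release; the actual cell and its result appear in Figure 3.

import gcp.bigquery as bq   # Datalab's BigQuery module (name is an assumption)

# gender breakdown for the three names, summed over all years and states
qry = "SELECT gender, SUM(number) AS total \
       FROM [bigquery-public-data:usa_names.usa_1910_2013] \
       WHERE name IN ('Dakota', 'Skyler', 'Tatum') \
       GROUP BY gender"

bq.Query(qry).to_dataframe()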


Figure 3.   Breakdown by gender of three “post-gender” names.

As we can see, this is very nearly gender balanced.  A closer inspection using each of the three names separately shows that “Skyler” tends to be ‘F’ and “Tatum” tends to be ‘M’. On the other hand, “Dakota” does seem to be truly post-gender, with 1052 ‘F’ and 1200 ‘M’ occurrences.

We can also consider the name “Billy” which, in the US, is almost gender neutral.   (Billy Mitchell was a famous World War I general and is also the name of a contemporary jazz musician, both male. And Billy Tipton and Billy Holiday were female musicians, though Billy Holiday was actually named Billie and Billy Tipton lived her life as a man, so perhaps they don’t count.)   We can ask how often Billy was used as a name associated with gender ‘F’ in the database.  It turns out it is most common in the southern US. We can group these occurrences by state, count them and show the top five.   The SQL command is easily inserted into the Datalab notebook as shown in Figure 4.
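A sketch of the kind of query behind Figure 4 follows; as before, the table name, its schema and the module import are assumptions.

import gcp.bigquery as bq   # module name is an assumption

# count female "Billy" births by state and keep the top five states
qry = "SELECT state, SUM(number) AS cnt \
       FROM [bigquery-public-data:usa_names.usa_1910_2013] \
       WHERE name = 'Billy' AND gender = 'F' \
       GROUP BY state \
       ORDER BY cnt DESC \
       LIMIT 5"

bq.Query(qry).to_dataframe()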


Figure 4.   Search for Billy with gender ‘F’ and count and rank by state of birth.

Rubella in Washington and Indiana

A more interesting data collection is the Centers for Disease Control and Prevention dataset concerning diseases reported by state and city over a long period.   An interesting case is Rubella, a virus also known as the “German measles”.   Through our vaccination programs it has been eliminated in the U.S. except for those people who catch it in other countries where it still exists.  But in the 1960s it was a major problem, with an estimated 12 million cases in the US and a significant number of newborn deaths and birth defects.   The vaccine was introduced in 1969 and by 1975 the disease was almost gone.   The SQL script shown below is a slightly modified version of one of the Google BigQuery examples.   It has been modified to look for occurrences of Rubella in two states, Washington and Indiana, over the years 1970 and 1971.

%%sql --module rubella
SELECT
  *
FROM (
  SELECT
    *, MIN(z___rank) OVER (PARTITION BY cdc_reports_epi_week) AS z___min_rank
  FROM (
    SELECT
      *, RANK() OVER (PARTITION BY cdc_reports_state ORDER BY cdc_reports_epi_week ) AS z___rank
    FROM (
      SELECT
        cdc_reports.epi_week AS cdc_reports_epi_week,
        cdc_reports.state AS cdc_reports_state,
        COALESCE(CAST(SUM((FLOAT(cdc_reports.cases))) AS FLOAT),0) 
         AS cdc_reports_total_cases
      FROM
        [lookerdata:cdc.project_tycho_reports] AS cdc_reports
      WHERE
        (cdc_reports.disease = 'RUBELLA')
        AND (FLOOR(cdc_reports.epi_week/100) = 1970 
          OR FLOOR(cdc_reports.epi_week/100) = 1971)
        AND (cdc_reports.state = 'IN'
          OR cdc_reports.state = 'WA')
      GROUP EACH BY
        1, 2) ww ) aa ) xx
WHERE
  z___min_rank <= 500
LIMIT
  30000

We can now invoke this query as part of a python statement so we can capture its result as a pandas data frame and pull apart the time stamp fields and data values.

# bq refers to Datalab's BigQuery Python module loaded in the notebook
rubel = bq.Query(rubella).to_dataframe()
rubelIN = (rubel[rubel['cdc_reports_state']=='IN']
           .sort_values(by=['cdc_reports_epi_week']))
rubelWA = (rubel[rubel['cdc_reports_state']=='WA']
           .sort_values(by=['cdc_reports_epi_week']))
epiweekIN = rubelIN['cdc_reports_epi_week']
epiweekWA = rubelWA['cdc_reports_epi_week']
rubelINval = rubelIN['cdc_reports_total_cases']
rubelWAval = rubelWA['cdc_reports_total_cases']

At this point a small adjustment must be made to the time stamps.  The CDC reports times in epidemic weeks and there are 52 weeks in a year.    So the time stamp for the first week of 1970 is 197000 and the time stamp for the last week is 197051.  The next week is 197100.  To make these into time stamps that appear contiguous we need to make a small “time compression” as follows.

import numpy as np

realweekI = np.empty([len(epiweekIN)])
realweekI[:] = epiweekIN[:]-197000
# close the 197051 -> 197100 gap of 48 so the weeks appear contiguous
realweekI[51:] = realweekI[51:]-48

Doing the same thing with epiweekWA we now have the basis of something we can graph.  Figure 5 shows the progress of rubella in Washington and Indiana over two years.  Washington is the red line and Indiana is blue.   Note that the outbreaks occur about the same time in both states and that by late 1971 the disease is nearly gone.


Figure 5.   Progress of Rubella in Washington (red) and Indiana (blue) from 1970 through 1971.

Continuing the plot over 1972 and 1973 shows there are flare-ups of the disease each year, but their maximum size diminishes rapidly.

(Datalab has some very nice plotting functions, but we could not figure out how to do a double plot, so we used the matplotlib library with the “fivethirtyeight” style.)
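A minimal sketch of that double plot, assuming realweekW and rubelWAval were built from the Washington data the same way realweekI and rubelINval were built above (those two names are assumptions):

import matplotlib.pyplot as plt

plt.style.use('fivethirtyeight')
plt.plot(realweekI, rubelINval, color='blue', label='Indiana')
plt.plot(realweekW, rubelWAval, color='red', label='Washington')
plt.xlabel('epidemic week (compressed)')
plt.ylabel('reported Rubella cases')
plt.legend()
plt.show()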

 

A Look at the Weather

 

From the National Oceanic and Atmospheric Administration we have the Global Summary of the Day (GSOD) weather from 9000 weather stations between 1929 and 2016.   While not all of these stations were operating during that entire period, there is still a wealth of weather data here.   To illustrate it, we can use another variation on one of Google’s examples.  Let’s find the hottest spots in the state of Washington for 2015.   This was a particularly warm year that brought unusual droughts and fires to the state. The following query will list the hottest spots in the state for the year.

%%sql
SELECT
  max, (max-32)*5/9 celsius, mo, da, state, stn, name
FROM (
  SELECT
    max, mo, da, state, stn, name
  FROM
    [bigquery-public-data:noaa_gsod.gsod2015] a
  JOIN
    [bigquery-public-data:noaa_gsod.stations] b
  ON
    a.stn=b.usaf
    AND a.wban=b.wban
  WHERE
    state="WA"
    AND max < 500 )
ORDER BY
  max DESC
LIMIT
  10

The data set ‘gsod2015’ is the table of data for the year 2015.  The max < 500 test simply filters out the large sentinel values used to mark missing readings.  To get a list that also shows the name of the station we need to do a join with the ‘stations’ table over the corresponding station identifiers.  We order the results descending from the warmest recordings.    The resulting table is shown in Figure 6 for the top 10.


Figure 6.   The top 10 hottest spots in Washington State for 2015

The results are what we would expect.   Walla Walla, Moses Lake and Tri Cities are in the eastern part of the state and summer was very hot there in 2015.   But  Skagit RGNL is in the Skagit Valley near Puget Sound.   Why is it 111 degrees F there in September?   If it is hot there what was the weather like in the nearby locations?   To find out which stations were nearby we can look at the stations on a map.   The query is simple but it took some trial and error.

%%sql --module stationsx
DEFINE QUERY locations
  SELECT FLOAT(lat/1000.0) AS lat, FLOAT(lon/1000.0) as lon, name
  FROM [bigquery-public-data:noaa_gsod.stations]
  WHERE state="WA" AND name != "SPOKANE NEXRAD"

It seems that the latitude and longitude for the Spokane NEXRAD station are incorrect and resolve to some point in Mongolia.  By removing it we get a good picture of the nearby stations as shown in Figure 7.


Figure 7.   Location of weather stations in western Washington using the Bigquery chart map function.

 This is an interactive map, so we can get the names of the nearby stations.   There is one only a few miles away  called PADILLA BAY RESERVE and the next closest is BELLINGHAM INTL.   We can now compare the weather for 2015 at these three locations.

 To get the weather for each of these we need the station ID.   We can do that with a simple query.

%%sql
SELECT
  usaf, name
FROM [bigquery-public-data:noaa_gsod.stations] 
WHERE
    name="BELLINGHAM INTL" OR name="PADILLA BAY RESERVE" OR name = "SKAGIT RGNL"

Once we have our three station IDs we can use the following to build a parameterized BigQuery expression.

qry = "SELECT max AS temperature, \
       TIMESTAMP(STRING(year) + '-' + STRING(mo) + \
       '-' + STRING(da)) AS timestamp \
FROM [bigquery-public-data:noaa_gsod.gsod2015] \
WHERE stn = '%s' and max < 500 \
ORDER BY year DESC, mo DESC, da DESC"

stationlist = ['720272','727930', '727976']

dflist = [bq.Query(qry % station).to_dataframe() for station in stationlist]

 We can now render an image of the weather for our three stations as shown in Figure 8.
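One way to draw Figure 8 is sketched below, assuming dflist is ordered Skagit, Padilla Bay, Bellingham (that label ordering, which follows stationlist above, is an assumption).

import matplotlib.pyplot as plt

plt.style.use('fivethirtyeight')
for df, label in zip(dflist, ['Skagit', 'Padilla Bay', 'Bellingham']):
    plt.plot(df['timestamp'], df['temperature'], label=label)
plt.ylabel('max daily temperature (F)')
plt.legend()
plt.show()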


Figure  8.  Max daily temperatures for Skagit (blue), Padilla Bay (red) and Bellingham (yellow)

We can clearly see the anomaly for Skagit in September and it is also easy to spot another problem in March, where the instruments seemed not to be recording.   Other than that there is close alignment of the readings.

Conclusions

There are many features of Datalab that we have not demonstrated here.   The documentation gives an example of using Datalab with TensorFlow, and the charting capabilities are more extensive than demonstrated here.  (The Google maps example was not reproducible in any notebook other than the demo in the samples, which we modified to run the code here.)  It is also easy to upload your own data to the warehouse and analyze it with Datalab.

 Using Datalab is almost addictive.  For every one of the data collections we demonstrated here there were many more questions we wanted to explore.  For example, where and when did the name “Dakota” start being used and how did its use spread?   Did the occurrence of Rubella outbreaks correspond to specific weather events?  Can we automate the process of detecting non-functioning weather instruments over the years where records exist?  These are all relatively standard data mining tasks, but the combination of Bigquery and IPython in the notebook format makes it fun.

It should be noted that Datalab is certainly not the first use of the IPython notebook as a front-end to cloud hosted analysis tools.  The IPython notebook has been used frequently with Spark, as we have previously described.  Those interested in an excellent overview of data science using Python should look at “Python Data Science Handbook” by Jake VanderPlas, which makes extensive use of IPython notebooks.     There are a variety of articles about using Jupyter on AWS and Azure for data analytics.  A good one is by Cathy Ye about deep learning using Jupyter in the cloud, where she gives detailed instructions for how to install Jupyter on AWS and deploy Caffe there.


Kubernetes and the Google Cloud Container Service: Fun with Pods of Celery.

In a previous post I talked about using Mesosphere on Azure for scaling up many-tasks parallel jobs and I promised to return to Kubernetes when I figured out how to bring it up.   Google just made it all very simple with their new Google Cloud container services.   And, thanks to their good tutorials, I learned about a very elegant way to do remote procedure calls using another open source tool called Celery.

So let me set the stage with a variation on an example I have used in the past.   Suppose we have 10000 scientific documents that are stored in the cloud.   I would like to use a simple machine learning method to classify each of these by topic.    I would like to do this as quickly as possible and, because the analysis of each document is independent of the others, I can try to process as many as possible in parallel.  This is the basic “many task” parallel model and one of the most common uses of the cloud for scientific computing purposes.      To do this we will use the Celery distributed task queue mechanism to take a list of our documents and send each one to a work queue where the tasks will be parceled out to workers who will do the analysis and respond.

The Google Cloud Container Service and a few words about Kubernetes.

Before getting into the use of Celery and the analysis program, let’s describe the Google Cloud Container Service and a bit about Kubernetes.   Getting started is incredibly easy.   Google has a small free trial account which is sufficient to do the experiments described.  Go to http://cloud.google.com and sign in or create an account.  This will take you to the “console” portal. The first thing you need to do is to create a project. In doing so it will be assigned an id which is a string of the form “silicon-works-136723”.    There is a drop down menu on the left end of the blue banner at the top of the page.  (Look for three horizontal bars.) This allows you to select the type of service you want to work on.     Select the “Container Engine”.  On the “container clusters” page there is a link that will allow you to create a cluster.   With the free account you cannot make a very big cluster.   You are limited to about 4 dual core servers.   If you fill in the form and submit it, you will soon have a new cluster.  There is a special icon of the form “>_” in the blue banner.  Clicking on that icon will create an instance of a “Cloud Shell” that will be automatically authenticated to your account.   The page you will see should resemble Figure 1 below.   The next thing you need to do is to authenticate your cloud shell with your new cluster.   By selecting your container and clicking on the “connect” button to the right you will get the code to paste into the cloud shell.  The result should now look exactly like Figure 1.


Figure 1.   Creating a Google cloud cluster and connecting the cloud shell to it.

Interacting with Kubernetes, which is now running on our small cluster, is through command lines which can be entered into the cloud shell.   Kubernetes has a different, and somewhat more interesting, architecture than other container management tools.   The basic unit of scheduling in Kubernetes is the pod. A pod consists of a set of one or more Docker-style containers together with a set of resources that are shared by the containers in that pod.  When launched, a pod resides on a single server or VM.   This has several advantages for the containers in that pod.   For example, because the containers in a pod are all running on the same VM, they all share the same IP and port space, so the containers can find each other through conventional means like “localhost”.   They can also share storage volumes that are local to the pod.

To start, let’s consider a simple single-container pod to run the Jupyter notebook.  There is a standard Docker container that contains Jupyter and the SciPy software stack.  Using the Kubernetes control command kubectl we can launch Jupyter and expose its port 8888 with the following statement.

$ kubectl run jupyter --image=jupyter/scipy-notebook --port=8888

To see that it is up and running we can issue the command “kubectl get pods”, which will return the status of all of our running pods.   Though we have launched Jupyter it is still not visible from outside.   To fix that we will associate a load balancer with the pod.   This will expose port 8888 to the open Internet.

$ kubectl expose deployment jupyter --type=LoadBalancer

Once that has been run you can get the IP address for jupyter from the “LoadBalancer Ingress:” field of the service description when you run the following.   If it doesn’t appear, try again.

$ kubectl describe services jupyter

Once you have verified that it is working at that address on port 8888, you should shut it down immediately because, as you can see, there is no security with this deployment.  Deleting a deployment is easy.

$ kubectl delete deployment jupyter

There is another point that one must be aware of when building containers that need to interact directly with the Google cloud APIs.  To make this work you will need application default credentials available in your container.   For example, if your container is going to interact with the storage services you will need this.   To get the default application credentials follow the instructions here.  We will say a few more words about this below.

The Analysis Example in detail.

Now let’s describe Celery and how to use Celery and Kubernetes in the many-task scenario described above.

To use Celery we start with our analysis program.   We have previously described the analysis algorithms in detail in another post, so we won’t duplicate that here.  Let’s start by assuming we have a function predict(doc) that takes a document as a string as an argument and returns a string containing the result from our trained machine learning classifiers.  Our categories are “Physics”, “Math”, “Bio”, “Computer Science” and “Finance” and the result from each classifier is simply the category that that classifier determines to be the most likely correct answer.

Celery is a distributed remote procedure call system for Python programs.   The Celery view of the world is you have a set of worker processes running on remote machines and a client process that is invoking functions that are executed on the remote machines.   The workers and the clients all coordinate through a message broker running somewhere else on the network.

Here we use a RabbitMQ service that is running on a Linux VM on the NSF JetStream cloud as illustrated in Figure 2.


Figure 2.  Experimental Configuration with Celery workers running on Kubernetes in the Google Cloud Container Service,  the RabbitMQ broker running in a VM on the NSF Jetstream Cloud and a client program running as a notebook on a laptop.

The code block below illustrates the basic Celery worker template.    Celery is initialized with a constructor that takes the name of the project and a link to the broker service which can be something like a Redis cache or MongoDB.   The main Celery magic is invoked with a special Python “decorator” associated with the Celery object as shown in the predictor.py file below.

from celery import Celery
app = Celery('predictor', backend='amqp')

# Now initialize and load all the data structures that will be constant
# and reused for each analysis.  In our case this will include all the
# machine learning models that were trained on the data previously.
# And create a main worker function to invoke the models.
def invokeMLModels(statement):
    ....
    return analysis

# define the functions we will call remotely here
@app.task
def predict(statement):
    prediction = invokeMLModels(statement)
    return [prediction]

What this decorator accomplishes is to wrap the function so that it can be invoked by a remote client.   To make this work we need to create a Celery worker from our predictor.py file with the command below, which registers a worker instance as a listener on the RabbitMQ queue.

>celery worker -A predictor -b 'amqp://guest@brokerIPaddr'

Creating a client program for our worker is very simple.   It is similar to the worker template except that our version of the predict( ) function does nothing, because we are going to invoke it with the special Celery apply_async( ) method that will push the argument to the broker queue and return control immediately to the client.   The object that is returned from this call is similar to what is sometimes called a “future” or a “promise” in the programming language literature.  It is a placeholder for the returned value.   Once we attempt to evaluate the get() method on this object our client will wait until a reply is returned from the remote worker that picked up the task.

from celery import Celery
app = Celery('predictor', broker='amqp://guest@brokerIPaddr', backend='amqp')

@app.task
def predict(statement):
    return ["stub call"]

res = predict.apply_async(["this is a science document ..."])

print(res.get())

Now if we have 10000 documents to analyze we can send them in sequence to the queue as follows.

# load all the science abstracts into a list
documents = load_all_science_abstracts()

res = []
for doc in documents:
   res.append(predict.apply_async([doc]))

# now wait for them all to be done
predictions = [result.get() for result in res]
# now do an analysis of the predictions

Here we push each analysis task into the queue and save the async returned objects in a list.  Then we create a new list by waiting for each prediction value to be returned.   Our client can run anywhere there is Internet access.  For example, this one was debugged on a Jupyter instance running on a laptop.  All you need to do is “pip install celery” and run Jupyter.

There is much more to say about Celery and the interested reader should look at the Celery Project site for the definitive guide. Let us now turn to using this with Kubernetes.  We must first create a container to hold the analysis code and all the model data.   For that we will need a Dockerfile and a shell script to correctly launch Celery once the container is deployed.   For those actually interested in trying this, all the files and data are in OneDrive here.   The Dockerfile shown below has more than we need for this experiment.

# Version 0.1.0
FROM ipython/scipystack
MAINTAINER yourdockername "youremail"
RUN easy_install celery
RUN pip install -U Sphinx
RUN pip install Gcloud
RUN easy_install pattern
RUN easy_install nltk
RUN easy_install gensim
COPY bookproject-key.json /
COPY models /
COPY config /
COPY sciml_data_arxiv.p /
COPY predictor.py /
COPY script.sh /
ENTRYPOINT ["bash", "/script.sh"]

To build the image we first put all the machine learning configuration files in a directory called config and all the learned model files in a directory called models.  At the same level we have the predictor.py source.   For reasons we will explain later we will also include the full test data set: sciml_data_arxiv.p. The Docker build starts with the ipython/scipystack container.   We then use easy_install to install Celery as well as the packages used by the ML analyzers: pattern, nltk and gensim.   Though we are not going to use the Gcloud APIs here, we include them with a pip install.   But to make that work we need an updated copy of Sphinx.  To make the APIs work we would need our default client authentication keys.   They are stored in a json file called bookproject-key.json that was obtained from the Gcloud portal as described previously.   Finally we copy all of the files and directories to the root path ‘/’.  Note that the copy from a directory is a copy of all the contained files to the path ‘/’ and not to a new directory.   The ENTRYPOINT runs our script, which is shown below.

cp /predictor.py .
export C_FORCE_ROOT='true'
export GOOGLE_APPLICATION_CREDENTIALS='/bookproject-key.json'
echo $C_FORCE_ROOT
celery worker -A predictor -b $1

Bash will run our script in a temp directory, so we need to copy our predictor.py file to that directory.  Because our bash is running as root, we need to convince Celery that it is o.k. to do that; hence we export C_FORCE_ROOT as true.  Next, if we were using the Gcloud APIs, we would need to export the application credentials.   Finally we invoke Celery, but this time we use the -b flag to indicate that we are going to provide the IP address of the RabbitMQ amqp broker as a parameter, and we remove it from the explicit reference in the predictor.py file.  When run, the predictor file will look for all the model and configuration data in ‘/’.   We can now build the Docker image with the command

>docker build -t "yourdockername/predictor" .

And we can test the container on our laptop with

>docker run -i -t "yourdockername/predictor" 'amqp://guest@rabbitserverIP'

Using “-i -t” allows you to see any error output from the container.   Once it seems to be working we can now push the image to the docker hub.   (to use our version directly, just pull dbgannon/predictor)

We can now return to our Google cloud shell and pull a version of the container there.   If we want to launch the predictor container on the cluster, we can do it one at a time with the “kubectl run” command.  However, Kubernetes has a better way to do this using a pod configuration file where we can specify the number of pod instances we want to create.  In the file below, which we will call predict-job.json, we specify a job name, the container image in the Docker Hub, and the parameter to pass to the container’s shell script.   We also specify the number of pods to create.   In this case that is 6, as identified in the “parallelism” parameter.

apiVersion: batch/v1
kind: Job
metadata:
  name: predict-job
spec:
  parallelism: 6
  template:
    metadata:
      name: job-wq
    spec:
      containers:
      - name: c
        image: dbgannon/predictor
        args: ["amqp://guest@ipaddress_of_rabbitmq_server"]
      restartPolicy: OnFailure

One command in the cloud shell will now launch six pods each running our predictor container.

$kubectl create -f predict-job.json 

Some Basic Performance Observations.

When using a many-task system based on a distributed worker model there are always three primary questions about the performance of the system.

  1. What is the impact of wide-area distribution on the performance?
  2. How does performance scale with the number of worker containers that are deployed? More specifically, if we have N workers, how does the system speed up as N increases?    Is there a point of diminishing return?
  3. Is there a significant per-task overhead that the system imposes?  In other words, if the total workload is T and if it is possible to divide that workload into k tasks each of size T/k, then what is the best value of k that will maximize performance?

Measuring the behavior of a Celery application as a function of the number workers is complicated by a number of factors.   The first concern we had was the impact of widely distributing the computing resources on the overall performance.   Our message broker (RabbitMQ) was running in a virtual machine in Indiana on the JetStream cloud.   Our client was running a Jupyter notebook on a laptop and the workers were primarily on the Midwest Google datacenter and on a few on other machines in the lab.   We compared this to a deployment where all the workers, the message broker and the client notebook were all running together on the Google datacenter.   Much to our surprise there was little difference in performance between the two deployments.   There are two ways to view this result.   One way is to say that the overhead of wide area distribution was not significant.   The other way to say this is that the overhead of wide area distribution was negligible compared to other performance problems.

A second factor that has an impact on performance as a function of the number of workers is the fact that a single Celery worker may have multiple threads that are responding to asynchronous function calls.   While we monitored the execution we noticed that the number of active threads in one worker could change over time. This made performance somewhat erratic.  Celery’s policy is that it will never have more threads than the number of available cores, so to limit the thread variability we ran workers in container pods on VMs with only one core.

Concerning the question of the granularity of the work partitioning, we configured the program so that a number of documents could be processed in one invocation, and this number could be set remotely.    Taking a set of 1000 documents and a fixed set of workers, we divided the document set into blocks of size K, where K ranged from 1 to 100.   In general, larger blocks were better because the number of Celery invocations was smaller, but the difference was not great.   Another factor involved Celery’s scheduling for deciding which worker gets the next invocation.   For large blocks this was not the most efficient because it left holes in the execution schedule when workers were occasionally idle while another was over-scheduled.   For very small blocks these holes tended to be small.   We found that a value of K=2 gave reasonably consistent performance.  A sketch of the block-partitioned client is shown below.
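Here is a rough sketch of that block-partitioned client, assuming the worker-side predict() was modified to accept a list of documents in a single call (that modification is not shown in the code above).

K = 2   # block size that gave the most consistent performance

# partition the document list into blocks of K and dispatch one task per block
blocks = [documents[i:i+K] for i in range(0, len(documents), K)]
async_results = [predict.apply_async([block]) for block in blocks]

# gather the per-block predictions as they complete
predictions = [r.get() for r in async_results]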

Finally to test scalability we used three different programs.

  1. The document topic predictor described above where each invocation classified two documents.
  2. A simple worker program that does no computation but just sleeps for 10 seconds before returning a “hello world” string.
  3. A worker that computes part of the Euler sum sum(1/i**2, i=1..n) where n = 10**9.   Each worker computes a block of 10**7 terms of the sequence and the 100 partial results are added together to get the final result (which approximates pi**2/6 to about 7 decimal places).  A sketch of this worker appears after this list.
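Here is a sketch of the Euler-sum worker from item 3, not the exact code we ran; the task name and the block layout are assumptions consistent with the description above.

from celery import Celery
app = Celery('euler', broker='amqp://guest@brokerIPaddr', backend='amqp')

@app.task
def partial_euler_sum(start, count):
    # sum 'count' consecutive terms of 1/i**2 beginning at i = start
    return sum(1.0 / (i * i) for i in range(start, start + count))

# client side: 100 blocks of 10**7 terms cover i = 1 .. 10**9
# res = [partial_euler_sum.apply_async((k * 10**7 + 1, 10**7)) for k in range(100)]
# total = sum(r.get() for r in res)    # approximates pi**2 / 6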

The document predictor is very computationally intensive and uses some rather large data matrices for the trained machine learning models.   The size of these arrays is about 150 megabytes total.  While this does fit in memory, the computation is going to involve a great deal of processor cache flushing and there may be memory paging effects.   The example that computes the Euler sum requires no data other than the starting point index and the size of the block to sum.   It is pure computation and it will have no cache flushing or memory paging effects, but it will keep the CPU very busy.   The “sleep” example leaves the memory and the CPU completely idle.

We ran all three with one to seven workers.   (Six workers used the 6 cores from the small Google demo account and one ran on another remote machine.)    To compare the results, we computed the time for each program on one worker and plotted the speed-up ratio for 2, 3, 4, 5, 6 and 7 workers.   The results are shown in the graphs below.


Figure 3.  Performance as speed-up for each of the three applications with up to 7 workers.

As can be seen, the sleeper scales linearly in the number of workers.   In fact, when executed on multi-core machines it is almost super-linear because of the extra threads that can be used.  (It is very easy for a large number of threads to sleep in parallel.)   On the other hand, the predictor and the Euler examples reached a maximum speed-up with around five workers.  Adding more worker pods to the servers did not show improvement because these applications are already very compute intensive.    This was a surprise, as we expected all three experiments to scale well beyond seven workers.   When looking for the cause of this limited performance, we considered the possibility that the RabbitMQ broker was a bottleneck, but our previous experience with it has allowed us to scale applications to dozens of concurrent readers and writers.   We are also convinced that the Google Container Engine performed extremely well and was not the source of any of these performance limitations. We suspect (but could not prove) that the Celery work distribution and result gathering mechanisms have overheads that limit scalability as the number of available workers grows.

Conclusion

Google has made it very easy to deploy containerized applications using Kubernetes on their cloud container service.  Kubernetes has some excellent architectural features that allow multiple containers to be co-located on a single server within a pod.   We did not have time here to demonstrate this, but their documentation gives some excellent examples.

Celery is an extremely elegant way to do remote procedure calls in Python.  One only needs to define the function and annotate it with a Celery object.   It can then be remotely invoked with an asynchronous call that returns control to the caller.  A future-like object is returned.  By calling a special method on the returned object the caller will pause until the remote call completes and the value is provided to the caller.

Our experiments demonstrated that Celery has limited scalability if it is used without modification and with the RabbitMQ message broker.   However, Celery has many parameters and it may be possible that the right combination will improve our results.  We will report any improved results we discover in a later version of this document.

The State of the Cloud: Evolving to Support Deep Learning and Streaming Data Analytics and Some Research Challenges

(Note:  This is an updated version on 7/21/2016.   The change relates to containers and HPC and it is discussed in the  research topics at the end.)

I was recently invited to serve on a panel for the 2016 IEEE Cloud Conference.  As part of that panel I was asked to put together 15 minutes on the state of cloud technology and pose a few research challenges.   Several people asked me if I had published any of what I said so I decided to post my annotated notes from that mini-talk here. The slide deck that goes along with this can be found here.  There were three others on the panel who each made some excellent points and this document does not necessarily reflect their views.

Cloud computing has been with us for fifteen years now and Amazon’s Web Services have been around for ten.   The cloud was originally created to support on-line services such as email, search and e-commerce.  Those activities generated vast amounts of data and the task of turning this data into value for the user has stimulated a revolution in data analytics and machine learning.  The result of this revolution has been powerful and accurate spoken language recognition, near real-time natural language translation, image and scene recognition and the emergence of a first generation of cloud-based digital assistants and “smart” services.  I want to touch on several aspects of cloud evolution related to these exciting changes.

Cloud Architecture

Cloud architectures have been rapidly evolving to support these computational and data intensive tasks.   The cloud data centers of 2005 were built with racks of off-the-shelf servers and standard networking gear, but the demands of the new workloads described above are pushing the cloud architects to consider some radically different approaches.   The first changes were the introduction of software defined networks that greatly improved bisection bandwidth.   This also allowed the data center to be rapidly reconfigured and repartitioned to support customer needs as well as higher throughput for parallel computing loads.   Amazon was the first large public cloud vendor to introduce GPUs to better support high-end computation in the cloud and the other providers have followed suit. To accelerate the web search ranking process, Microsoft introduced FPGA accelerators and an overlay mesh-like network which adds an extra dimension of parallelism to large cloud applications.

The advent of truly large scale data collections made it possible to train very deep neural networks and all of the architectural advances described above have been essential for making progress in this area.   Training deep neural nets requires vast amounts of linear algebra, and highly parallel clusters with multiple GPUs per node have become critical enablers.  Azure now supports on-demand clusters of nodes with multiple GPUs and dedicated InfiniBand networks. The FPGAs introduced for accelerating search in the Microsoft data centers have also proved to be great accelerators for training convolutional neural networks.   GPUs are great for training deep networks, but Nervana has designed a custom ASIC that they claim to be a better accelerator.   Even Cray is now testing the waters of deep learning.   To me, all of these advances in the architecture of cloud data centers point to a convergence with the trends in supercomputer design.  The future exascale machines that are being designed for scientific computing may have a lot in common with the future cloud data centers.   Who knows?  They may be the same.

Cloud System Software

The software architecture of the cloud has gone through a related evolution.  Along with software defined networking we are seeing the emergence of software defined storage.   We have seen dramatic diversification in the types of storage systems available for the application developer.  Storage models have evolved from simple blob stores like Amazon’s S3 to sophisticated distributed, replicated NoSQL stores designed for big data analytics such as Google’s BigTable and Amazon’s DynamoDB.

Processor virtualization has been synonymous with cloud computing.   While this is largely still true, container technology like Docker has taken on a significant role because of its advantages in terms of management and speed of deployment.  (It is worth noting that Google never used traditional virtualization in their data centers until their recent introduction of IaaS in GCloud.)   Containers are used as a foundation for microservices: a style of building large distributed cloud applications from small, independently deployable components.   Microservices provide a way to partition an application along deployment and language boundaries and they are well suited to Dev-Ops style application development.

Many of the largest applications running on the cloud by Microsoft, Amazon and Google are composed of hundreds to thousands of microservices.   The major challenges presented by these applications are management and scalability.    Data center operating system tools have evolved to coordinate, monitor and attend to the life-cycle management of many concurrently executing applications, each of which is composed of vast swarms of containerized microservices.  One such system is Mesos from Mesosphere.

Cloud Machine Learning Tools

The data analytics needed to create the smart services of the future depend upon a combination of statistical and machine learning tools.  Bayesian methods, random forests and others have been growing in popularity and are widely available in open source tools.  For a long time, neural networks were limited to three levels of depth because the training methods failed to show improvements for deeper networks.  But very large data collections and some interesting advances in training algorithms have made it possible to build very accurate networks with hundreds of layers.  However, the computation involved in training a deep network can be massive.   The kernels of the computation involve the dense linear algebra that GPUs are ideally suited for, and the type of parallelism in the emerging cloud architecture is well suited to this task.   We now have a growing list of open source machine learning toolkits that have been recently released from the cloud computing research community.   These include Google’s TensorFlow, AzureML, the Microsoft Research Computational Network Tool Kit (CNTK),  Amazon’s Deep Scalable Sparse Tensor Network Engine (DSSTNE), and Nervana’s NEON.    Of course the academic research community has also been extremely productive in this area.  Theano is an important Python toolkit that has been built with contributions from over a dozen universities and institutes.


Figure 1. cloud ML tools and services stack

Not every customer of cloud-based data analytics wants to build and train ML models from scratch.   Often the use cases for commercial customers are similar, hence another layer of services has emerged based on pre-trained models.   The use cases include image and language recognition, specialized search,  and voice-driven intelligent assistants.   As illustrated in Figure 1, these new services include Cortana (and MSR project Oxford components), Google ML, Amazon Alexa Skills Kit, IBM Watson Services and (using a different style cloud stack) Sentient Aware.

Streaming Data Analytics Services

There are several “exponentials” that are driving the growth of cloud platforms and services.   These include Big Data, mobile apps, and the Internet of Things.   The ability to analyze and act on data in motion is extremely important for application areas including urban informatics, environmental and ecological monitoring and recovery, analysis of data from scientific experiments, and web and data center log analysis.   The cloud providers and the open source research community have developed a host of new infrastructure tools that can be used to manage massive streams of data from remote sources.  These tools can be used to filter data streams, do on-line analysis and use the backend cloud machine learning services.  The tools include Spark Streaming, Amazon Kinesis, Twitter Heron, Apache Flink, Google Dataflow/Apache Beam and the Azure Event Hub and Data Lake.   A more detailed analysis of these tools can be found here.

A Few Research Challenges

As was evident at the IEEE cloud conference, there is no shortage of excellent research going on, but as promised here are a few topics I find interesting.

  1. Cloud Data Center Architecture.  If you are interested in architecture research the Open Compute Project has a number of challenging projects that are being undertaken by groups of researchers.  They were founded by people from companies including Facebook, Intel, Google, Apple, Microsoft, Rackspace, Ericsson, Cisco, Juniper Networks and more and they have contributed open data center designs.   And it is open, so anybody can participate.
  2. Cloud & Supercomputer convergence.   As the sophistication of the cloud data centers approach that of the new and proposed supercomputers it is interesting to look at what architectural convergence might look like.  For example, which modes of cloud application design will translate to supercomputers?   Is it possible that the current microservice based approach to interactive cloud services could be of value to supercomputer centers?   Can we engineer nanosecond inter-container messaging? Can we do a decent job of massive batch scheduling on the cloud with the same parallel efficiency as current supercomputers?
    Update:  It seems that there is already some great progress on this topic.    The San Diego Supercomputer Center has just announced deployment of Singularity on two of their big machines.   Singularity is a special container platform from Gregory M. Kurtzer of LBNL.  There is a great article by Jeff Layton that gives a nice overview of Singularity.
  3. Porting Deep Learning to Supercomputers. There is currently serious interest in doing large scale data analytics on large supercomputers such as those at the national centers.  Some believe that better algorithms will be available with these advanced parallel machines.   Can we compile TensorFlow/CNTK/DSSTNE using MPI for exascale class machines?  In general, are there better ways to parallelize NN training algorithms for HPC platforms?
  4. The current open source stream analytics platforms described above are designed to handle massive streams of events that are each relatively small. However, many scientific event streams are narrower and have event objects that may be massive blobs.   What would it take to modify the open source streaming tools to be broadly applicable to these “big science” use cases?

I welcome feedback on any of the items discussed here.   Many of you know more about these topics than I, so let me know where you think I have incorrectly or overstated any point.