Building a Distributed GBM on H2O

At 0xdata we build state-of-the-art distributed algorithms, and recently we embarked on building GBM, an algorithm notorious for being impossible to parallelize, much less distribute.  We built the algorithm shown on page 387 of Elements of Statistical Learning II (ESL2) by Trevor Hastie, Robert Tibshirani, and Jerome Friedman (shown at the bottom of this post).  Most of the algorithm is straightforward “small” math, but step 2.b.ii says “Fit a regression tree to the targets….”, i.e. fit a regression tree in the middle of the inner loop, for targets that change with each outer loop.  This is where we decided to distribute/parallelize.

The platform we build on is H2O. As discussed in the prior blog, it has an API focused on doing big parallel vector operations, and for GBM (and also Random Forest) we need to do big parallel tree operations.  But not just any tree operation: GBM (and RF) constantly build trees, and the work is always at the leaves of a tree, finding the next best split point for the subset of training data that falls into a particular leaf.

The code can be found on our git:

For GBM, look in:

src/main/java/hex/tree/gbm/ - driver, main loop from ESL2
src/main/java/hex/tree/ - shared with RF, score & build 1 layer
src/main/java/hex/tree/ - collect & split histograms per leaf


The High Level Bits

We do a straightforward mapping from vectors to leaves of a tree: we assign a “leaf id” or “node id” to each data row in a new temp vector.  We also choose split points (based on the last pass) before making a pass over the data; split-point decisions are made on summaries from the previous pass, are always “small data”, and typically aren’t useful to parallelize.  Splitting a leaf makes it an internal node, and we add 2 new leaves below it.

On each pass through the data, each observation R starts at an internal tree node; we get the node id (nid) from our temp vector.  The nid is a direct mapping into a POJO (Plain Old Java Object) for that node in a tree.  The node contains the split decision (e.g., “if age<45 go left, else go right”).  We move the observation R down the decision tree, left or right as appropriate, into a new leaf and record the new nid for next pass in our temp vector.
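A minimal sketch of this routing step, in plain Java rather than H2O's actual classes. `LevelPass`, `Node`, `advance`, and the array-based node table are hypothetical names chosen for illustration. Each row looks up its node by nid, applies that node's split decision, and records the nid of the leaf it lands in for the next pass.

```java
import java.util.Arrays;

// Hypothetical names throughout; this is not H2O's tree code.
class LevelPass {
  /** A decided (internal) node: "if row[col] < split go left, else go right". */
  static final class Node {
    final int col; final double split; final int leftNid, rightNid;
    Node(int col, double split, int leftNid, int rightNid) {
      this.col = col; this.split = split;
      this.leftNid = leftNid; this.rightNid = rightNid;
    }
  }

  /** One pass: move each row from its current node into one of that node's new leaves. */
  static void advance(double[][] rows, int[] nids, Node[] nodes) {
    for (int r = 0; r < rows.length; r++) {
      Node n = nodes[nids[r]];     // the nid indexes directly into the node table
      if (n == null) continue;     // row sits in a leaf that was not split
      nids[r] = rows[r][n.col] < n.split ? n.leftNid : n.rightNid;
    }
  }

  public static void main(String[] args) {
    double[][] rows = { {30}, {50}, {44}, {45} };  // a single "age" column
    int[] nids = new int[rows.length];              // every row starts at the root, nid 0
    Node[] nodes = new Node[3];
    nodes[0] = new Node(0, 45.0, 1, 2);             // root: age<45 goes to nid 1, else nid 2
    advance(rows, nids, nodes);
    System.out.println(Arrays.toString(nids));      // prints [1, 2, 1, 2]
  }
}
```

In the distributed setting the `nids` array plays the role of the temp vector, and the loop body is what runs independently on each slice of rows.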

We then accumulate enough stats to make split decisions for the new leaves at the end of this pass.  Since this is a regression tree, we collect a histogram for each column – the mean & variance (using the recursive-mean technique).  Later we’ll pick the split point with the least variance, and the prediction if we don’t split will be the computed mean.
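One common single-pass way to track a running mean and variance is Welford's update, sketched below. Whether this matches H2O's exact recursive-mean formulation is an assumption, and `RunningStats` and its methods are illustrative names. The `merge` method shows how two partial results could be combined, which is what a parallel pass over separate slices of data needs.

```java
// Illustrative sketch of a running mean/variance accumulator (Welford's update).
class RunningStats {
  long n;        // number of observations seen
  double mean;   // running mean
  double m2;     // running sum of squared deviations from the mean

  /** Fold one observation into the running statistics. */
  void add(double x) {
    n++;
    double delta = x - mean;
    mean += delta / n;
    m2 += delta * (x - mean);
  }

  /** Population variance of everything seen so far. */
  double variance() { return n == 0 ? 0 : m2 / n; }

  /** Combine with statistics gathered independently on another slice of rows. */
  void merge(RunningStats that) {
    if (that.n == 0) return;
    if (n == 0) { n = that.n; mean = that.mean; m2 = that.m2; return; }
    long total = n + that.n;
    double delta = that.mean - mean;
    m2 += that.m2 + delta * delta * ((double) n * that.n / total);
    mean += delta * that.n / total;
    n = total;
  }

  public static void main(String[] args) {
    RunningStats s = new RunningStats();
    for (double x : new double[]{1, 2, 3, 4}) s.add(x);
    System.out.println(s.mean + " " + s.variance());
  }
}
```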

We repeat this process of moving a row R left or right, assigning a new nid, and summing into the histogram, for all rows.  At the end of this pass we can split each current leaf of the tree, effectively building the tree one layer at a time in a breadth-first fashion.  We repeat for each layer, and after one tree is built we start in on the next.

Effectively then, we’re entirely parallel & distributed within a single tree level but not across trees (GBM trees have sequential dependencies) nor across levels of the same tree.  This parallel-within-level aspect means there’s an upper bound on tree building speed: the latency to build a single level.  However, since we’re completely parallel within a level we can make a single level faster with more CPUs (or conversely build a tree with more data in the same time).  In practice, we’re the fastest single-node GBM we’ve ever seen and we can usefully scale out with datasets as small as a few tens of megabytes.


Finding an optimal split point is done by choosing a column/predictor/feature and identifying the value within that feature at which to split the data so as to minimize some loss function such as Mean Squared Error (or maximize information gain).  The problem devolves to finding the optimal split value for a particular feature (and taking the best across all features). The optimal split is often found (with small data) by sorting the feature values and inspecting each induced split. For big data, this can be a problem (e.g. a distributed sort would be required for each tree leaf). We choose to approximate sorting with binning instead.
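To make the binned version of the split search concrete, here is a hedged sketch for a single feature. It assumes each bin (ordered by feature value) already holds the count, sum, and sum of squares of the response for the rows that fell into it; `BestSplit` and `bestSplit` are hypothetical names, not H2O's. The scan tries every bin boundary and keeps the one with the smallest total squared error on the two sides.

```java
// Illustrative split search over per-bin response summaries for one feature.
class BestSplit {
  /**
   * Returns the boundary b (rows in bins [0,b) go left, bins [b,n) go right)
   * that minimizes the summed squared error, or -1 if no boundary separates any rows.
   */
  static int bestSplit(long[] cnt, double[] sum, double[] ssq) {
    int nb = cnt.length;
    long nTot = 0; double sTot = 0, qTot = 0;
    for (int i = 0; i < nb; i++) { nTot += cnt[i]; sTot += sum[i]; qTot += ssq[i]; }

    int best = -1;
    double bestSE = Double.MAX_VALUE;
    long nL = 0; double sL = 0, qL = 0;       // running totals for the left side
    for (int b = 1; b < nb; b++) {
      nL += cnt[b - 1]; sL += sum[b - 1]; qL += ssq[b - 1];
      long nR = nTot - nL;
      if (nL == 0 || nR == 0) continue;       // one side would be empty
      double sR = sTot - sL, qR = qTot - qL;
      // Squared error around each side's own mean: sum(y^2) - (sum y)^2 / n
      double se = (qL - sL * sL / nL) + (qR - sR * sR / nR);
      if (se < bestSE) { bestSE = se; best = b; }
    }
    return best;
  }

  public static void main(String[] args) {
    // Three bins whose responses are {1,1}, {1}, {5,5}.
    long[] cnt = {2, 1, 2};
    double[] sum = {2, 1, 10};
    double[] ssq = {2, 1, 50};
    System.out.println(bestSplit(cnt, sum, ssq));  // prints 2
  }
}
```

Because only the per-bin summaries are needed, this search is the “small data” part of the job: it runs after the pass over the rows, once per leaf and feature.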

Binning (i.e. a radix sort) is fast & simple, can be done in the same pass as our other tree work, and can get arbitrarily close to the sorted solution with enough bins.  However, there’s always the question of unequal binning and  outliers that skew the bin limits.  We solve these problems with dynamic binning – we choose bin limits anew at each tree level – based on the split bins’ min and max values discovered during the last pass.

Choosing bin limits anew means that outliers won’t be present at the next bin split, and it means dense bins will split again with much tighter bounds.  That is, at each split point we “drill in” on the interesting data, and after a logarithmic number of splits we will find the optimal split point.

For example: suppose we are binning into 3 bins (our default is 1024 bins) and the predictor values are:

{-600e3, -10, -1, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0, +600e3}

Notice the extreme outliers, and the dense data around -1.0 to 2.0.  Then the initial bin split points are found by interpolation over the min and max to be -200e3 & +200e3 (three equal-width bins between -600e3 and +600e3), inducing the split:

{-600e3} {-10, -1, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0}  {+600e3}

The outliers are removed into their own splits, and we’re left with a dense middle bin.  If on the next round we decide to split the middle bin, we’ll bin over the min & max of that bin, i.e. from -10 to +5 with split points -5 and 0:

{-10} { -1, -0.8, -0.4}  {0.1, 0.3, 0.6, 2.0, 5.0}

In two passes we’re already splitting around the dense region.  A further pass on the 2 fuller bins will yield:

{ -1 } {-0.8} {-0.4}
{0.1, 0.3, 0.6} { 2.0} { 5.0}

We have fine-grained binning around the dense clusters of data.
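The first two passes of this example can be reproduced with a few lines of plain Java. This is only a sketch of equal-width binning between the observed min and max; `DynamicBins` and `bin` are hypothetical names and the in-memory lists stand in for what would really be per-bin summaries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative equal-width binning, re-applied to a single bin on the next pass.
class DynamicBins {
  /** Spread values into nbins equal-width bins between the observed min and max. */
  static List<List<Double>> bin(List<Double> vals, int nbins) {
    double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
    for (double v : vals) { min = Math.min(min, v); max = Math.max(max, v); }
    List<List<Double>> bins = new ArrayList<>();
    for (int i = 0; i < nbins; i++) bins.add(new ArrayList<>());
    double width = (max - min) / nbins;
    for (double v : vals) {
      int b = width == 0 ? 0 : (int) ((v - min) / width);
      bins.get(Math.min(b, nbins - 1)).add(v);   // the max value lands in the last bin
    }
    return bins;
  }

  public static void main(String[] args) {
    List<Double> all = Arrays.asList(
        -600e3, -10.0, -1.0, -0.8, -0.4, 0.1, 0.3, 0.6, 2.0, 5.0, 600e3);
    List<List<Double>> pass1 = bin(all, 3);           // outliers isolated, dense middle bin
    System.out.println(pass1);
    List<List<Double>> pass2 = bin(pass1.get(1), 3);  // re-bin over the middle bin's own min/max
    System.out.println(pass2);
  }
}
```

Values that sit exactly on a bin boundary (such as -0.8 in the third pass above) can fall on either side under floating-point rounding, which is why the sketch stops after two passes.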

Multinomial (multi-class) Trees

The algorithm on ESL2, pg 387 directly supports multinomial trees – we end up building a tree-per-class.  These trees can be built in parallel with each other, and the predictive results from each tree are run together in step 2.b.iii. Note that the trees do not have to make the same split decisions, and typically do not, especially for extreme minority classes.  Basically, the tree targeting some minority class is free to make decisions that optimize for that class.
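In ESL's multiclass formulation the per-class scores are coupled through a softmax when class probabilities are computed, and each class's tree is then fit to that class's residuals. The sketch below shows just that coupling for one row; `Softmax`, `probs`, and `residuals` are illustrative names, and the code is not taken from H2O.

```java
// Illustrative softmax coupling between the per-class tree scores for one row.
class Softmax {
  /** Turn per-class scores f_k into probabilities p_k = exp(f_k) / sum_l exp(f_l). */
  static double[] probs(double[] f) {
    double max = Double.NEGATIVE_INFINITY;
    for (double v : f) max = Math.max(max, v);
    double[] p = new double[f.length];
    double sum = 0;
    for (int k = 0; k < f.length; k++) {
      p[k] = Math.exp(f[k] - max);   // subtracting the max avoids overflow
      sum += p[k];
    }
    for (int k = 0; k < f.length; k++) p[k] /= sum;
    return p;
  }

  /** Per-class regression targets for a row whose true class is yClass. */
  static double[] residuals(int yClass, double[] p) {
    double[] r = new double[p.length];
    for (int k = 0; k < p.length; k++) r[k] = (k == yClass ? 1.0 : 0.0) - p[k];
    return r;
  }

  public static void main(String[] args) {
    double[] p = probs(new double[]{0, 0, 0});
    System.out.println(java.util.Arrays.toString(residuals(0, p)));
  }
}
```

Each class gets its own residual vector, which is why each class's tree is free to choose different splits.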


For factor or categorical columns, we bin on the categorical levels (up to our bin limits in one pass).  We also use an equality-split instead of a relational-split, e.g.

{if feature==BLUE then/else}

instead of

{if feature < BLUE then/else}

We've been debating splitting on subsets of categories, to get more aggressive splitting when the data have many levels.  For example, a 30,000-level categorical column will have a hard time exploring all possible categories in isolation without a lot of split levels.
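A small sketch of an equality split on a categorical column, under the assumption that we keep one bin per level holding the count and sum of the response. `EqualitySplit` and `bestLevel` are hypothetical names. For each level it scores the split “feature == level” versus “feature != level” by the reduction in squared error and returns the best level.

```java
// Illustrative equality split: one bin per categorical level.
class EqualitySplit {
  /** Returns the level L whose "== L" vs "!= L" split most reduces squared error, or -1. */
  static int bestLevel(int[] levels, double[] y, int nLevels) {
    long[] cnt = new long[nLevels];
    double[] sum = new double[nLevels];
    long n = 0; double s = 0;
    for (int i = 0; i < levels.length; i++) {
      cnt[levels[i]]++; sum[levels[i]] += y[i];
      n++; s += y[i];
    }
    if (n == 0) return -1;
    double base = s * s / n;                 // (sum y)^2 / n with no split
    int best = -1; double bestGain = 0;
    for (int L = 0; L < nLevels; L++) {
      long nIn = cnt[L], nOut = n - nIn;
      if (nIn == 0 || nOut == 0) continue;   // split would leave one side empty
      double sIn = sum[L], sOut = s - sIn;
      // Reduction in squared error from fitting a separate mean on each side
      double gain = sIn * sIn / nIn + sOut * sOut / nOut - base;
      if (gain > bestGain) { bestGain = gain; best = L; }
    }
    return best;
  }

  public static void main(String[] args) {
    final int RED = 0, GREEN = 1, BLUE = 2;
    int[] color = {RED, RED, GREEN, GREEN, BLUE, BLUE};
    double[] y  = {1,   1,   1,     1,     9,    9};
    System.out.println(bestLevel(color, y, 3));   // prints 2 (split on feature==BLUE)
  }
}
```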

As with all H2O algorithms, GBM can be accessed from our REST+JSON API, from the browser, from R, Python, Scala, Excel & Mathematica.  Enjoy!



An API For Distributed Analytics

There are so many APIs to choose from!

Features of the space:

  • Lots of data – which I’ll qualify as “bigger than 1 machine” and thus needing parallel I/O, parallel memory, & parallel compute – and distributed algorithms.
  • Ease of programming; hide details (but expose them when you want to).  High level for ease-of-use, but “under the covers” has to be easy to understand, because no tool solves all problems – so expect extensions & frequent 1-off hacks.
  • Speed: In-memory by default, where memory can range from 2G to 2T and beyond.  Data placement is required (do not schlop data about unless needed, move code to data, no disk I/O by default).
  • Speed By Default: the normal/average/typical programming style will be fast.  You can “trip over yourself” and go slow, but normal usage is fast.  It is obvious when you’re moving away from “go fast” to “unknown speed”.
  • Correct By Default: the normal/average/typical programming style will not be exposed to weird corner cases.  No data-races (unless you ask for them), no weird ordering rules, no job-scheduling, no counting mappers & reducers, no figuring out sharding or data placement, nor other low-level easy-to-get-wrong details.  Resource management is simple by design.
  • Accessible for non-expert programmers, scientists, engineers, managers – people looking for a tool, not wanting the tool to be more complicated than the problem.


Design decisions:

Automatic data placement: It’s a hard problem, and it’s been hard for a long time – but technology is changing: networks are fast relative to the size of memory.  You can move a significant fraction of memory in a cluster in relatively little time.  “Disk is the new tape”:  we want to do work in-memory for that 1000x speedup over disk, but this requires loading memory into one of many little slices in the cluster – which implies data-placement.  Start with random placement – while it’s never perfect, it’s also rarely “perfectly wrong” – you get consistent decent access patterns. Then add local caching to catch the hot common read blocks, and local caching of hot or streaming writes.  For H2O, this is one of the reasons for our K/V store; we get full JMM semantics, exact consistency, but also raw speed: 150ns for a cache-hitting read or write.  Typically a cache miss is a few msec (1 network hop there and back).

Map/Reduce: It’s a simple paradigm shown to scale well.  Much of Big Data involves some kind of structure (log files, bit/byte streams, up to well organized SQL/Hive/DB files).  Map is very generic, and allows an arbitrary function on a unit of work/data to be easily scaled.  Reduce brings Big down to Small in a logarithmic number of steps.  Together, they handle a lot of problems.  Key to making this work: simplicity in the API.  Map converts an A to a B, and Reduce combines two B’s into one B – for any arbitrary A and B.  No resource management, no counting Map or Reduce slots, no shuffle, no keys.  Just a nice clean Map and Reduce.  For H2O this means: Map reads your data directly (type A) and produces results in a Plain Old Java Object, or POJO (type B) – which is also the Map’s “this” pointer.  Results are returned directly in “this”.  “This is Not Your Father’s Map Reduce”.

Correct By Default: No multi-threaded access questions.  Synchronization, if needed, is provided already.  No figuring out sharding or data placement; replication (caching) will happen as-needed.  NO resource management, other than Xmx (Java heap size).  Like sync, resource management is notoriously hard to get right so don’t require people to do it.  For 0xdata, this means we use very fine grained parallelism (no Map is too small, Doug Lea Fork/Join), and very fine-grained Reduces (so all Big Data shrinks as rapidly as possible).

Fast By Default: Use of the default Map/Reduce API will produce programs that run in parallel & distributed across the cluster, at memory bandwidth speeds for both reads and writes.  Other clustered / parallel paradigms are available but are not guaranteed to go fast.  The API has a simple obvious design, and all calls have a “cost model” associated with them (these calls are guaranteed fast, these calls are only fast in these situations, these calls will work but may be slow, etc.).  For 0xdata, code that accesses any number of columns at once (e.g. a single row) – but independent rows – will run at memory bandwidth speeds.  Same for writing any number of columns, including writing subsets (filtering) on rows.  Reductions will happen every so many Maps in a Log-tree fashion.  All filter results are guaranteed to be strongly ordered as well (despite the distributed & parallel execution).

Easy / Multiple APIs – Not all APIs are for all users!  Java & Map/Reduce are good for Java programmers – but people do math in R, Python and a host of other tools.  For 0xdata, we are a team of language implementers as well as mathematicians and systems engineers.  We have implemented a generic REST/JSON API and can drive this API from R, Python, bash, and Excel – with the ability to add more clients easily.  From inside the JVM, we can drive the system using Scala, or a simple REPL with an R-like syntax.
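The Map/Reduce contract described above (Map converts an A to a B, Reduce combines two B’s into one B, reductions happen in a log-tree fashion on Fork/Join) can be sketched with nothing but the JDK. This is not H2O's API: `TreeMapReduce` and `run` are hypothetical names, and the sketch runs on one JVM only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Illustrative single-JVM Map/Reduce: map at the leaves, pairwise reduce up a binary tree.
class TreeMapReduce<A, B> extends RecursiveTask<B> {
  final A[] units; final int lo, hi;            // this task covers units[lo..hi)
  final Function<A, B> map;                     // Map: one A in, one B out
  final BinaryOperator<B> reduce;               // Reduce: two B's in, one B out

  TreeMapReduce(A[] units, int lo, int hi, Function<A, B> map, BinaryOperator<B> reduce) {
    this.units = units; this.lo = lo; this.hi = hi; this.map = map; this.reduce = reduce;
  }

  @Override protected B compute() {
    if (hi - lo == 1) return map.apply(units[lo]);   // leaf: run the Map
    int mid = (lo + hi) >>> 1;
    TreeMapReduce<A, B> left  = new TreeMapReduce<A, B>(units, lo, mid, map, reduce);
    TreeMapReduce<A, B> right = new TreeMapReduce<A, B>(units, mid, hi, map, reduce);
    left.fork();                                     // left half runs in parallel
    B r = right.compute();
    return reduce.apply(left.join(), r);             // combine the two halves
  }

  static <X, Y> Y run(X[] units, Function<X, Y> map, BinaryOperator<Y> reduce) {
    if (units.length == 0) throw new IllegalArgumentException("need at least one unit of work");
    return ForkJoinPool.commonPool().invoke(
        new TreeMapReduce<X, Y>(units, 0, units.length, map, reduce));
  }

  public static void main(String[] args) {
    double[][] chunks = { {1, 2}, {3}, {4, 5, 6} };
    Double total = TreeMapReduce.<double[], Double>run(
        chunks,
        c -> { double s = 0; for (double v : c) s += v; return s; },
        Double::sum);
    System.out.println(total);   // prints 21.0
  }
}
```

The caller supplies only the two functions; there are no slots, keys, or shuffle stages to configure, which is the simplicity the design is after.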


Let’s get a little more concrete here, and bring out the jargon:

An H2O Data Taxonomy

Primitives – at the bottom level, the data are a Java primitive – be it a byte, char, long or double.  Or at least that’s the presentation.  Under the hood we compress aggressively, often seeing 2x to 4x more compression than the GZIP’d file on disk – and we can do math on this compressed form typically at memory bandwidth speeds (i.e., the work to decompress is hidden in the overhead of pulling the data in from memory).  We also support the notion of “missing data” – a crucial notion for data scientists.  It’s similar to Double.NaN, but for all data types.

A Chunk – The basic unit of parallel work, typically holding  1e3 to 1e6 (compressed) primitives.  This data unit is completely hidden, unless you are writing batch-Map calls (where the batching is for efficiency).  It’s big enough to hide control overheads when launching parallel tasks, and small enough to be the unit of caching.  We promise that Chunks from Vecs being worked on together will be co-located on the same machine.

A Vec – A Distributed Vector.  Just like a Java array, but it can hold more than 2^31 elements – limited only by available memory.  Usually used to hold data of a single conceptual type, such as a person’s age or IP address or name or last-click-time or balance, etc.  This is the main conceptual holder of Big Data, and collections of these will typically make up all your data.  Access will be parallel & distributed, and at memory-bandwidth speeds.

A Frame – A Collection of Vecs.  Patterned after R’s data.frame (it’s worked very well for more than 20 years!).  While Vecs might be Big Data (and thus can be expensive to touch all of), Frames are mere pointers to Vecs.  We add & drop columns and reorganize them willy-nilly.  The data munging side of things has a lot of convenience functions here.
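To illustrate how a Vec can take a full “long” index while each Chunk uses an “int” index, here is a hedged, single-machine sketch. `ChunkedVec`, `starts`, and `at` are hypothetical names and the layout is an assumption for illustration, not H2O's implementation (which also compresses and distributes the chunks).

```java
import java.util.Arrays;

// Illustrative vector built from chunks, addressed by a global long index.
class ChunkedVec {
  final double[][] chunks;   // each inner array stands in for one Chunk (assumed non-empty)
  final long[] starts;       // starts[i] = global index of chunk i's first element;
                             // the final entry is the total length

  ChunkedVec(double[][] chunks) {
    this.chunks = chunks;
    starts = new long[chunks.length + 1];
    for (int i = 0; i < chunks.length; i++) starts[i + 1] = starts[i] + chunks[i].length;
  }

  long length() { return starts[chunks.length]; }

  /** Vec-relative access: find the owning chunk, then use a chunk-relative int offset. */
  double at(long idx) {
    if (idx < 0 || idx >= length()) throw new IndexOutOfBoundsException("row " + idx);
    int c = Arrays.binarySearch(starts, idx);
    if (c < 0) c = -c - 2;                       // not a chunk start: step back to the owner
    return chunks[c][(int) (idx - starts[c])];
  }

  public static void main(String[] args) {
    ChunkedVec v = new ChunkedVec(new double[][]{ {1, 2, 3}, {4, 5}, {6} });
    System.out.println(v.length() + " " + v.at(4L));   // prints 6 5.0
  }
}
```

The extra lookup on every call is one way to see why chunk-relative access inside a batch Map is the faster path.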


The Map/Reduce API

(1) Make a subclass of MRTask2, with POJO Java fields that inherit from Iced, or are primitives, or arrays of either.  Why subclassed from Iced?  Because the bytecode weaver will inject code to do a number of things, including serialization & JSON display, and code & loop optimizations.

class Calc extends MRTask {

(2) Break out the small-data Inputs to the Map, and initialize them in an instance of your subclass.  “Small data” will be replicated across the cluster, and available as read-only copies everywhere.  Inputs need to be read-only as they will be shared on each node in the cluster.  “Small” needs to fit in memory and my example is with doubles, but mega-byte sized data is cheap & commonly done.

  final double mean;   // Read-only, shared, distributed
  final int maxHisto;  // Read-only, shared, distributed
  Calc(double meanX, int maxHisto) { this.mean = meanX;  this.maxHisto = maxHisto; }

(3) Break out the small-data Outputs from your map, and initialize them in the Map call.  Because they are initialized in the Map, they are guaranteed thread-local.  Because you get a new one for every Map call, they need to be rolled-up in a matching Reduce.

  long histogram[];
  double sumError;
  void map( ... ) {
    histogram = new long[maxHisto]; // New private histogram[] object

(4) Break out the Big Data (inputs or outputs).  This will be passed to a doAll() call, and every Chunk of the Big Data will get a private cloned instance of the Calc object, distributed across the cluster:

new Calc(mean,(int)vec.max()).doAll(myBigVector /*or Frame or Vec[] or ....*/); // cast: maxHisto is an int

(5) Implement your Map.  Here we show a Batching-Map, which typically does a plain “for” loop over all the elements in the Chunk.  Access your data with the “at0” (Chunk-relative addressing) call – which is the fastest accessor but requires a chunk (and takes an “int” index).  Plain Vec-relative addressing is a little slower, but takes a full “long” index.

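  void map( Chunk chk ) {
     histogram = new long[maxHisto];    // New private histogram[] object
     for( int i=0; i<chk.len; i++ ) {   // Plain loop over this Chunk's rows
       double err = chk.at0(i)-mean;    // Chunk-relative access
       sumError += err*err;             // Accumulate the squared error
     }
  }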

(6) Implement a Reduce for any output fields.  Note that Reduce has a “type B” in the “this” pointer, and is passed a 2nd one:

  void reduce( Calc that ) {
     sumError += that.sumError;
     // Add the array elements with a simple for-loop... we use this
     // simple utility.
     histogram = ArrayUtils.add(histogram,that.histogram);
  }

(7) That’s it!  Results are in your original Calc object:

Calc results = new Calc(mean,(int)vec.max()).doAll(myBigVector); // cast: maxHisto is an int
System.out.println(results.sumError+" "+Arrays.toString(results.histogram));
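For readers without an H2O cluster handy, the seven steps above can be imitated in plain Java. This is a sketch only: `CalcSketch` is a hypothetical stand-in, `double[]` arrays play the role of Chunks, `doAll` here is a simple sequential loop rather than H2O's parallel, distributed driver, and filling the histogram by integer value is an assumption added so the histogram output has something in it.

```java
import java.util.Arrays;

// Plain-Java imitation of the Calc pattern: small inputs, per-Map outputs, Reduce roll-up.
class CalcSketch {
  final double mean;     // small read-only input
  final int maxHisto;    // small read-only input
  long[] histogram;      // output, created fresh inside each map call
  double sumError;       // output

  CalcSketch(double mean, int maxHisto) { this.mean = mean; this.maxHisto = maxHisto; }

  /** Map over one chunk; assumes values are integers in [0, maxHisto). */
  void map(double[] chk) {
    histogram = new long[maxHisto];
    for (int i = 0; i < chk.length; i++) {
      double err = chk[i] - mean;
      sumError += err * err;
      histogram[(int) chk[i]]++;
    }
  }

  /** Reduce: fold another partial result into this one. */
  void reduce(CalcSketch that) {
    sumError += that.sumError;
    for (int i = 0; i < histogram.length; i++) histogram[i] += that.histogram[i];
  }

  /** Sequential stand-in for doAll: a private instance per chunk, then roll-up. */
  CalcSketch doAll(double[][] chunks) {
    CalcSketch result = null;
    for (double[] chk : chunks) {
      CalcSketch local = new CalcSketch(mean, maxHisto);
      local.map(chk);
      if (result == null) result = local; else result.reduce(local);
    }
    if (result != null) { histogram = result.histogram; sumError = result.sumError; }
    return this;
  }

  public static void main(String[] args) {
    double[][] chunks = { {0, 1, 2}, {2, 3} };
    CalcSketch results = new CalcSketch(1.6, 4).doAll(chunks);
    System.out.println(results.sumError + " " + Arrays.toString(results.histogram));
  }
}
```

Because every chunk gets its own instance and the outputs are only combined in `reduce`, the map bodies never share mutable state, which is what lets the real thing run them in parallel.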

The Rest of the Story

You have to get the data in there – and we’ll import from HDFS, S3, NFS, local disk, or through your browser’s upload.  You can drive data upload from Java, but more typically from R, Python, REST/JSON, or Excel.  Same for outputting Big Data results: we’ll write back Big Data to any store, while being driven by any of the above languages. If you build a predictive model, you’ll want to eventually use the model in production.  You can use it in-memory as-is, scoring new datasets on the model – for example, constantly streaming new data through the model while at the same time constantly churning out new models to be streamed through.  Or you can get a Java version of any model suitable for dropping into your production environment.

And that’s the end of my whirlwind tour of the H2O Distributed Computing API.  Hope you like it!

Comments & suggestions welcome.