A K/V Store For In-Memory Analytics: Part 1

First, some non-technical madness; fun technical bits below:

Jan 22: My life in the past 2 hours…
Rush to South San Jose from office team photo op –
Get my Kid L (age 16) from high school and rush her to the dentist in mid-Campbell: needs a bunch of cavities filled.
While sitting in the dentist’s office, Kid M (also mine, age 14) calls from Mom’s car – needs a book from Mom’s house; he’s staying away from J (18; 3rd kid) who is quarantined with a 103 degree fever. June (GF) calls – has a prescription for her rash – and a cause figured out finally; K (age 20, 4th kid) calls – she has J’s flu but isn’t feeling so bad, so she’s going to class in Berkeley. June wants to hear how K’s doing so I call June back.
M calls again – needs another book.
M calls a 3rd time – and this time Mom is explaining to me that she’s figured out what M is up to (and is apologizing for letting him call me so much). June calls – she has a flat in San Mateo, and has never dealt with a flat tire before in her life. I tell her to call AAA since I got kids & need to be home to cook dinner.  She canceled AAA last month.
I’m trying to ask her “how flat is flat”, but if June is noticing it, it’s pretty bad. Some stranger in the background is saying “no you can’t drive it to a station, you’ll ruin the rim” (so yeah, really bad). Two minutes later same stranger is offering to change her spare.
June calls back in 10mins; she has a spare, has standard Toyota gear for tire changing, but one of the lug nuts is rounded off & stuck.  10mins more the stranger gets the nut off, and I’m trying to find a place for June to take her car & get the tire repaired, while sitting in the dentist’s office.
In 30mins L comes out of the DDS chair with a numb mouth & fewer cavities,
then we get to run to Mom’s house & get her & M’s books & guitar, then home so I can cook dinner.
I think I actually wrote some code in the middle of all that, but I’m not really sure.

Feb 4: I’m pondering how to “grow” an internal engineer to work on our internal R-like programming language.  I figure we need a short intro-language to dev/make to get the basic language & tool-building mindset across.  I recall a Byte magazine article about a language called “Mouse”.  Google & the Internet come through: there’s a Wiki article on Mouse


and there at the bottom is the link to the Byte magazine, now fully scanned:


Scary stuff: 35 years later and I can fully recall some obscure programming language, down to all kinds of implementation details.  And ads for “fast 4K static RAMs” and “fully loaded TRS-80’s”.  But not my neighbor of 15 years kids’ names.


A K/V Store for In-Memory Analytics

0xdata.com is building in-memory analytics (no surprise, see 0xdata.com).  What may be a surprise, though, is that there’s a full-fledged high-performance Key/Value store built into H2O, and that it is central to both our data management and our control logic.

We use the K/V store in two main ways:

  • All the Big Data is stored striped across the cluster with pseudo-random Keys.  For any given Key we can find the node which “homes” it from the Key itself, and cache it locally as needed.  Pseudo-random striping in practice means the data is typically well distributed around the cluster.  Caching (as opposed to a fixed placement) means we do not need to be perfect about which CPU works on which chunk of data – as long as *most* of the work is CPU-local.
  • About 1/2 the control logic goes through the K/V store (and the other half uses the RPC mechanism).  Anything which needs cross-node visibility and persistence uses the store, including progress counters, meta-data for temporary distributed vectors, built models, loaded datasets, and results from all kinds of work we wish to cache to avoid doing it again later.


First some simple facts:

  • The H2O K/V store supports the full Java Memory Model – lazy consistency can be asked for in places but typically only increases performance under certain use-cases (a mix of high volume reads & writes to the same keys).  Note that the JMM does not order writes to non-volatile variables, so high volume writes to unrelated Keys (as is common on temp data in the middle of some big calculation) all run in parallel with both the network traffic and the actual Value production.  I.e., we don’t typically stall-on-write in any common use-case, and we still maintain exact JMM semantics.
  • All Keys can be cached locally – meaning a typical hot Key ‘get’ is cached, and costs no more than a hashtable ‘get’ – about 150ns.  Same for a ‘put’ – the write is cached locally, then forwarded to another node (AFAIK, this is the fastest K/V get/put on the planet, but see below about not persistent).  The forwarding happens in the background by default (unless you’ve specified volatile-like behavior).  Meanwhile, local readers will see the write immediately, and the writer is stalled for no more time than the hashtable ‘put’ and a single UDP-packet send.
  • H2O also supports transactional K/V updates – the transaction function is forwarded to the Key’s home, where it is run in a loop until the transaction succeeds or is aborted.  We often use this for e.g. asynchronous updates to progress bars.
  • The H2O Cloud is peer-to-peer.  There is no “name-node” nor central Key dictionary.  Each Key has a home-node, but the homes are picked pseudo-randomly per-key.
  • H2O’s store is not persistent, nor is it an HA solution.  We looked at those (and even at one point had a fully functioning Key auto-replicate & repair) and decided that the market was well served by existing technologies.  We opted to put our energies into the Math side of things.
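To make the "run in a loop until the transaction succeeds or is aborted" idea concrete, here is a minimal single-JVM sketch.  It is not H2O's code: `HomeStore` and `atomic` are hypothetical names, a `ConcurrentHashMap` stands in for the Key's home node, and a transaction function signals an abort by returning null (an assumption made for this example).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a Key's home node; not H2O's actual classes.
final class HomeStore {
    private final ConcurrentHashMap<String, Long> values = new ConcurrentHashMap<>();

    // Run the transaction function in a loop until its result is installed
    // atomically, or until the function aborts by returning null.
    // Returns the installed value, or null if the transaction aborted.
    Long atomic(String key, UnaryOperator<Long> txn) {
        while (true) {
            Long old = values.get(key);           // null means the Key is missing
            Long updated = txn.apply(old);
            if (updated == null) return null;     // transaction aborted
            boolean won = (old == null)
                ? values.putIfAbsent(key, updated) == null
                : values.replace(key, old, updated);
            if (won) return updated;              // no racing write got in first
            // Lost a race with another writer: re-read and try again.
        }
    }

    Long get(String key) { return values.get(key); }

    public static void main(String[] args) throws InterruptedException {
        HomeStore home = new HomeStore();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++)
                    home.atomic("progress", old -> old == null ? 1L : old + 1);
            });
            workers[i].start();
        }
        for (Thread t : workers) t.join();
        System.out.println(home.get("progress")); // prints 4000
    }
}
```

The point of the shape: the caller ships a small function rather than a value, so racing updates to something like a progress counter never overwrite each other, and the loser of a race simply re-runs against the fresh value.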


Some Big Picture Details:

H2O’s K/V store is a classic peer-to-peer Distributed Hash Table, with the Keys distributed around the cluster via a pseudo-random hash function.  Pseudo-random because we can (and frequently do) muck with it, to force Keys to ‘home’ to different nodes (usually for load-balance reasons).  A Key’s ‘home’ is solely responsible for breaking ties in racing writes and is the “source of truth” for that Key.  To repeat: Keys can be cached anywhere, and both reads & writes can be cached (although a write is not complete until it reaches ‘home’, gets ordered with other writes, and an ACK is returned).  The ‘home’ is only consulted when breaking ties on conflicting writes or to fetch the value on a Key miss.
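A rough sketch of how a home node can be computed from nothing but the Key's bytes, with a hook for forcing placement.  Everything here is an assumption for illustration: `HomePicker`, the `FORCED` marker byte, and the "byte 1 names the node" convention are hypothetical, and H2O's real Key encoding and hash function are not shown in this post.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: every node can compute a Key's home locally,
// so no name-node or central Key dictionary is needed.
final class HomePicker {
    // Hypothetical marker: a Key starting with this byte carries its
    // desired home node in byte 1 instead of being hashed.
    static final byte FORCED = 0;

    static int homeOf(byte[] key, int numNodes) {
        if (key.length >= 2 && key[0] == FORCED)
            return (key[1] & 0xFF) % numNodes;   // placement was "mucked with"
        int h = Arrays.hashCode(key);            // hash of the Key's bytes
        h ^= (h >>> 16);                         // fold high bits into low bits
        return Math.floorMod(h, numNodes);       // never negative
    }

    // Ordinary Key: just the bytes of a String.
    static byte[] key(String name) {
        return name.getBytes(StandardCharsets.UTF_8);
    }

    // Key whose home is forced to a chosen node (e.g. for load balance).
    static byte[] forcedKey(int node, String name) {
        byte[] body = name.getBytes(StandardCharsets.UTF_8);
        byte[] key = new byte[body.length + 2];
        key[0] = FORCED;
        key[1] = (byte) node;
        System.arraycopy(body, 0, key, 2, body.length);
        return key;
    }

    public static void main(String[] args) {
        int nodes = 4;
        int[] counts = new int[nodes];
        for (int i = 0; i < 1000; i++)
            counts[homeOf(key("chunk" + i), nodes)]++;
        System.out.println(Arrays.toString(counts));              // keys per node
        System.out.println(homeOf(forcedKey(2, "model"), nodes)); // prints 2
    }
}
```

Because the override lives inside the Key itself, any node that sees the Key agrees on its home without asking anyone.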

Keys are not much more than a blob of unique bytes, often char arrays from Strings or random UUID’s.

Values hold bits for maintaining consistency, plus status bits for being on some backing store (for the user-mode swap-to-disk), plus a big blob of bytes.  The blob of bytes is typically a serialized POJO, and if so an inflated copy of the POJO is kept around also.  We use generics to auto-inflate the POJO form:

MyPOJO pojo = DKV.get(key).get();

This code will set the variable “pojo” to a POJO pulled from the K/V store.  If all caches hit, this will take about 150ns.  There is a lot of raw compressed data (the Big Data part of big data), so Big Data is read directly from the bytes and its “POJO” form is the byte array itself – i.e., we don’t keep two copies of Big Data (both a serialized and deserialized form).

Inflated POJOs (and their backing Values) are “immutable”.  While we cannot enforce this at the Java level, updates to the POJOs will not stick in the K/V store unless a ‘put’ is done.  More on this later, but mostly it turns into a coding style issue: if you need to update a POJO AND make the changes globally visible, you need to do a “DKV.put(key,pojo)” at some point.
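The coding-style rule is easier to see with a toy.  The sketch below is a deliberately simplified, single-node stand-in (hypothetical `ToyStore` and `Progress` classes, hand-written serialization, and no cached inflated copy), not H2O's DKV: the store holds only bytes, so a change to an inflated POJO is purely local until it is put back.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical POJO with hand-written (de)serialization.
final class Progress {
    int done;
    Progress(int done) { this.done = done; }
    byte[] write() { return ByteBuffer.allocate(4).putInt(done).array(); }
    static Progress read(byte[] b) { return new Progress(ByteBuffer.wrap(b).getInt()); }
}

// Toy store: keeps only the serialized bytes for each Key.
final class ToyStore {
    private final Map<String, byte[]> bytes = new HashMap<>();

    // Publishing a POJO means serializing its current state.
    void put(String key, Progress p) { bytes.put(key, p.write()); }

    // Every get inflates a fresh POJO from the stored bytes.
    Progress get(String key) {
        byte[] b = bytes.get(key);
        return b == null ? null : Progress.read(b);
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        store.put("job", new Progress(1));

        Progress p = store.get("job");
        p.done = 5;                                  // local change only
        System.out.println(store.get("job").done);   // prints 1

        store.put("job", p);                         // now the change sticks
        System.out.println(store.get("job").done);   // prints 5
    }
}
```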

The further restriction on POJOs is that they inherit from the class “Iced”.  The bytecode weaver will then inject all the code needed to serialize & deserialize (and a JSON pretty-printer, and a bunch of other code).  This rules out the default Java collections (although we have some equivalents – that can run *distributed*, because the collection can get larger than what a single node can hold).  In practice this hasn’t been an issue.  We serialize Iced & primitives, arrays of Iced & primitives and recursive subclasses of Iced.


Reliable Remote Procedure Call

H2O is a clustered solution which requires network communication to JVMs in unrelated process or machine memory spaces.  That network communication can be fast or slow, or may drop packets & sockets (even TCP can silently fail), and may need to be retried.  We have implemented a reliable RPC mechanism which retries failed communications at the RPC level.  An RPC contains a command (or call) to execute on the remote, plus the call arguments; there is a return value.  Both args & returns may be void, may be small, or may contain gigabytes of data.

Our mechanism has all the obvious optimizations: message data is compressed in a variety of ways (because CPU is cheaper than network).  Short messages are sent via 1 or 2 UDP packets; larger messages use TCP for congestion control.  RPCalls are retried periodically until we get an ACK back; the ACK also contains the call’s return value.  The ACK itself is also retried until the called node gets an ACKACK back (and this ends the cycle of Call/ACK/ACKACK).  We handle all the problems with double-sending of tasks & replies.  The end experience is the client makes a blocking call, sending the ‘this’ POJO over the wire – and gets back a modified ‘this’ POJO with results filled in.
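The Call/ACK/ACKACK cycle can be sketched as a tiny single-threaded simulation.  This is only a model under stated assumptions, not H2O's RPC code: `LossyRpc` and its fields are hypothetical names, packet loss is faked with a seeded `Random` instead of real UDP, timeouts are modeled as immediate retries, and the "remote" is just a function from int to int.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.function.IntUnaryOperator;

// Simulated reliable RPC over a channel that drops packets.
final class LossyRpc {
    private final Random net;                 // seeded: decides which packets drop
    private final double dropRate;
    private final IntUnaryOperator remoteFn;  // the procedure run on the "remote"
    // Remote side: replies saved per task id until an ACKACK arrives.
    private final Map<Integer, Integer> savedReplies = new HashMap<>();
    int executions = 0;                       // times the remote really ran a call
    int packetsSent = 0;

    LossyRpc(long seed, double dropRate, IntUnaryOperator remoteFn) {
        this.net = new Random(seed);
        this.dropRate = dropRate;
        this.remoteFn = remoteFn;
    }

    // Send one packet; returns true if it survived the network.
    private boolean delivered() {
        packetsSent++;
        return net.nextDouble() >= dropRate;
    }

    // Remote side: a re-sent call must not run the task twice;
    // it just gets the saved reply again.
    private int handleCall(int taskId, int arg) {
        Integer saved = savedReplies.get(taskId);
        if (saved != null) return saved;
        executions++;
        int result = remoteFn.applyAsInt(arg);
        savedReplies.put(taskId, result);
        return result;
    }

    // Client side: blocking call that retries until the ACK (carrying the
    // return value) arrives, then completes the ACKACK handshake.
    int call(int taskId, int arg) {
        Integer reply = null;
        while (reply == null) {
            if (!delivered()) continue;       // call lost: retry the call
            int result = handleCall(taskId, arg);
            if (!delivered()) continue;       // ACK lost: client retries the call
            reply = result;
        }
        boolean ackAcked = delivered();       // first ACKACK
        while (!ackAcked) {
            if (!delivered()) continue;       // remote re-sends ACK; it was lost
            ackAcked = delivered();           // client answers with another ACKACK
        }
        savedReplies.remove(taskId);          // remote may now forget the reply
        return reply;
    }

    int pendingOnRemote() { return savedReplies.size(); }
}
```

With a drop rate of 0.0 a call costs exactly three packets (Call, ACK, ACKACK); with a high drop rate the same call still returns the right answer and still executes once, just with more packets.  A real implementation also has to cope with a stale duplicate call arriving after the ACKACK, which this single-threaded model cannot produce.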

In practice, we can pull cables from a running cluster, and plug them back in, and the cluster will recover – or drop >50% of all UDP packets and still have the system work (albeit more slowly with lots of retries).


And For Next Time, The Gory Details


Been too long since I’ve blogged, but this blog has become quite long already!  And I think I’m going to need pictures also… so, next time the gory details (and yes, building on the above parts).