Cool things I have worked on: Clustered analytic database

In my last blog post, I talked about how I was involved in building a database optimised for low latency on single-record reads.

So it was a bit of a change to later work on a database optimised for high throughput on bulk querying! Rather than getting single records, it was all about finding all the records that match some criterion and doing something with them as fast as possible, where "fast" is measured in "records per second". When I started there, the minimum time to perform a trivial query (e.g., SELECT 1) was about three seconds, never mind a query that actually fetched any data. However, on the right hardware, it could process untold numbers of gigabytes of data per second once it had gotten going. That three-second minimal round trip time was negligible when dealing with queries that processed hundreds of terabytes of data in ten minutes. That said, one of my first projects was to pick some low-hanging fruit, which brought the minimal round trip time down to around 300 milliseconds.

Unlike the previous database, this one had an SQL parser; rather than an API to get and set records, you could enter SQL queries in all their glory, and get back result sets.

You didn't generally INSERT, UPDATE or DELETE individual records (that was possible, but it didn't perform well, as will be explained later). Generally, data came into the system through CSV or BCP files that were plopped into the filesystem. The system would sort them on a chosen key (why? You'll see), then split them into partitions of something like a million records. Each partition was compressed into a "tree file". And because each partition was compressed independently, these jobs could be spread over the cluster.

The compression algorithm was interesting. If you put data in a CSV and gzip it, you'll knock it down to a quarter or so of its size, depending on the data. Our algorithm usually got a factor of ten better compression than that, so about a fortieth of the original size. This was a big deal for our customers as they had to store many petabytes of data, and dividing the size by a factor of forty meant spending a fortieth of the money on immense disk arrays. Also, most of our query performance was limited by how fast you could read data from disk, what with modern CPUs being so good; so compressing things by a factor of forty meant forty times the bulk throughput.

It worked by looking at each column, working out the set of unique values present in it, and sorting them. These sorted value lists could then be compressed with fairly conventional techniques for storing sorted lists of things (such as storing each element as the difference from the previous element, and then using conventional algorithms such as deflate on the result).
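As a rough illustration of that idea, here is a minimal Python sketch of difference-encoding a sorted, deduplicated list of integers and then deflating it. The function names and the one-number-per-line byte layout are assumptions made for readability; the real on-disk format is not described here.

```python
import zlib


def encode_sorted_values(values):
    """Deduplicate, sort, difference-encode and deflate non-negative integers."""
    unique = sorted(set(values))
    # Each element is stored as the difference from the previous element
    # (the first one is stored as the difference from zero).
    deltas = [b - a for a, b in zip([0] + unique, unique)]
    # Hypothetical layout: one decimal delta per line, then deflate.
    raw = "\n".join(str(d) for d in deltas).encode("ascii")
    return zlib.compress(raw)


def decode_sorted_values(blob):
    """Reverse encode_sorted_values, returning the sorted unique values."""
    raw = zlib.decompress(blob).decode("ascii")
    values, total = [], 0
    if not raw:
        return values
    for line in raw.split("\n"):
        total += int(line)
        values.append(total)
    return values


if __name__ == "__main__":
    blob = encode_sorted_values([7, 6, 7, 7, 4])
    print(decode_sorted_values(blob))
```

Small deltas repeat far more often than the raw values do, which is what gives a general-purpose compressor such as deflate something to work with.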

So if we had the following data:

Make   Model    Colour  Quantity
Ford   Mondeo   White   7
Ford   Mondeo   Blue    6
Ford   Transit  White   7
Ford   Transit  Red     7
Skoda  Fabia    White   4

We would store it as a bunch of columns:

  • Make: Ford,Skoda
  • Model: Fabia,Mondeo,Transit
  • Colour: Blue,Red,White
  • Quantity: 4,6,7

But you still need to know how to reconstruct the records from those values. Each record could be thought of as a vector of integer indices, each of which was an index into the sorted values for that column. If you just stored those records-of-indices, you'd be able to reconstruct the records and would probably have gained some good compression.

If we start the indices at 0 for the first value in each column, our original data might now look like this, referring back to the sorted columns we stored:

Make  Model  Colour  Quantity
0     1      2       2
0     1      0       1
0     2      2       2
0     2      1       2
1     0      2       0
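The first two steps can be sketched in a few lines of Python. This is only an illustration of the idea using the example table; the function names are made up for this post and say nothing about how the real engine was structured.

```python
ROWS = [
    ("Ford", "Mondeo", "White", 7),
    ("Ford", "Mondeo", "Blue", 6),
    ("Ford", "Transit", "White", 7),
    ("Ford", "Transit", "Red", 7),
    ("Skoda", "Fabia", "White", 4),
]


def build_valuesets(rows):
    """Return one sorted list of unique values per column."""
    return [sorted(set(column)) for column in zip(*rows)]


def to_index_records(rows, valuesets):
    """Replace every value with its zero-based position in its column's value set."""
    lookups = [{value: i for i, value in enumerate(vs)} for vs in valuesets]
    return [
        tuple(lookups[col][value] for col, value in enumerate(row))
        for row in rows
    ]


if __name__ == "__main__":
    valuesets = build_valuesets(ROWS)
    for record in to_index_records(ROWS, valuesets):
        print(record)
```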

But we did more than that. We didn't actually write those records-of-indices to disk. The compression engine took pairs of columns in the records-of-indices representation, and again deduplicated and sorted each of them.

Which pairs you took mattered greatly to how good a compression you got, in subtle ways, but we tried to get related columns. Make and Model are connected, so we'd do well to pick that one first. Each model only ever comes from a single manufacturer in our example, so the combined (make,model) columns only have three values - (0,1), (0,2) and (1,0). So we could store those in a list we'd call an "index set", which actually went onto disk, deduplicated and sorted and compressed like a normal column:

  • Make+Model: (0,1), (0,2), (1,0)

Now the table (in memory) looks like this:

Make+Model  Colour  Quantity
0           2       2
0           0       1
1           2       2
1           1       2
2           2       0

Again, we pick a pair of columns. There's no obvious choice any more, so I don't know what our default heuristic would actually do, but let's say we decided to join Make+Model with Quantity. The pairings we find are (0,2), (0,1), (1,2) which happens twice, and (2,0), so we'd store:

  • (Make+Model)+Quantity: (0,1), (0,2), (1,2), (2,0)

(not forgetting that the sorted list of values is difference-encoded and compressed, rather than written literally like that).

Our table in memory now has only two columns:

(Make+Model)+Quantity  Colour
1                      2
0                      0
2                      2
2                      1
3                      2

At this point, we can't continue any further. Each entry in that table corresponds to a record in the original data, and by looking up the index sets we can expand out the columns to recreate the original table in its entirety; so we'd sort that final two-column table and store it (difference-encoded and compressed, of course). Our final entry in the file, the "root indexset", would be:

  • root: (0,0), (1,2), (2,1), (2,2), (3,2)
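The whole pairing process can be sketched as a small Python function applied repeatedly. The pairing order is hard-coded here to follow the walkthrough; the real heuristic for choosing pairs is not described in this post, and the names `combine`, `MMQ` and so on are purely illustrative.

```python
def combine(columns, left, right, name):
    """Merge two index columns into one column of positions in a new index set.

    Returns the sorted, deduplicated index set (the part that would go to disk)
    and the remaining in-memory columns.
    """
    pairs = list(zip(columns[left], columns[right]))
    index_set = sorted(set(pairs))
    position = {pair: i for i, pair in enumerate(index_set)}
    remaining = {k: v for k, v in columns.items() if k not in (left, right)}
    remaining[name] = [position[pair] for pair in pairs]
    return index_set, remaining


# Records-of-indices for the example table, stored column by column.
columns = {
    "Make": [0, 0, 0, 0, 1],
    "Model": [1, 1, 2, 2, 0],
    "Colour": [2, 0, 2, 1, 2],
    "Quantity": [2, 1, 2, 2, 0],
}

make_model, columns = combine(columns, "Make", "Model", "MM")
mm_qty, columns = combine(columns, "MM", "Quantity", "MMQ")
root, columns = combine(columns, "MMQ", "Colour", "root")

if __name__ == "__main__":
    print("Make+Model:", make_model)
    print("(Make+Model)+Quantity:", mm_qty)
    print("root:", root)
```

Because each index set is itself a sorted list of small integer tuples, it can be difference-encoded and compressed in the same way as a value set.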

But we don't need to decompress the entire tree. Unlike a gzipped CSV, if we just want some of the columns of the table, we can skip the parts of the index set tree that lead to columns we don't need. If we just wanted the colour for every car in the table, we'd read the root indexset, and the colour valueset so we could look up the colour values, and we'd be done. If we wanted just the makes, we'd need to read the root indexset, the (make+model)+quantity indexset, the make+model indexset, and the make valueset.

However, if we wanted MIN(Make), MAX(Make) or did a SELECT DISTINCT Make, then we don't even need the root set - we can just open the Make valueset and take the first or last value, or all values.
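Here is a small Python sketch of those three access patterns. The index sets below are derived from the example rows using zero-based positions; the variable and function names are invented for illustration only.

```python
VALUESETS = {
    "Make": ["Ford", "Skoda"],
    "Model": ["Fabia", "Mondeo", "Transit"],
    "Colour": ["Blue", "Red", "White"],
    "Quantity": [4, 6, 7],
}
MAKE_MODEL = [(0, 1), (0, 2), (1, 0)]        # (make, model)
MM_QTY = [(0, 1), (0, 2), (1, 2), (2, 0)]     # (make+model, quantity)
ROOT = [(0, 0), (1, 2), (2, 1), (2, 2), (3, 2)]  # ((make+model)+quantity, colour)


def colours():
    """Colour of every record: needs only the root index set and one value set."""
    return [VALUESETS["Colour"][colour] for _, colour in ROOT]


def makes():
    """Make of every record: walks root -> (make+model)+quantity -> make+model."""
    result = []
    for mmq, _colour in ROOT:
        mm, _quantity = MM_QTY[mmq]
        make, _model = MAKE_MODEL[mm]
        result.append(VALUESETS["Make"][make])
    return result


def min_max_distinct(column):
    """MIN, MAX and DISTINCT come straight from the sorted value set."""
    values = VALUESETS[column]
    return values[0], values[-1], list(values)


if __name__ == "__main__":
    print(colours())
    print(makes())
    print(min_max_distinct("Make"))
```

Note that records come back in the sorted order of the root index set rather than in their original input order.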

Anyway, tree files, once built, were given unique partition IDs and put into some kind of storage. We didn't really care what the storage was, and supported several different backends - anything that let us save a file with a name, and get the file back with the same name, would do nicely. We supported raw filesystems (over NFS or on a clustered filesystem so all the servers in the cluster could see it), HDFS, and various enterprise storage devices.

So how did we know what tree files made up each table? Well, a data structure called the registry mapped table names to table objects. We supported a range of different types of table objects, but the most common one was called a "Lazy Union", which represented a UNION ALL in the SQL sense - a bunch of tables concatenated together. It was lazy because we used it to union together all the tree files that made up a table, but the system avoided actually trying to load all the tree files and UNION ALL them together (as that would generally be many petabytes of data); in fact, we basically never actually read any tree files in a lazy union - we just queried it for metadata. The lazy union object itself stored a list of tree files and metadata about them, which was kept in a file called a "partition cache".

So what happened if the user did an INSERT? Well, we had a feature to enable this; the extra records would go into (effectively) a CSV file associated with the table, and a reference to that CSV file was put into the partition cache for that table. When the CSV got big enough, we could build it into a tree file and start a new CSV file.

As for UPDATE and DELETE: Well, one caveat of our representation was that we never modified tree files. So to UPDATE or DELETE something, we'd create a "delta file" that referenced the tree file we wanted to modify, then contained instructions as to how to modify it (deleting or updating records). We'd replace the reference to the tree in the partition cache with a reference to the delta file; and voila, the system would see changed or removed records.

We used the immutability of tree files to support point-in-time querying. If you set a query timestamp on your database session, the system would effectively roll back the log of changes to the partition cache to restore the table to the state it was in at that point in time, undoing bulk imports, INSERTs, UPDATEs, and DELETEs.
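One way to picture this is as a timestamped log of changes to the list of files in the partition cache. The sketch below replays such a log forwards up to the query timestamp, which gives the same answer as rolling it back from the present; the `Change` record, the file names and the log layout are all hypothetical, not the actual partition cache format.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Change:
    """One hypothetical entry in a partition cache change log."""
    timestamp: int
    removed: tuple = ()
    added: tuple = ()


def files_at(log, query_time):
    """Return the table's file list as it stood at query_time."""
    files = []
    for change in sorted(log, key=lambda c: c.timestamp):
        if change.timestamp > query_time:
            break
        files = [f for f in files if f not in change.removed]
        files.extend(change.added)
    return files


LOG = [
    Change(10, added=("tree-1",)),                 # bulk import
    Change(20, added=("inserts-1.csv",)),          # first INSERT
    Change(30, removed=("tree-1",), added=("delta-1 -> tree-1",)),  # DELETE
]

if __name__ == "__main__":
    print(files_at(LOG, 25))
    print(files_at(LOG, 30))
```

Since tree files are never modified, the old file list still points at data that is exactly as it was at that time.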

This trick also worked for ALTER TABLE. If you modified the schema of a table, every existing reference to a tree (or a delta file or a CSV or anything else) in the partition cache would be replaced by a reference to the original file plus a schema-change file that explained the change. So the old data would be translated as it was read, with columns being removed, re-typed, added (with a default value), etc. And, of course, if you put a query timestamp on to query the tables as they were in the past, those schema changes would appear to be undone; deleted columns would return.

When a query came in that looked like SELECT ...some expressions... FROM table WHERE ...some conditions..., we would look the table up in the registry, and get a lazy union object connected to the partition cache file representing the current state of the table. The query planner would "push down" the condition clauses into the lazy union (LU) object, and it would use the metadata in the partition cache to restrict the list of partitions that might match. The main kind of metadata we stored was the minimum and maximum values of every column in each tree file. If our WHERE clause was column BETWEEN a AND b, we could skip any tree whose value range for that column didn't intersect a..b; and many other kinds of WHERE-clause expression could be used to eliminate based on range.
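The range check for a BETWEEN clause is just an interval intersection test. A minimal Python sketch, assuming a made-up `TreeMeta` record that holds the per-column minimum and maximum for one tree file:

```python
from dataclasses import dataclass


@dataclass
class TreeMeta:
    """Hypothetical per-tree metadata: column name -> (min, max)."""
    name: str
    ranges: dict


def prune_between(trees, column, low, high):
    """Keep only trees whose [min, max] for `column` intersects [low, high]."""
    kept = []
    for tree in trees:
        tree_min, tree_max = tree.ranges[column]
        if tree_max >= low and tree_min <= high:
            kept.append(tree)
    return kept


TREES = [
    TreeMeta("tree-1", {"ts": (0, 99)}),
    TreeMeta("tree-2", {"ts": (100, 199)}),
    TreeMeta("tree-3", {"ts": (200, 299)}),
]

if __name__ == "__main__":
    print([t.name for t in prune_between(TREES, "ts", 150, 220)])
```

The test can only ever say "might match", never "does match", so a kept tree still has to be opened and filtered properly.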

This is why we sorted each batch of data as it came in. Most of the data we stored was logs of something, so each batch covered a fixed time range anyway, and so would produce a bunch of trees restricted to a range of some datetime column; so we could query efficiently on date ranges. But if we then sorted by the next most common WHERE-clause constraint column, we would constrain the trees in the batch to each cover a narrow range of that column, and therefore query more efficiently for ranges of values for that column.

But what about restrictions on columns other than event date and the sort column? Other values in the table, unless correlated with one of those columns, tended to end up rather randomly scattered, so the min and max for that column in every partition tended to approximate the min and max for the entire table, and we couldn't eliminate trees in the lazy union using constraints on that column.

Normal databases let you create indexes on columns, but a B-tree index tended to take up space in the order of many bytes per record in the base table. Unless the table had a lot of columns, this meant the index was something like a tenth the size of the uncompressed data. As our tree files were about a fortieth the size of the typical data we had, even a single B-tree index would dwarf the original table; and the compression was a big selling point for people. That wouldn't do. We needed subtler tricks.

So we let users nominate columns they'd like to do equality queries on (field=literal). For each partition, we'd then create a bloom filter of all the values of that column occurring in the partition, and store it in the partition cache, associated with the tree containing that partition. A bloom filter is basically a set, but with a twist: you insert values into the set, then you can query the set to see if a value exists. While a normal set gives you a yes or no answer in this case, a bloom filter gives you a "maybe or no" answer; the value is either definitely not in the set or it might be in the set. In exchange for that vagueness, the bloom filter is a lot smaller than a conventional set representation. How big it is is something you can choose - larger ones are less likely to say "maybe" when the answer is really "no" (a false positive). We went for a 5% false positive rate by default, which gave us bloom filters that consumed five bits per unique value in that column in the tree file. Each tree held about a million records, so it would store anywhere between one and a million unique values for a given column, which meant the filters consumed anything from a byte to about 600KiB. When a query came in of the form SELECT ... FROM table WHERE bloom_filtered_column=x, we'd scan through all the bloom filters in the partition cache to look for trees that might contain the value x, and then perform the query on those.
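For readers who haven't met bloom filters before, here is a minimal Python sketch of one, plus the "which trees might contain x" scan. It is a generic textbook construction, not the implementation we shipped: the class, the SHA-256 based hashing and the `candidate_trees` helper are all assumptions for illustration. The textbook sizing formula used here comes out at a little over six bits per value for a 5% false positive rate, slightly more than the five bits quoted above, so the real filters were presumably sized a bit differently.

```python
import hashlib
import math


class BloomFilter:
    """A textbook bloom filter sized for a target false positive rate."""

    def __init__(self, expected_items, false_positive_rate=0.05):
        n = max(1, expected_items)
        # Standard sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hashes.
        self.size = max(8, math.ceil(-n * math.log(false_positive_rate) / math.log(2) ** 2))
        self.hashes = max(1, round(self.size / n * math.log(2)))
        self.bits = bytearray((self.size + 7) // 8)

    def _positions(self, value):
        # Derive k bit positions from two halves of one digest (double hashing).
        digest = hashlib.sha256(repr(value).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.size for i in range(self.hashes)]

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, value):
        """False means definitely absent; True means only 'maybe'."""
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(value))


def candidate_trees(filters, value):
    """Names of trees whose filter cannot rule out `value`."""
    return [name for name, bloom in filters.items() if bloom.might_contain(value)]


if __name__ == "__main__":
    filters = {"tree-1": BloomFilter(3), "tree-2": BloomFilter(3)}
    for v in ("alice", "bob", "carol"):
        filters["tree-1"].add(v)
    for v in ("dave", "erin", "frank"):
        filters["tree-2"].add(v)
    print(candidate_trees(filters, "erin"))
```

A false positive only costs an unnecessary tree read; a false negative would lose data, and the construction never produces one.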

And we could get more advanced than that. If users wanted to optimise queries of the form column LIKE '%foo bar%' - looking for a substring in a field - they could create another kind of bloom filter, which only worked on text columns. In this case, we'd go through each value of that column in the tree and extract all the groups of N characters (N might be four or five, typically). So if the field contained I eat food, the groups of four characters would be "I ea", " eat", "eat ", "at f", "t fo", " foo", and "food". We'd take all of those for the entire column in the tree, and make a bloom filter of them. Then if the user asked for a substring of N characters or more - N had to be chosen to be smaller than or equal to the shortest expected search substring - we would do the same for the search string and then look in the bloom filters for those. So if the user had column LIKE '%eat food%' in their WHERE clause, we'd split that into "eat ", "at f", "t fo", " foo", and "food". Only trees whose bloom filter returned a "maybe" for all of those substrings could possibly contain the target string, so we could eliminate any that didn't. We had a few other variants on the bloom filter, too, for even more specialist cases.
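The substring trick is easy to sketch in Python. To keep the example short, an ordinary set stands in for the bloom filter (so it never gives false positives); the function names are made up for this illustration.

```python
def ngrams(text, n=4):
    """All overlapping groups of n characters in text, spaces included."""
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def build_tree_ngrams(column_values, n=4):
    """Collect the n-grams of every value of a text column in one tree."""
    grams = set()
    for value in column_values:
        grams |= ngrams(value, n)
    return grams


def might_contain_substring(tree_grams, needle, n=4):
    """False only if the tree definitely holds no value containing needle."""
    if len(needle) < n:
        return True  # too short to split into n-grams, so nothing can be ruled out
    return ngrams(needle, n) <= tree_grams


if __name__ == "__main__":
    tree = build_tree_ngrams(["I eat food", "you drink tea"])
    print(sorted(ngrams("eat food")))
    print(might_contain_substring(tree, "eat food"))
    print(might_contain_substring(tree, "eat fish"))
```

Even with an exact set, a "maybe" is still only a maybe: all the n-grams of the search string can be present in a tree without ever appearing next to each other in a single value.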

So, let's recap. I've explained how we physically stored table data, how we imported batches of data, and started to explain how we do simple queries on a single table.

As I was saying, when such a query came in, we'd get this Lazy Union (LU) object that represented all the trees in the table, and we'd tell it to restrict itself to trees that might be pertinent to the query based on the WHERE clause. If this step didn't reduce the list of trees at all, that was fine; what mattered was that we eliminated as many trees as possible to improve performance by considering less data, while never eliminating a tree that might contain records of interest.

Once we had the restricted list of trees in the lazy union, we would be able to generate the parallel query plan. In this simple case, we'd generate a template job that could be applied to each tree file (in this case, this job would open the tree file it was being applied to, request only the columns it needed for the SELECT clause, and then filter them on the WHERE clause to find only the matching records). That template job would be applied to each tree found in the Lazy Union in parallel across the cluster, and the results streamed back to the user.

Join queries were a bit fiddlier. If the user wanted a simple two-table join, such as SELECT ... FROM a, b WHERE a.x = 123 AND b.y = 456 AND a.z = b.z, we would load up two LUs, one for a and one for b, and restrict both by the WHERE clauses that only affected one table: a.x = 123 on a and b.y = 456 on b. Those would limit the numbers of partitions involved, but still, any partition from a could contain rows that match rows from any partition of b - so we would need to consider all possible combinations. This meant running a number of jobs proportional to the product of the sizes of the tables. So we tried to reduce that in a few ways.

Firstly, once we'd built a restricted lazy union for one table (say, a), we could quickly work out an approximation to the MAX and MIN of the join column, z, in table a. Because we'd already restricted the LU on the a.x = 123 clause and just had partitions matching that, we could just look at the MAX and MIN of z in those partitions (data which we held in the metadata in the partition cache), and then when we built the restricted LU for b, we could restrict it on b.y = 456 AND b.z BETWEEN min AND max, thereby further restricting b. Of course, once we'd done that, we could approximate the min and max of z in b using the same technique, and further restrict the LU for a, and thus get a smaller z-range for a, and further restrict the LU for b - in fact, we could continue this until the range of z stopped shrinking if we wanted to, but in practice, it usually wasn't worth going for more than a couple of iterations.
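That back-and-forth narrowing can be sketched in Python as follows. Each partition is reduced to just its (min, max) of the join column z, and the lists passed in are assumed to have already been restricted by the single-table clauses; the function names and the default of two rounds are illustrative choices.

```python
def restrict(partitions, low, high):
    """Keep partitions whose (min_z, max_z) range intersects [low, high]."""
    return [p for p in partitions if p[1] >= low and p[0] <= high]


def span(partitions):
    """Approximate MIN and MAX of z over a list of partitions."""
    return min(p[0] for p in partitions), max(p[1] for p in partitions)


def narrow_join(a_parts, b_parts, max_rounds=2):
    """Alternately shrink each side using the other side's z-range."""
    for _ in range(max_rounds):
        if not a_parts or not b_parts:
            break
        before = (len(a_parts), len(b_parts))
        b_parts = restrict(b_parts, *span(a_parts))
        if not b_parts:
            break
        a_parts = restrict(a_parts, *span(b_parts))
        if (len(a_parts), len(b_parts)) == before:
            break  # nothing was eliminated, so the range has stopped shrinking
    return a_parts, b_parts


if __name__ == "__main__":
    a = [(0, 10), (50, 60)]
    b = [(5, 8), (100, 200)]
    print(narrow_join(a, b))
```

If either side ends up empty, the join cannot produce any rows and no per-partition jobs need to run at all.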

Also, rather than building a job for every possible combination, we could instead look at each individual partition of table a, find the actual set of values of z in that partition, then restrict a copy of the LU (already restricted as already described) even further to find only matching trees from b to combine with that partition from a.

Some queries didn't need to return all the matching records; some queries were just for aggregates such as COUNT or SUM or MIN or MAX; some queries were GROUP BY queries that would aggregate in groups; some queries still returned every record but needed to sort them first because of an ORDER BY. In these cases, we still generated per-tree jobs in parallel to find relevant records, but then we'd run a single special job on a single cluster node that would read the results of the parallel phase and do final processing.

There's a final simple case - a query so simple it just needs a single job. SELECT 1, for instance, which needs no table data from tree files. Or queries that can be fulfilled purely from the tree metadata in the partition cache, so again didn't need to read any tree files.

But what about more complex queries? Joins of three or more tables? Queries with subqueries? Common table expressions with a WITH..AS? Well, we handled all of them by splitting them up into a directed acyclic graph of simple "sub-queries", which operated on a combination of actual "base" tables and the results of other queries; a single final sub-query would produce the final results of the query. In fact, internally, we converted the entire query into a big WITH..AS with each query being one of the above cases we already know how to handle.

We would run the subqueries in parallel, as soon as all the tables they needed existed. To begin with, only the subqueries that depended purely on base tables would be able to run, but the results of subqueries would be built into temporary tables, in a special registry namespace local to that query. As subqueries finished, other subqueries would become runnable because their input tables would exist. We'd run subqueries in parallel across the cluster, which was pretty cool. Eventually, the final query would become runnable; we'd run it and send the results to the user and be done.

So a query that joined tables a,b,c and d could join a and b in parallel with joining c and d, then join the results of those two.
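The scheduling idea can be sketched with Python's standard `graphlib` module (Python 3.9 or later). The subquery names are invented, base tables are left out of the graph because they always exist, and each "wave" here stands for a set of subqueries that could run at the same time; the real scheduler started each subquery as soon as its own inputs were ready rather than in lock-step waves.

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies):
    """Group subqueries into waves that can each run in parallel.

    `dependencies` maps a subquery name to the set of subqueries whose
    temporary result tables it reads.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose inputs now exist
        waves.append(ready)
        sorter.done(*ready)                 # pretend the whole wave has finished
    return waves


# Joining a, b, c and d: two independent joins, then a join of their results.
PLAN = {
    "join_ab": set(),
    "join_cd": set(),
    "final": {"join_ab", "join_cd"},
}

if __name__ == "__main__":
    for wave in execution_waves(PLAN):
        print(wave)
```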

But with all this parallelism going on - different subqueries running different parallel jobs at the same time, all competing for cluster resources with each other and other queries in the system - analysing what happened during a query could have been difficult. Just writing logs on each cluster node wasn't really enough. So each query was given a cluster-wide unique ID when it started, which was passed down to every subprocess involved in executing it. Each process within a subquery had a role name unique within the subquery, assigned as part of the query plan; so a process could be identified by the query ID, the subquery name, and the process role name. So we logged events into a per-process log file on the node the process ran on, identified by the process' unique identifier. Once the query was done, if nothing had gone wrong, we'd make every node delete all its log files for that query ID; but if the query failed, or if a trace was explicitly requested, we'd gather all the log files for the query together onto one node and consolidate them into a "trace file".

This file was an sqlite database, containing all the log events in a highly structured form that made it easy to pull out only events pertaining to particular parts of the query. We also stored high-resolution timestamps so we could extract precise timings for performance analysis. The log events fell into three groups: Traditional log strings as you'd find in any UNIXy log file, machine-readable data items with a key and a value, and phase start/stop events. The latter kind broke the query down into phases, ranging from high-level stuff such as entire subqueries and the planning versus execution phases of subqueries down to fine-grained parts of individual jobs: opening tree files, performing various algorithms, and so on. These served two purposes: they gave other log events a context as to what the system was doing when they happened, and they let us see where the time was spent in a query for performance reasons. The machine-readable data items were generated to record various interesting properties (one example was that every process logged its rusage counters at the end of execution - CPU seconds consumed, peak memory size, number of IO reads and writes, that sort of thing). We recorded a lot of interesting things, such as the peak parallelism of subquery execution.
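To give a flavour of why an sqlite file was convenient, here is a Python sketch that sums the time spent per phase within one subquery. The `phases` table, its column names and the sample rows are entirely hypothetical; the real trace schema is not described in this post.

```python
import sqlite3


def open_demo_trace():
    """Build a tiny in-memory stand-in for a trace file (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE phases ("
        "subquery TEXT, role TEXT, phase TEXT, start_ns INTEGER, stop_ns INTEGER)"
    )
    conn.executemany(
        "INSERT INTO phases VALUES (?, ?, ?, ?, ?)",
        [
            ("join_ab", "scan-0", "open tree", 0, 400),
            ("join_ab", "scan-0", "filter", 400, 1000),
            ("join_ab", "scan-1", "open tree", 50, 350),
            ("final", "merge", "sort", 1000, 1800),
        ],
    )
    return conn


def time_per_phase(conn, subquery):
    """Total nanoseconds per phase name for one subquery, slowest first."""
    rows = conn.execute(
        "SELECT phase, SUM(stop_ns - start_ns) AS total_ns "
        "FROM phases WHERE subquery = ? "
        "GROUP BY phase ORDER BY total_ns DESC, phase",
        (subquery,),
    )
    return rows.fetchall()


if __name__ == "__main__":
    print(time_per_phase(open_demo_trace(), "join_ab"))
```

Because phases from different processes overlap in time, totals like these measure work done rather than elapsed wall-clock time.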

Because of all the parallelism, phases overlapped with each other; so I took great pleasure in writing a tool that would take a query trace file and produce a Gantt-style chart showing how the time within it broke down. This thing was very popular; support staff could ask users to send them the trace from a failed query, or request a trace for a poorly-performing query, and then we could dissect the trace file to work out what happened. Also, they were popular with the test team; a query that was carefully written to test a particular algorithm in the system could check that the algorithm was actually used by looking in the query trace. As a future change to the query optimiser might alter how queries executed, it would otherwise have been quite common for a test that checked we got correct results in a situation where a certain algorithm was used to no longer actually trigger that algorithm, meaning a part of the system would be left untested. Logging the peak parallelism of subquery execution as a machine-readable data item meant that the various limitations on maximum parallelism could be tested; and tests that were meant to check that everything worked correctly at high levels of parallelism could check that the desired parallelism was actually achieved and some other bottleneck in the system didn't prevent it.

It was a very different world, working on that system compared to the low-latency database I covered in the previous blog post. Which, me being me, just makes me think about ways in which the two approaches could be merged into a database capable of supporting both kinds of workload...

1 Comment

  • By John Cowan, Mon 12th Dec 2016 @ 3:35 am

    Alaric, would you drop me a line saying who you were working for on this database? Sounds a lot like my present employer, 1010data.

Creative Commons Attribution-NonCommercial-ShareAlike 2.0 UK: England & Wales