How Careful Engineering Led to Processing Over a Trillion Rows Per Second


Eric Boutin

Former Director of Engineering


On March 13, we published a blog post demonstrating the performance of SingleStore in the context of ad hoc analytical queries. Specifically, we showed that the query

SELECT stock_symbol, count(*) as c
FROM trade
GROUP BY stock_symbol
ORDER BY c desc
LIMIT 10;

can process 1,280,625,752,550 rows per second on a SingleStore cluster containing 448 Intel Skylake cores clocked at 2.5GHz. In this blog post, we drill down into how this was made possible by carefully designing code, and by exploiting distributed execution, instruction-level parallelism, and data-level parallelism.

Why is such high throughput needed? Users of applications expect a response time of less than a quarter of a second. Higher throughput means more data can be processed within that time frame.

Parallelism Through Distributed Execution

For a single threaded execution of the query to process 1,280,625,752,550 rows in one second, it would require processing a row in less than 0.002 clock cycles (at a 2.5GHz clock). This isn’t possible on today’s hardware, so SingleStore scales beyond a single core, beyond a single server, and executes the query on an entire cluster.

Distributed execution is one of the key strengths of SingleStore. Improving performance is easy. Step 1: add hardware. Step 2: the query gets faster. It is a well understood tradeoff. With 448 cores, a row needs to be processed in 0.87 clock cycles on average.
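For the curious, that 0.87 figure falls out of straightforward arithmetic. Here is the back-of-the-envelope calculation written out as a tiny standalone program (purely illustrative):

// Back-of-the-envelope check of the per-row cycle budget quoted above.
#include <cstdio>

int main() {
    const double rows_per_second = 1280625752550.0;  // measured throughput
    const double clock_hz        = 2.5e9;            // 2.5 GHz Skylake cores
    const double cores           = 448.0;

    double rows_per_core_per_sec = rows_per_second / cores;        // ~2.86 billion
    double cycles_per_row        = clock_hz / rows_per_core_per_sec;
    printf("%.2f clock cycles per row\n", cycles_per_row);         // prints ~0.87
    return 0;
}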

However, 0.87 clock cycles per row is a very small number. Processing a row requires quite a bit more than 0.87 instructions. We could add 50x more Skylake cores, but that starts getting expensive. Is there more parallelism we could extract to further speed up the query? Yes. This can be done by carefully crafting code. Lucky for you, we already did that work in SingleStore.

Let’s get into the technical aspects of the work.

Instruction-Level Parallelism

How do we extract more parallelism out of the system in order to process a row every 0.87 clock cycles on average? The first form of parallelism that we look at is instruction-level parallelism, or pipelining. Instead of executing a single instruction at a time, modern microprocessors have multiple execution units, each of which can execute instructions in parallel. The key to leveraging this parallelism is to write predictable code, i.e. code with branches that the microprocessor can easily predict, and to avoid write-write contention.

[Figure: Execution units in the Haswell processor. Modern microprocessors contain multiple execution units allowing them to execute multiple instructions in parallel. In the Haswell architecture, a CPU core has 4 integer arithmetic units, so a single core can process 4 arithmetic operations in parallel. This is in addition to multi-core parallelism. Source: https://software.intel.com/en-us/forums/intel-isa-extensions/topic/737959]

Let’s consider the query SELECT key, COUNT(*) FROM t GROUP BY key.

A naive implementation of the query would go as follows: for each key, increment a counter for that key. To keep things simple, we assume that the column ‘key’ is a set of small, non-sparse integers contained in an array. Every key lies in the range [0, MAX_KEY).

int counters[MAX_KEY];
memset(counters, 0, sizeof(counters));
for (int i = 0; i < NUM_ROWS; ++i)
    ++counters[key[i]];

On a MacBook, this single-threaded algorithm can process 1,343,784,994 keys per second for very simple data. Let’s do better.

There is a branch in the for-loop to test the termination condition of the algorithm. This branch can be predicted very well by the processor, as the outcome of the condition i < NUM_ROWS is almost always true, which allows the processor to speculatively execute another iteration successfully. The actual problem here is that with a small number of distinct keys, there are write-write conflicts when updating the counters array. For example, if the key array contains [0, 1, 4, 4, 4, 2, 4], then the processor pipeline will stall: because of pipelining, the microprocessor tries to process the second ‘4’ before the increment for the first ‘4’ has completed, and two instructions trying to update the same memory location limit the instruction-level parallelism.

How do we fix this? We can use two separate sets of counters; this way the second ‘4’ updates a different memory location than the first ‘4’. The third ‘4’ still updates the same memory location as the first ‘4’, so this can be improved further by adding more counter arrays.

int counters1[MAX_KEY];
int counters2[MAX_KEY];
memset(counters1, 0, sizeof(counters1));
memset(counters2, 0, sizeof(counters2));

for (int i = 0; i < NUM_ROWS; i += 2) {
    ++counters1[key[i]];
    ++counters2[key[i + 1]];
}

Of course, the example would need to be modified to handle the tail of the array when the number of rows is not even, and the two counter arrays would have to be merged at the end, but we keep the example simple for ease of understanding; a sketch of those missing pieces follows below. On a MacBook Pro, this example processes rows 2.67 times faster than the naive example.
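For completeness, here is one way those missing pieces could look. This is only a sketch that continues from the declarations in the snippet above (counters1, counters2, key, NUM_ROWS, MAX_KEY), not production code:

// Tail handling and merge for the two-counter version above (a sketch).
int even_rows = NUM_ROWS - (NUM_ROWS % 2);   // largest even prefix of the array
for (int i = 0; i < even_rows; i += 2) {
    ++counters1[key[i]];
    ++counters2[key[i + 1]];
}
if (even_rows < NUM_ROWS)                    // odd number of rows: one leftover row
    ++counters1[key[even_rows]];

for (int k = 0; k < MAX_KEY; ++k)            // fold the two counter arrays into one
    counters1[k] += counters2[k];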

How does this work in practice? The query select stock_symbol, count(*) as c from trades group by stock_symbol doesn’t group by a small integer; it groups by stock_symbol, a VARCHAR column.

We organize the data such that trades with the same stock_symbol are stored close to each other. The trades are stored in a columnar format, which is organized as a set of immutable segments. A segment is simply a large collection of records, and is the unit of compression. Due to the nature of the data, a segment only contains trades for a handful of different stocks. When creating the segment, we identify the set of distinct stock_symbol values in the segment, and store them in a dictionary. The dictionary provides a way to represent the stock symbol of a trade as a small index into the dictionary, rather than a string. It is a common compression technique in databases. So instead of:

AAPL AMZN AAPL AMZN AMZN ...etc...

We produce a dictionary with the entries:

AAPL | 0
AMZN | 1
...  | ...

This allows us to represent the stock symbols column as:

0 1 0 1 1 ...etc...

Now, when grouping by the stock symbol, we can actually group by a small integer.
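To make the encoding step concrete, here is a rough sketch of a segment-level dictionary encoder. The struct and function names are illustrative rather than SingleStore’s actual storage code, and it assumes fewer than 256 distinct symbols per segment so each id fits in one byte:

// A minimal, hypothetical dictionary encoder for one segment's stock_symbol column.
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct EncodedColumn {
    std::vector<std::string> dictionary; // index -> symbol, e.g. 0 -> "AAPL"
    std::vector<uint8_t>     ids;        // one small dictionary id per trade
};

EncodedColumn dictionary_encode(const std::vector<std::string>& symbols)
{
    EncodedColumn out;
    std::unordered_map<std::string, uint8_t> index;
    for (const std::string& s : symbols) {
        auto it = index.find(s);
        if (it == index.end()) {
            // First time we see this symbol in the segment: assign the next id.
            it = index.emplace(s, (uint8_t)out.dictionary.size()).first;
            out.dictionary.push_back(s);
        }
        out.ids.push_back(it->second);
    }
    return out;
}

// Encoding {"AAPL","AMZN","AAPL","AMZN","AMZN"} yields
// dictionary = {"AAPL","AMZN"} and ids = {0, 1, 0, 1, 1}, as in the example above.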

But Wait, There’s More!

Data-Level Parallelism, or Single Instruction, Multiple Data (SIMD), is the other form of parallelism that we leverage for this query. It allows us to use a few instructions to process multiple rows at once. SIMD offers instructions that execute on very wide registers (256 bits in AVX2, 512 bits in AVX-512). The registers are divided into lanes, each of which contains a separate value. Given that stock_symbols are represented using small one-byte dictionary indices, we can fit the stock symbols of 32 trades in a single 256-bit register, allowing us to process 32 records from the trade table at once.

There are challenges in implementing the query using SIMD. If we try to extend the naive implementation described in the previous section, we notice that there are data-dependent writes (e.g. ++counters1[key[i]]). There is no way to naively translate that code to update the counts of all 32 stocks in a given register at the same time. Further, many of the lanes of a register could need to update the same memory location, which amplifies the write-write conflict problem described in the previous section by an order of magnitude.

The algorithm that SingleStore uses to execute a vectorized group by, called in-register aggregation, is detailed in the peer-reviewed paper BIPie: Fast Selection and Aggregation on Encoded Data using Operator Specialization, which is being published at the SIGMOD conference. We now give a glimpse of how in-register aggregation works.

In the naive code of the previous section, we used a counter array, then two counter arrays, to track the count of each key. In-register aggregation pushes this further, and uses 32 counters for each stock_symbol. In fact, we use an entire SIMD register for each stock_symbol that we expect to see in a segment. We call those registers the counter registers. The counter registers are split into lanes, and the sum of all the lanes of a register gives us the number of times the stock appeared.

Below is an example of registers using 4 lanes, although AVX2 allows us to use up to 32 lanes.

Sample Dictionary:

stock_symbol | Dictionary id
A            | 0
AAPL         | 1
AMZN         | 2

Counter Registers:

(There is a counter register for each distinct stock symbol.)

A:    [ 2 | 0 | 0 | 0 ]   = 2 trades total

AAPL: [ 0 | 2 | 1 | 0 ]   = 3 trades total

AMZN: [ 10 | 2 | 1 | 54 ]   = 67 trades total

As we process the trades, we read an entire register’s worth of stock symbols (well, the dictionary ids of the stock symbols). We compare the content of the register with each dictionary id, and update the corresponding counter register accordingly. Each iteration of the algorithm processes 32 trades at a time.

In the following example, we read a batch of stock_symbols into a register:

[ 1 (AAPL) | 1 (AAPL) | 1 (AAPL) | 2 (AMZN) ]

For each stock symbol s, we execute a SIMD comparison of the register allowing us to identify which of the lanes contain a trade for s. If we compare the above register with ‘AAPL’, we obtain the following SIMD register:

[ 1 | 1 | 1 | 0 ]

The register contains ‘1’ in the lanes where the batch contains a trade for AAPL, and ‘0’ where it doesn’t. Then, we add the resulting register to the AAPL counter register, which becomes:

AAPL: [ 1 | 3 | 2 | 0 ]   = 6 trades total

Each register of trades is compared against each stock_symbol. This technique is optimized for a small number of distinct stock_symbols in a segment. For a large number of groups and small registers, this technique wouldn’t work very well. However, with the large AVX2 and AVX-512 registers, it works quite well.
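To make the compare-and-accumulate loop concrete, here is a minimal sketch written with AVX2 intrinsics (compile with -mavx2). It is an illustration of the technique, not SingleStore’s implementation: the function name is ours, and it assumes one-byte dictionary ids, at most 256 distinct symbols, and a row count that is a multiple of 32.

// Sketch of in-register aggregation: one 32-lane counter register per symbol.
#include <immintrin.h>
#include <cstddef>
#include <cstdint>

void group_count_avx2(const uint8_t* ids, size_t num_rows,
                      uint64_t* counts, int num_symbols)
{
    __m256i counters[256];                       // supports up to 256 one-byte ids
    for (int s = 0; s < num_symbols; ++s) {
        counters[s] = _mm256_setzero_si256();
        counts[s] = 0;
    }

    for (size_t i = 0; i < num_rows; i += 32) {
        // Read the dictionary ids of 32 trades at once.
        __m256i batch = _mm256_loadu_si256((const __m256i*)(ids + i));
        for (int s = 0; s < num_symbols; ++s) {
            // Lanes equal to symbol s become 0xFF (i.e. -1); subtracting that
            // mask increments exactly those lanes of s's counter register.
            __m256i match = _mm256_cmpeq_epi8(batch, _mm256_set1_epi8((char)s));
            counters[s] = _mm256_sub_epi8(counters[s], match);
        }

        // One-byte lanes overflow after 255 increments, so periodically (and at
        // the end) fold each counter register into a 64-bit total.
        bool last = (i + 32 >= num_rows);
        if ((i / 32) % 255 == 254 || last) {
            for (int s = 0; s < num_symbols; ++s) {
                uint8_t lanes[32];
                _mm256_storeu_si256((__m256i*)lanes, counters[s]);
                for (int l = 0; l < 32; ++l) counts[s] += lanes[l];
                counters[s] = _mm256_setzero_si256();
            }
        }
    }
}

Because the byte-wise compare produces -1 in matching lanes, subtracting the comparison mask is the same as adding 1 to the matching lanes, which is exactly the “add the resulting register” step from the worked example above.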

Depending on the number of groups, the speedup from implementing this algorithm is quite spectacular. For a small number of groups, we can see beyond a 10x speedup when the data is loaded from the L3 cache. The bottleneck at this point shifts to the bandwidth of main memory. To further optimize performance, we use techniques such as bitpacking, which improves the number of trades we can read per second from main memory. This increases the processing cost, but allows us to get past the memory bandwidth bottleneck. Let’s save this topic for another day.

What about the ORDER BY and LIMIT that are also in the query?

Small potatoes.

In this query, the bulk of the work is to execute the SELECT stock_symbol, COUNT(*) FROM trades GROUP BY stock_symbol.

Key Takeaways

Modern computer systems have a lot of parallelism to exploit. Distributed query execution techniques can dramatically increase the computing power available to respond to a query. In addition, with careful design of algorithms, it is possible to extract an order of magnitude more parallelism by leveraging pipelining and SIMD.

It is amazing how much work can be done in a clock cycle.

