Scaling tends to make even simple things, like counting, seem difficult. In the past, businesses used specialized databases for particular tasks, including high-speed, high-throughput event counters. Due to the constraints of legacy systems, some people still assume that relational databases cannot handle high-throughput tasks at scale. However, due to advances like in-memory storage, high-throughput counting no longer requires a specialized, single-purpose database.
Why do we even need counters?
Before we get into the implementation, you might be asking why we need counters at all. Why not just collect event logs and compute counts as needed?
In short, querying a counter is much faster than counting log records, and many applications require instant access to this kind of data. Counting logs requires a large table scan and aggregation to produce a count. If you have an updatable counter, it is a single record lookup. The challenge with high-throughput counters is that building a stateful, fault tolerant distributed system can be challenging. Fortunately, SingleStore solves those hard problems for you, so you can focus on building your application.
In the rest of this article we’ll design a simple robust counter database running on a modest SingleStore cluster, and benchmark how it performs.
Counters are records
Let’s start by creating the following schema:
create database test; use test; create table counters_60 ( time_bucket int unsigned not null, event_type int unsigned not null, counter int unsigned not null, primary key (time_bucket, event_type) ); create table event_types ( event_type int unsigned not null primary key, event_name varchar(128), owner varchar(64), status enum ('active', 'inactive') );
The column time_bucket is the timestamp on the event rounded to the nearest minute. Making the time_bucket and event_type the primary key allows us to easily index events by time and type.
insert into counters_60 select unix_timestamp() / 60, 1234, 1 on duplicate key update counter = counter + 1;
If a primary key value does not exist, this query will insert a new record into SingleStore. If the primary key value exists, the counter will be incremented. This is informally called an “upsert.” The management of event_types is outside the scope of this article, but it’s trivial (and fast) to join the counter table to a table containing event metadata such as its human-friendly name.
Let’s also insert some data into the event_types table:
insert into event_types values (1234, 'party', 'memsql', 'active');
Now you have the counts of each event type bucketed by minute. This counter data can easily be aggregated and summarized with simple SQL queries:
-- all-time historical counts of various event types select e.event_type, e.event_name, sum(c.counter) from counters_60 c, event_types e where c.event_type=e.event_type and e.event_type in (1234, 4567, 7890) group by 1, 2; -- total number of events in the last hour select sum(counter), sum(counter)/60 as 'avg per min' from counters_60 where event_type = 1234 and time_bucket >= unix_timestamp() / 60 - 60; -- total number of events in time series, bucketed in 10-minute intervals select floor((unix_timestamp()/60 - time_bucket)/10) as interval, sum(counter) from counters_60 where event_type = 1234 and time_bucket >= unix_timestamp() / 60 - 60 group by 1;
1.6 Million increments per second
Inserting naively into the counters table, one record at a time, actually gets you pretty far. In our testing this resulted in a throughput of 200,000 increments per second. It’s nice to get impressive performance by default. Then we tried to see how much farther we could go.
In this simulation we processed 1,000 different event types. We created a threaded python script to push as many increments a second as possible. We made three changes to the naive version: multi-insert batches, disabling cluster-wide transactions, and sorting the records in each batch to avoid deadlocking.
insert into counters_60 values (23768675, 1234, 1), (23768675, 4567, 1), (23768675, 7890, 1), ... on duplicate key update counter = counter + 1;
We used a 6 node AWS cluster with 2 aggregators and 4 leaves to simulate the workload. Each node was m3.2xlarge consisting of 8 cores and 15GB of RAM, with an hourly cost of \$2.61 for the entire cluster. When starting this script on both aggregator nodes, we achieved a throughput of 1.6M upserts a second.
In this simulation we use a Python script to simulate the data ingest. In the real world, we see our customers use technologies like Storm, Kafka and Spark Streaming to collect events in a distributed system for higher throughput. For more information on SingleStore integration with stream processing engines, see this blog post on how Pinterest uses SingleStore and Spark streaming to track real-time event data.
Want to build your own high throughput counter? Download SingleStore today!