Projections: A Powerful New Way to Speed Up Queries in SingleStore

SingleStore gives you powerful options for creating columnstore tables to improve query performance. These include shard keys, sort keys and indexes. Previously, you could only have one sort and one shard key on a table. We're proud to announce that in SingleStore 8.5, you can use a new feature called projections to provide secondary sort and shard keys.

A projection is a second copy of all or a subset of the columns of a table. The projection can have different sort and shard keys than the primary table, and different indexes.
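
For orientation, here's a minimal sketch of the shape this takes (a hypothetical table and projection; the concrete examples later in this post use a customer schema):

/* Hypothetical base table, sorted and sharded on column a */
create table t_example(a bigint, b bigint, c bigint,
sort key(a), shard key(a));

/* A projection over the same rows with its own sort and shard keys on b,
   kept in sync automatically as t_example is updated */
create projection p_example (sort key(b), shard key(b)) as
select * from t_example;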

Sort keys help speed up range filters, and shard keys are great for speeding up GROUP BYs and enabling collocated (local) joins. Sort keys make segment elimination more effective, so a range filter can skip most of the data instead of scanning all of it.

Shard keys can speed up GROUP BYs by allowing them to be done locally on each partition, without a final aggregation step, if the grouping columns contain the shard key. Similarly, sharding can speed up COUNT(DISTINCT …) operations by allowing them to be computed locally if the columns on which DISTINCT is applied contain the shard key.

Shard keys can also speed up collocated joins by allowing partitions of the joining pair of tables to be joined locally on the node where they live, without a shuffle or broadcast.

Projections are a generalization of the secondary index concept for a distributed, sharded database. In traditional databases you can use secondary indexes with included columns (SQL Server, PostgreSQL), or create "covering" indexes (Oracle), to get some of the same benefits you get with projections. Some other modern analytics databases support projections (Vertica) or support materialized views that can be used similarly to projections (Amazon Redshift).

SingleStore's columnstore hash indexes are more compact and cheaper to update than projections, and you should use them instead of projections if you just want to speed up equality lookups on a key with minimal additional storage and update overhead. Projections are more powerful than traditional secondary indexes because they may have their own hash indexes.
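
For example, if all you need is a fast equality lookup on one column, a columnstore hash index is usually the lighter-weight choice. Here's a sketch using a hypothetical table; the KEY ... USING HASH clause declares a columnstore hash index, but check the documentation for the exact options in your version:

/* Hypothetical table with a columnstore hash index on b for fast equality lookups */
create table lookup_example(a bigint, b bigint, c text,
sort key(a), shard key(a),
key(b) using hash);

/* An equality filter like this can use the hash index on b */
select * from lookup_example where b = 12345;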

What to know about projections

Like indexes, projections depend on the object on which they are created. When you drop a table, its projections are also dropped. Projections are updated synchronously when you update the primary table, in the same transaction that updates the table. That means you can be confident that if your query is answered with a projection, you'll get the same result as if it was answered with the base table. Creating a projection is offline — you can query but not update the table while you are creating a projection on it.

It’s important to note projections are not free — use them sparingly because they take extra time to update when you update a table, like indexes. And, projections take extra space. You want to make sure the benefit of a projection is more than the cost to maintain it.

Example: Ways to speed up queries with projections

Let's take a look at some representative types of queries and how to speed them up with projections.

Range query speedup

Range queries are common in database applications. These queries filter to find all values in a range, such as "t.c1 between a and b" or "t.c1 > x". If your table has a sort order, then range queries can use that order to scan just part of the data, not all of it.

Figure 1: Illustration of how a sort order can speed up some but not all range filters.

Figure 1 is a simplified illustration that shows how a table with a sort order on c1 can enable a more efficient range filter between values a and b on c1. However, for a range filter on c2, the whole table must be scanned.

Many real applications have tables with multiple date columns, like "aquisition_date" and "last_order_date" for a "customer" table in an online shopping application. The aquisition_date is the date the customer joined the platform, and the last_order_date is the date of their most recent order.

These columns may not be highly correlated. If you have range queries on both of these dates, you may want to optimize both of them with a sort key, and projections now make that much easier. Suppose you're running a large eCommerce company with millions of customers, and you have this table schema:

create table customer(
id int,
address text,
city text,
state char(2),
zip char(5),
aquisition_date date,
last_order_date date,
sort key(aquisition_date), shard key(id));

Now, suppose we have these two queries and want to make them both as fast as possible.

/* Q1: # customers with an order in the last 30 days */
select count(*)
from customer
where last_order_date > date_add(now(), interval -30 day);
/* Q2: number of customers acquired in last year */
/* uses SORT KEY on base table */
select count(*)
from customer
where aquisition_date > date_add(now(), interval -1 year);

For purposes of illustration, we filled this table with a random set of 33,554,432 rows, as described in Appendix A. Using this data set, on an 8-core laptop:

Q1 runs in 23 milliseconds.

Q2 runs in 7 milliseconds.

Q2 runs fast without any changes, and it benefits from the sort key on aquisition_date defined in the CREATE TABLE statement for customer. Suppose Q1 is frequent and you really need it to run faster. What can you do? Create a projection sorted on last_order_date! Here's the command for that:

create projection p_odate
(sort key(last_order_date), shard key(id)) as
select * from customer;

At the time of writing, you have to give a hint for the optimizer to use this projection for a range filter query. You can write it like so:

select count(*)
from customer with (use_projection = "p_odate")
where last_order_date > date_add(now(), interval -30 day);

Now, it runs in 6 milliseconds, and benefits from the sort order of p_odate. In summary, projections can be a big help if you need the fastest possible range filters on two different keys that are not correlated with each other.
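
If you want to confirm the projection is actually being used, check the plan; a ColumnStoreScan line that mentions PROJECTION p_odate means the scan is running against the projection (the exact plan text will vary):

explain select count(*)
from customer with (use_projection = "p_odate")
where last_order_date > date_add(now(), interval -30 day);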

GROUP BY speedup

Suppose you want to find the five most common zip codes among your customers:

singlestore> select zip, count(*) c from customer group by zip order by c desc limit 5;
+-------+-----+
| zip   | c   |
+-------+-----+
| 50015 | 415 |
| 49803 | 413 |
| 20818 | 413 |
| 76541 | 412 |
| 20714 | 410 |
+-------+-----+
5 rows in set (0.63 sec)

The plan for this query shows a Repartition and ShuffleGroupBy, which can be slower than a local GROUP BY. How do you use a projection to eliminate the need to repartition? Create a projection that shards on zip. Let's try that:

create projection p_zip (shard key(zip), sort key(aquisition_date)) as
select * from customer;

Now, if we do:

explain select zip, count(*) c from customer group by zip order by c
desc limit 5;

We see that the new plan does local HashGroupBy without a Repartition. The only data movement command is a Gather near the top of the plan. Running the query (after dropping the original plan from the plan cache), we get:

singlestore> select zip, count(*) c from customer group by zip order by c desc limit 5;
+-------+-----+
| zip   | c   |
+-------+-----+
| 50015 | 415 |
| 49803 | 413 |
| 20818 | 413 |
| 76541 | 412 |
| 20714 | 410 |
+-------+-----+
5 rows in set (0.23 sec)

This is almost three times as fast as without the projection. Just like for range queries, it's possible to substantially speed up GROUP BY queries with a projection. For this GROUP BY optimization to apply, the projection must shard on the grouping key(s), or a subset of the grouping keys (e.g. a projection that shards by "zip" can be used to do local GROUP BY when grouping on "state, zip").
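
For instance, a query grouping on both state and zip can still be aggregated locally using p_zip, since its shard key (zip) is a subset of the grouping keys. Here's a sketch, with the same hint used earlier in case the optimizer doesn't pick the projection on its own:

select state, zip, count(*) c
from customer with (use_projection = "p_zip")
group by state, zip
order by c desc
limit 5;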

Collocated join speedup

Continuing our example, suppose we have an orders table that can join to customer on customer.id:

create table orders(oid text, cid int, description text);

Let's assume there is one order per customer. We can mock that up with this INSERT:

insert into orders select uuid(), id, uuid() from customer;

Let's examine the plan when we join these two tables:

explain select * from customer c join orders o on (c.id = o.cid);

The plan repartitions the orders table, and repartitioning can be expensive. Here's an excerpt of the explain plan showing that:

| | Repartition [o.oid, o.cid, o.description] AS r0 … est_rows:33,554,432
| | ColumnStoreScan db1.orders AS o, SORT KEY __UNORDERED ()
est_table_rows:33,554,432 est_filtered:33,554,432

We know that the customer table is sharded on id. So if we create a projection that shards the orders table on cid, a collocated join will be possible.

create projection p_orders_cid (shard key(cid), sort key()) as
select * from orders;

Here's the EXPLAIN plan for the query now that we have the new projection. Lines are truncated on the right to fit: 

+---------------------------------------------------------------------
| EXPLAIN
+---------------------------------------------------------------------
| Gather partitions:all est_rows:33,818,640 alias:remote_0 parallelism
| Project [c.id, c.address, c.city, c.state, c.zip, c.aquisition_date,
| HashJoin
| |---HashTableProbe [o.cid = c.id]
| | HashTableBuild alias:o
| | Project [o_0.oid, o_0.cid, o_0.description] est_rows:33,554,432
| | ColumnStoreFilter [<after per-thread scan begin> AND <before per
| | ColumnStoreScan db1.orders PROJECTION p_orders_cid AS o_0, SORT
| ColumnStoreFilter [<after per-thread scan begin> AND <before per-thr
| ColumnStoreScan db1.customer AS c, SORT KEY aquisition_date (aquisit
+---------------------------------------------------------------------

Notice there is no Repartition operation. The only cross-node data movement operation is the Gather at the top. This is a collocated join. Let's modify the query so it does not produce so much output. This also gets a collocated join:

select *
from customer c join orders o on (c.id = o.cid) order by c.id limit 5;

Running it takes 5.88 sec. We can add a hint so the query does not use the projection:

select *
from customer c join orders o
with (disable_projection_replacement = 1) on (c.id = o.cid)
order by c.id
limit 5;

This takes 8.50 sec. As with range-filter and GROUP-BY optimization, projections can help.

In this example, it would have been better to have the orders base table itself sharded on cid. There's no substitute for having the right physical design for each base table. But if you need two different structures to satisfy two or more queries, projections are a great tool.
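
For comparison, if the join on cid were the dominant access pattern, a sketch of that alternative base-table design might look like this (replacing the orders definition above):

/* Alternative design: shard orders on cid so joins to customer (sharded on id) are collocated */
create table orders(oid text, cid int, description text, shard key(cid));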

COUNT(DISTINCT …) speedup

Certain COUNT(DISTINCT …) queries can also benefit from projections. Using the example customer table, we get these results:

select count(distinct zip) from customer;
+---------------------+
| count(distinct zip) |
+---------------------+
|              100000 |
+---------------------+
1 row in set (0.86 sec)

Here's an abridged version of the EXPLAIN plan for the query:

| Project [CAST($0 AS SIGNED) AS `count(distinct zip)`] est_rows:1
| Repartition [customer.zip] AS r0 …
| ShuffleGroupBy [] groups:[customer.zip]
| ColumnStoreScan db1.customer, SORT KEY aquisition_date …

It shows the projection p_zip is not being used, and a ShuffleGroupBy and Repartition are employed (potentially expensive data movement). This is needed because the customer base table isn't sharded on zip.

We can try again with a hint to force use of the projection that is sharded on zip.

select count(distinct zip)
from customer with(use_projection = "p_zip");

A shortened EXPLAIN plan for this is:

| Project …
| Aggregate [SUM(remote_0.`count(distinct zip)`) AS $0]
| Gather partitions:all est_rows:1
| Project [`count(distinct zip)`] est_rows:1
| Aggregate [HASH_COUNT_DISTINCT(customer.zip) AS `count(distinct
zip)`]
| ColumnStoreFilter …
| ColumnStoreScan db1.customer PROJECTION p_zip …

This does not have a ShuffleGroupBy or Repartition. The only data movement operator is an inexpensive Gather at the top that collects the count from each partition.

If you have to shard on one key for a specific reason (like to enable a collocated join) but need sharding on another key to enable faster COUNT(DISTINCT …) calculation, a projection can help. Depending on your data and application, it's possible to get impressive, nearly order-of-magnitude speedups for COUNT(DISTINCT …) operations with projections.

Improving row lookup throughput

Another use of projections is to improve row-lookup throughput when you are looking up rows based on a key value. Prior to the introduction of projections, any key lookup where the search key didn't contain the shard key required a broadcast to every leaf node to find rows with that key.

If the lookup key contains the shard key, then a point-to-point message can be sent from the aggregator node to the leaf node containing the key, using the hash function used for sharding. Consider this example:

create table t(a bigint, b bigint, c bigint,
shard key(a), sort key(a));

See Appendix B for how to load this with 16M rows of random data. Now, we can create a projection on this:

create projection p_b (shard key(b), sort key(b)) as select * from t;

The plan to look up a row on t.a is a single-partition plan:

explain select * from t where a = 428465751512;

(The following is an excerpt of the plan):

| Gather partitions:single alias:remote_0 parallelism_level:partition
| Project [t.a, t.b, t.c]
| ColumnStoreFilter [t.a = 428465751512]
| ColumnStoreScan db2.t, SORT KEY a (a) table_type:sharded_columnstore

Notice that the Gather says "partitions:single". This means that the aggregator node sends a message just to the node containing the partition that might contain the requested key (t.a value). Given that we created p_b, we can get the same type of plan for lookups on b:

explain select * from t with (use_projection = "p_b")
where b = 625703747662;
| Gather partitions:single alias:remote_0 parallelism_level:partition
| Project [t.a, t.b, t.c]
| ColumnStoreFilter [t.b = 625703747662]
| ColumnStoreScan db2.t PROJECTION p_b, SORT KEY b (b) table_type:shar

The hint is currently required in this case. Otherwise the projection won't be used, and you'll see Gather partitions:all indicating a broadcast Gather. You can further optimize this example by adding hash index keys on a and b on their respective projections.
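
Here's a sketch of what that might look like; the exact syntax for declaring a hash index inside a projection may differ, so treat this as illustrative and check the current documentation (the projection name p_b_hash is hypothetical):

/* Hash index on a for the base table, to speed equality lookups on a */
alter table t add index idx_t_a (a) using hash;

/* A projection sharded and sorted on b that also carries a hash index on b */
create projection p_b_hash (shard key(b), sort key(b), key(b) using hash) as
select * from t;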

The future of projections

Projections are now in public preview. They are usable in production, but the public preview status indicates that they're new and we're continuing to improve them. Near term, we will continue to improve the query optimizer so it uses projections when they are beneficial, without requiring any query hints.

Longer term, there are many things we can build on top of projections, such as:

  • Multiple UNIQUE constraints on different columns of a table
  • Online CREATE PROJECTION
  • Online ALTER of the shard key and sort key of the primary table structure
  • Bookmark lookups back into the base table for selective queries to retrieve missing columns of a table (e.g. table t is sharded on x, projection p is sharded on y with a hash index on y but does not include all columns of t, and you want to do "select * from t where y = constant" extremely fast).

Furthermore, a projection is a basic kind of materialized view. Many of the underpinnings of projections can be used to build general materialized views into SingleStore, which could have filters, joins and aggregations.

Conclusion

Projections are an amazingly powerful tool for speeding up different kinds of queries, both for analytical and transactional use cases. We've shown how to speed up range filters, group-by queries, collocated joins and distinct counts with projections. 

As with any kind of index, there can be too much of a good thing. Use projections sparingly, only when there's no alternative base table organization that will do everything you need. If you come up with a new way to use projections, please let us know on our Forums, and share your findings with other SingleStore users! 

We're also excited about all the things we'll be able to do with the foundation provided by projections. 

Appendix A: Customer data set

We use the following data set, modeling about 33.5 million customer records, in the examples above.

create table customer(id int, address text, city text,
state char(2), zip char(5), aquisition_date date,
last_order_date date, sort key(aquisition_date), shard key(id));
insert into customer
select 1,uuid(), substr(uuid(),1,13),
substr(uuid(),1,2),
substr(trunc(rand()*100000),1,5),
date_add(now(), interval -trunc(rand()*3650) day),
date_add(now(), interval -trunc(rand()*365) day);
delimiter //
do
declare n int;
begin
select count(*) into n from customer;
while n < 32*1000*1000 loop
insert into customer
select id+(select max(id) from customer), /* gives unique serial
numbers for each customer */
uuid(), substr(uuid(),1,13),
substr(uuid(),1,2),
substr(trunc(rand()*100000),1,5),
date_add(now(), interval -trunc(rand()*3650) day),
date_add(now(), interval -trunc(rand()*365) day)
from customer;
select count(*) into n from customer;
end loop;
end //
delimiter ;
/* make sure no order happened before customer started */
update customer set last_order_date = aquisition_date
where aquisition_date > last_order_date;
/* do this for repeatable results; forces one sorted run */
optimize table customer full;

Appendix B: Simple three-column table

To help with some examples, we generate a simple 3-column table t of nearly unique random integers like so:

drop table if exists t;
create table t(a bigint, b bigint, c bigint,
shard key(a), sort key(a));
insert into t values
(trunc(1000000000000*rand()),trunc(1000000000000*rand()),
trunc(1000000000000*rand()));
delimiter //
do
declare n bigint = 1000000000000;
declare c int;
begin
select count(*) into c from t;
while c < 16*1000*1000 loop
insert into t
select trunc(n*rand()), trunc(n*rand()), trunc(n*rand())
from t;
select count(*) into c from t;
end loop;
end
//
delimiter ;
