Why can't SingleStore generate the correct query plan for LEFT JOINs?

I can’t get SingleStore’s query planner to LEFT JOIN a columnstore and rowstore using the correct algorithm. It always ends up doing a full scan of the columnstore on the right of the join. This is really curious because I noticed on an INNER JOIN, it uses the correct algorithm, so I know it’s possible.

Here are my tables. Names changed with ctrl+f to protect the innocent. One table with 4 billion rows

CREATE TABLE `OrderItems` (
  `CustomerID` int(11) NOT NULL,
  `OrderID` int(11) NOT NULL,
  `ItemTypeID` int(11) NOT NULL,
  `ActivityDate` datetime DEFAULT NULL,
  `LastModified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  KEY `CustomerID` (`CustomerID`,`OrderID`) USING CLUSTERED COLUMNSTORE,
  SHARD KEY `CustomerID_2` (`CustomerID`),
  KEY `Hash_ItemTypeID` (`ItemTypeID`) USING HASH,
  KEY `Hash_ActivityDate` (`ActivityDate`) USING HASH
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL AUTOSTATS_HISTOGRAM_MODE=CREATE AUTOSTATS_SAMPLING=ON SQL_MODE='STRICT_ALL_TABLES'

Since the full natural key I need to join on is represented, I can see that I either need indexes on all those columns or have them part of the columnstore key. I have tried both with the same results:

 create index Hash_ItemTypeID using hash on OrderItems (ItemTypeID);
 create index Hash_ActivityDate using hash on OrderItems (ActivityDate);

The next table fills up to some size batches, possibly up to 50k-100k, and then gets truncated. For this experiment, it didn’t seem to matter
much how big the table was, the query plan was the same, and I dropped the plancache and analyzed the tables frequently to remove noise from the experiment.

CREATE TABLE `OrderIncomingItems` (
  `CustomerID` int(11) DEFAULT NULL,
  `OrderID` int(11) DEFAULT NULL,
  `ItemTypeID` int(11) DEFAULT NULL,
  `ActivityDate` datetime DEFAULT NULL,
  `LastModified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  SHARD KEY `CustomerID` (`CustomerID`),
  KEY `Hash_CustomerID` (`CustomerID`),
  KEY `ActivityDate` (`ActivityDate`),
  KEY `OrderID` (`OrderID`),
  KEY `ItemTypeID` (`ItemTypeID`)
) AUTOSTATS_CARDINALITY_MODE=PERIODIC AUTOSTATS_HISTOGRAM_MODE=CREATE SQL_MODE='STRICT_ALL_TABLES'

I need to join these on the natural key to figure out if any of the “Incoming” rows are inserts vs updates.

I have several involved queries in a sproc that all behave slowly. Here is the simplest:

explain select count(*)
from OrderIncomingItems as cf
left join OrderItems AS cfa
    ON  cfa.CustomerID = cf.CustomerID
	AND cfa.OrderID = cf.OrderID
	AND cfa.ItemTypeID = cf.ItemTypeID
	AND cfa.ActivityDate = cf.ActivityDate
	where cfa.OrderID is null

Project [CAST(COALESCE($0,0) AS SIGNED) AS `count(*)`] est_rows:1
Top limit:[@@SESSION.`sql_select_limit`]
Aggregate [SUM(remote_0.`count(*)`) AS $0]
Gather partitions:all est_rows:1 alias:remote_0
Project [`count(*)`] est_rows:1 est_select_cost:236
Aggregate [COUNT(*) AS `count(*)`]
Filter [cfa.OrderID IS NULL]
HashJoin type:right
|---HashTableProbe [cfa.CustomerID = SUBQ_VWW_0.CustomerID AND cfa.OrderID = SUBQ_VWW_0.OrderID AND cfa.ItemTypeID = SUBQ_VWW_0.ItemTypeID AND cfa.ActivityDate = SUBQ_VWW_0.ActivityDate]
| HashTableBuild alias:SUBQ_VWW_0
| Project [cf.CustomerID, cf.OrderID, cf.ItemTypeID, cf.ActivityDate] est_rows:118
| TableScan MarketplaceReporting.OrderIncomingItems AS cf table_type:sharded_rowstore est_table_rows:118 est_filtered:118
ColumnStoreScan MarketplaceReporting.OrderItems AS cfa, KEY CustomerID (CustomerID, OrderID) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore est_table_rows:3,571,023,471 est_filtered:3,571,023,471

The above plan is wrong because it does a full columnstore scan on the 4 billion row table. Oddly. I discovered on an inner join it does it correctly.

explain select count(*)
from OrderIncomingItems as cf
inner join OrderItems AS cfa
    ON  cfa.CustomerID = cf.CustomerID
	AND cfa.OrderID = cf.OrderID
	AND cfa.ItemTypeID = cf.ItemTypeID
	AND cfa.ActivityDate = cf.ActivityDate
    where cfa.OrderID is null

Project [CAST(COALESCE($0,0) AS SIGNED) AS `count(*)`] est_rows:1
Top limit:[@@SESSION.`sql_select_limit`]
Aggregate [SUM(remote_0.`count(*)`) AS $0]
Gather partitions:all est_rows:1 alias:remote_0
Project [`count(*)`] est_rows:1 est_select_cost:2
Aggregate [COUNT(*) AS `count(*)`]
HashJoin
|---HashTableProbe [cfa.CustomerID = cf.CustomerID AND cfa.OrderID = cf.OrderID AND cfa.ItemTypeID = cf.ItemTypeID AND cfa.ActivityDate = cf.ActivityDate]
| HashTableBuild alias:cfa
| Project [cfa_0.CustomerID, cfa_0.OrderID, cfa_0.ItemTypeID, cfa_0.ActivityDate] est_rows:1
| ColumnStoreFilter [0]
| ColumnStoreScan MarketplaceReporting.OrderItems AS cfa_0, KEY CustomerID (CustomerID, OrderID) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore est_table_rows:3,571,023,471 est_filtered:1
BloomFilter table:cfa fields:[cf.CustomerID, cf.OrderID, cf.ItemTypeID, cf.ActivityDate]
Filter [0]
IndexRangeScan MarketplaceReporting.OrderIncomingItems AS cf, KEY OrderID (OrderID) scan:[OrderID IS NULL] table_type:sharded_rowstore est_table_rows:118 est_filtered:1

I think with different sizes in the right-hand table, I can get a slightly different plan on the inner join (nested loop vs hash join) but the central problem always remains–
tl;dr: the left join joins the tables in the wrong order. The inner join correctly loops the smaller table and connects it to the larger table using whatever indexes are available. The left join, though,
scans the entire 4 billion row larger table, and hash joins to the smaller table. This makes no sense.

I’ve tried everything, including changing the columnstore key, remaking the tables so that all columns are nullable, and using USE/FORCE/IGNORE query hints. I’ve tried rewriting with nested SELECTS but the only ones I can come up with are equivalent.

Why can’t SingleStore generate the correct query plan? I am a hardcore NoSQLer, I can write these algorithms correctly in my sleep in an imperative language, seems the query planner has one job and can’t do it at all. In SingleStore it’s even worse than usual, it seems.

Hi @pkaminsky – thanks for the detailed post. So, are you saying that OrderIncomingItems has 50,000 to 100,000 rows, typically, but it doesn’t matter how big it is for plan selection in this case?

Hi @pkaminsky, this is a known limitation with joining a columnstore table with LEFT JOINS - they were not able to take advantage of filters the same way as INNER JOINs. And good news is we have already built an enhancement to address this case which will be available in an upcoming update, expected to be available in the next couple weeks. In the meantime, unfortunately there is not much you can do to work around the issue unless you can avoid the outer join.

@jack Is that also a reason it joins the tables in the wrong order? I ask to left join, with the small table on the left, but instead it starts with a full scan of the large table and hash joins it to the smaller table. However, if I index all columns on the big table, then intuitively I would expect it to scan the small table into an indexed nested loop join. The comparison is often given as O(l log r) vs O(l + r) which would greatly favor the former by many orders of magnitude given the sizes of my tables. This is how the query plan should work intuitively, as well as in any other database, as well as how I would write the algorithms myself if this god-forsaken industry would allow me. To me it seems like the problem is bigger than just not using the columnstore filters. The algorithm is wrong. Someone has a greater-than or less-than sign facing the wrong direction in the code.

We support dynamic hash join/loop join when joining to columnstore tables where there’s an index on the join column on the probe side table. That means you’ll see a hash join in your query plan, but at runtime, if the build side is small, it will internally flip it around and do the equivalent of a loop join, searching the index on the join column of the probe side table. This blog talks about that:

I’m not sure if that’ll work for your scenario yet, but it may. Plus, we’re working on some multi-column key extensions for columnstores (universal storage) for our next release, so if it doesn’t work yet (after the fix Jack mentioned), it may improve in the next release.

Yeah, what Eric said is exactly right. To add to that, the order actually is the right order, but it can be a bit confusing to understand - the way to read the HashJoin in explain is you see the OrderIncomingItems (small table) under HashTableBuild - that is the side we are scanning to put into a hash table, which is the first step. On the other side is OrderItems (big table), which is the hash probe side. With the improvement I mentioned, we will use the rows from the small table to filter into the big table and retrieve the matching rows efficiently. When we need to lookup many rows, this is done by using a bloom filter we built from the small table to quickly filter rows as we scan the big table; and when we only need to lookup a few rows like in your case, it’s done as Eric described above by essentially doing a loop join where we lookup each row in the big table using the indexes.

2 Likes

You put really very helpful information. Keep it up. Keep blogging. Looking to reading your next post!

1 Like

Hi @knight1983_1 – thank you! Yes, we’ll definitely keep blogging.