
Ingest data from MONGODB® to SingleStore using SQL commands
Notebook

![]() | ![]() |
When do you use SingleStore's native replication capability from MongoDB ?
SingleStore's native data replication gives you the ability to do one-time snapshot, or continuous change data capture CDC from MongoDB® to SingleStoreDB. This provides a quick and easy way to replicate data and power up analytics on MongoDB® data.
What you will learn in this notebook:
Replicate MongoDB® collections to SingleStore
Directly without transformations
Flattening required fields into columns of a table
Normalizing collection into multiple tables
1. Replicate directly without transformations
To replicate the required collections, provide the list of collections using "collection.include.list": "<Collection list>"
at the time of link creation, the parameter takes a comma-separated list of regular expressions that match collection names (in databaseName.collectionName format)
In [1]:
1
%%sql2
DROP DATABASE IF EXISTS sample_analytics;3
CREATE DATABASE sample_analytics;
Action Required
Make sure to select a database from the drop-down menu at the top of this notebook. It updates the connection_url to connect to that database.
In [2]:
1
%%sql2
CREATE LINK cdclink AS MONGODB3
CONFIG '{"mongodb.hosts":"ac-t7n47to-shard-00-00.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-01.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-02.tfutgo0.mongodb.net:27017",4
"collection.include.list": "sample_analytics.customers",5
"mongodb.ssl.enabled":"true",6
"mongodb.authsource":"admin",7
"mongodb.members.auto.discover": "true"8
}'9
CREDENTIALS '{10
"mongodb.user":"mongo_sample_reader",11
"mongodb.password":"SingleStoreRocks27017"12
}'
Check if the link got created
In [3]:
1
%%sql2
SHOW LINKS;
The following step automatically creates the required tables and pipelines on SingleStoreDB for every collection configured for replication
In [4]:
1
%%sql2
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK cdclink '*' FORMAT AVRO;
Start pipelines to begin replicating the data
In [5]:
1
%%sql2
START ALL PIPELINES;
In [6]:
1
%%sql2
SHOW TABLES;
The customer collection from MongoDB are replicated into SingleStore in the default format of _id and _more BSON columns that are compatible with Kai API
In [7]:
1
%%sql2
SELECT (_id :> JSON),(_more :> JSON) FROM customers LIMIT 2;
2. Flattening required fields from document into columns
CDC replication also gives additional flexibility to define your own table structure at SingleStore as you bring in data from MongoDB collections. In the following examples data from MongoDB collections are transformed when brought to SingleStoreDB
Fields like username
, name
, email
are flattened into columns of the table and rest of the document is stored in _more column.
The following commands create a table, a stored procedure and a pipeline required for the data replication
In [8]:
1
%%sql2
CREATE TABLE `customers_flattened` (3
`_id` bson NOT NULL,4
`username` text CHARACTER SET utf8 COLLATE utf8_general_ci,5
`name` text CHARACTER SET utf8 COLLATE utf8_general_ci,6
`email` text CHARACTER SET utf8 COLLATE utf8_general_ci,7
`_more` bson NOT NULL COMMENT 'KAI_MORE' ,8
`$_id` as BSON_NORMALIZE_NO_ARRAY(`_id`) PERSISTED longblob COMMENT 'KAI_AUTO' ,9
SHARD KEY `__SHARDKEY` (`$_id`),10
UNIQUE KEY `__PRIMARY` (`$_id`) USING HASH,11
SORT KEY `__UNORDERED` ()12
)
In [9]:
1
%%sql2
CREATE OR REPLACE PROCEDURE `customers_apply_changes`(changes query(`__operation` int(11) NOT NULL, `_id` longblob NOT NULL, `_more` longblob NOT NULL))3
RETURNS void AS4
DECLARE rowsDeleted INT;5
BEGIN REPLACE INTO `sample_analytics`.`customers_flattened` SELECT `_id`:>BSON AS `_id`, BSON_EXTRACT_STRING(`_more`,'username') AS `username`, BSON_EXTRACT_STRING(`_more`,'name') AS `name`, BSON_EXTRACT_STRING(`_more`,'email') AS `email`,6
BSON_EXCLUDE_MASK(`_more`,'{"_id": 1,"username": 1,"name": 1,"email": 1}') AS `_more`FROM changes WHERE __operation != 1;7
SELECT count(*) INTO rowsDeleted FROM changes WHERE changes.__operation = 1;8
IF rowsDeleted > 0 THEN9
DELETE dest FROM `sample_analytics`.`customers_flattened` AS dest INNER JOIN changes ON dest.`$_id` = BSON_NORMALIZE_NO_ARRAY(changes.`_id`) WHERE changes.__operation = 1; END IF;10
END;
In [10]:
1
%%sql2
CREATE AGGREGATOR PIPELINE `customers_apply_changes`3
AS LOAD DATA LINK cdclink 'customers'4
BATCH_INTERVAL 25005
MAX_PARTITIONS_PER_BATCH 16
DISABLE OFFSETS METADATA GC7
REPLACE8
KEY(`_id`)9
INTO PROCEDURE `customers_apply_changes`10
FORMAT AVRO11
(12
__operation <- `__operation`,13
_id <- `payload`::`_id`,14
_more <- `payload`::`_more`15
)
In [11]:
1
%%sql2
START ALL PIPELINES;
In [12]:
1
%%sql2
SHOW TABLES;
In [13]:
1
%%sql2
SELECT _id :> JSON,username, name, email, _more :> JSON FROM customers_flattened LIMIT 10;
3. Normalize a collection into multiple tables
In the following example a collection of MongoDB is normalized into two different tables on SingleStore.
In [14]:
1
%%sql2
DROP DATABASE IF EXISTS sample_airbnb;3
CREATE DATABASE sample_airbnb;
Action Required
Make sure to select a database from the drop-down menu at the top of this notebook. It updates the connection_url to connect to that database.
In [15]:
1
%%sql2
CREATE LINK source_listingsAndReviews AS MONGODB3
CONFIG '{"mongodb.hosts":"ac-t7n47to-shard-00-00.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-01.tfutgo0.mongodb.net:27017,ac-t7n47to-shard-00-02.tfutgo0.mongodb.net:27017",4
"collection.include.list": "sample_airbnb.*",5
"mongodb.ssl.enabled":"true",6
"mongodb.authsource":"admin",7
"mongodb.members.auto.discover": "true"8
}'9
CREDENTIALS '{10
"mongodb.user":"mongo_sample_reader",11
"mongodb.password":"SingleStoreRocks27017"12
}'
In [16]:
1
%%sql2
SHOW LINKS;
In [17]:
1
%%sql2
CREATE TABLE `listings` (3
`_id` BSON NOT NULL,4
`name` text CHARACTER SET utf8 COLLATE utf8_general_ci,5
`access` text CHARACTER SET utf8 COLLATE utf8_general_ci,6
`accommodates` int(11) DEFAULT NULL,7
`_more` BSON NOT NULL,8
`$_id` as BSON_NORMALIZE_NO_ARRAY(`_id`) PERSISTED longblob,9
SHARD KEY `__SHARDKEY` (`$_id`),10
UNIQUE KEY `__PRIMARY` (`$_id`) USING HASH,11
SORT KEY `__UNORDERED` ()12
)
In [18]:
1
%%sql2
CREATE TABLE `reviews` (3
`listingid` BSON NOT NULL,4
`review_scores_accuracy` int(11) DEFAULT NULL,5
`review_scores_cleanliness` int(11) DEFAULT NULL,6
`review_scores_rating` text CHARACTER SET utf8 COLLATE utf8_general_ci,7
`$listingid` as BSON_NORMALIZE_NO_ARRAY(`listingid`) PERSISTED longblob,8
SHARD KEY `__SHARDKEY` (`$listingid`),9
UNIQUE KEY `__PRIMARY` (`$listingid`) USING HASH,10
SORT KEY `__UNORDERED` ()11
)
In [19]:
1
%%sql2
CREATE OR REPLACE PROCEDURE `listingsAndReviews_apply_changes`(changes query(`__operation` int(11) NOT NULL, `_id` longblob NOT NULL, `_more` longblob NOT NULL))3
RETURNS void AS4
DECLARE rowsDeleted INT;5
BEGIN6
7
REPLACE INTO `listings` SELECT `_id`:>BSON AS `_id`, BSON_EXTRACT_STRING(`_more`,'name') AS `name`, BSON_EXTRACT_STRING(`_more`,'access') AS `access`,8
BSON_EXTRACT_BIGINT(`_more`,'accommodates') AS `accommodates`, BSON_EXCLUDE_MASK(`_more`,'{"_id": 1,"name": 1,"review_scores": 1,"access" : 1, "accommodates" : 1}') AS `_more`9
FROM changes WHERE __operation != 1;10
11
REPLACE INTO `reviews` SELECT `_id`:>BSON AS `listingid`, BSON_EXTRACT_BIGINT(`_more`,'review_scores','review_scores_accuracy') AS `review_scores_accuracy`,12
BSON_EXTRACT_BIGINT(`_more`,'review_scores','review_scores_cleanliness') AS `review_scores_cleanliness`, BSON_EXTRACT_BIGINT(`_more`,'review_scores','review_scores_rating') AS `review_scores_rating`13
FROM changes WHERE __operation != 1;14
15
SELECT count(*) INTO rowsDeleted FROM changes WHERE changes.__operation = 1;16
IF rowsDeleted > 0 THEN17
DELETE dest FROM `listings` AS dest INNER JOIN changes ON dest.`$_id` = BSON_NORMALIZE_NO_ARRAY(changes.`_id`) WHERE changes.__operation = 1;18
DELETE dest FROM `reviews` AS dest INNER JOIN changes ON dest.`$listingid` = BSON_NORMALIZE_NO_ARRAY(changes.`_id`) WHERE changes.__operation = 1;19
END IF;20
21
END;
In [20]:
1
%%sql2
CREATE AGGREGATOR PIPELINE `listingsAndReviews`3
AS LOAD DATA LINK source_listingsAndReviews 'sample_airbnb.listingsAndReviews'4
BATCH_INTERVAL 25005
MAX_PARTITIONS_PER_BATCH 16
DISABLE OFFSETS METADATA GC7
REPLACE8
KEY(`_id`)9
INTO PROCEDURE `listingsAndReviews_apply_changes`10
FORMAT AVRO11
(12
__operation <- `__operation`,13
_id <- `payload`::`_id`,14
_more <- `payload`::`_more`15
)
In [21]:
1
%%sql2
START ALL PIPELINES;
In [22]:
1
%%sql2
SHOW TABLES;
In [23]:
1
%%sql2
SELECT _id:>JSON ,name, access, accommodates FROM listings LIMIT 10;
In [24]:
1
%%sql2
SELECT listingid:>JSON, review_scores_accuracy,review_scores_cleanliness, review_scores_rating FROM reviews LIMIT 10;

Details
About this Template
Setup CDC Replication from MongoDB ® to SingleStore using SQL commands.
This Notebook can be run in Standard and Enterprise deployments.
Tags
License
This Notebook has been released under the Apache 2.0 open source license.
See Notebook in action
Launch this notebook in SingleStore and start executing queries instantly.