Welcome, DynamoDB x Rockset developers! This technical blog will show you how to integrate your DynamoDB database with SingleStore.

The combination of DynamoDB and SingleStore is a powerful architecture that uses the former as a fast NoSQL OLTP layer and the latter as a scalable, multimodel OLAP layer. This augmentation architecture is very common among SingleStore developers, and some also consolidate their DynamoDB workload directly into SingleStore.
Let’s state the facts: no other analytical database has the native DynamoDB integration that Rockset has. At SingleStore, we aim to deliver the simplest alternative at the lowest cost.
In this walkthrough, we will specifically be discussing how to integrate DynamoDB with SingleStore via Apache Kafka using SingleStore Pipelines. The blog provides a step-by-step walkthrough — from initial setup to final data validation — emphasizing why SingleStore offers a compelling alternative for Rockset customers.
If this integration option is not quite what you need, feel free to scroll down where we detail alternatives.
In this architecture change, data moves efficiently from DynamoDB to SingleStore, leveraging Kafka for seamless, scalable streaming and SingleStore's real-time pipelines. It ensures minimal downtime, offering an ideal solution for customers migrating from Rockset to SingleStore.
Streaming data from DynamoDB to Kafka
Since you already have data in DynamoDB, the first step is streaming this data to Kafka. This allows us to seamlessly transfer data to SingleStore.
Kafka setup and topic creation
bin/kafka-topics.sh --create --topic customer_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --create --topic lineitem_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --create --topic orders_topic --bootstrap-server localhost:9092 --partitions 8

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Python script to stream data to Kafka
The following sample code streams data from DynamoDB to Kafka. It can be housed in SingleStore Notebooks or scheduled through SingleStore Job Service.
import json
from decimal import Decimal

import boto3
from kafka import KafkaProducer

# Initialize DynamoDB resource (this endpoint is a local one; omit endpoint_url to use AWS)
dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')


# Custom JSON encoder to handle Decimal types, which boto3 returns for DynamoDB numbers
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)


# Initialize Kafka producer with custom JSON encoder
producer = KafkaProducer(
    bootstrap_servers='<your_kafka_machine_ip>:9092',
    value_serializer=lambda v: json.dumps(v, cls=DecimalEncoder).encode('utf-8')
)


# Function to get the record count from a DynamoDB table
def get_table_count(table_name):
    table = dynamodb.Table(table_name)
    # A COUNT scan is paginated like any other scan, so add up every page
    response = table.scan(Select='COUNT')
    count = response['Count']
    while 'LastEvaluatedKey' in response:
        response = table.scan(Select='COUNT', ExclusiveStartKey=response['LastEvaluatedKey'])
        count += response['Count']
    return count


# Function to read data from DynamoDB and stream to Kafka
def stream_table_to_kafka(table_name, topic_name):
    table = dynamodb.Table(table_name)
    record_count = 0  # Count of records streamed to Kafka

    try:
        scan_kwargs = {}
        while True:
            response = table.scan(**scan_kwargs)
            for item in response['Items']:
                producer.send(topic_name, item)
                record_count += 1
                if record_count % 1000 == 0:
                    print(f'{record_count} records streamed from {table_name} to {topic_name}')

            # Continue scanning if there are more items
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

        print(f'Total records streamed from {table_name} to {topic_name}: {record_count}')

        # Validate the record count
        expected_count = get_table_count(table_name)
        if record_count == expected_count:
            print(f'Successfully streamed all records from {table_name} to {topic_name}.')
        else:
            print(f'Warning: Mismatch in record count for {table_name}. '
                  f'Expected: {expected_count}, Streamed: {record_count}')

    except Exception as e:
        print(f'Error streaming data from {table_name} to {topic_name}: {e}')
    finally:
        producer.flush()  # Ensure all messages are sent before exiting


if __name__ == '__main__':
    # Define table names and corresponding Kafka topics
    tables_and_topics = {
        'customer': 'customer_topic',
        'lineitem': 'lineitem_topic',
        'orders': 'orders_topic'
    }

    try:
        # Stream data from each table to the corresponding Kafka topic one at a time
        for table_name, topic_name in tables_and_topics.items():
            print(f'Starting to stream data from {table_name} to {topic_name}')
            stream_table_to_kafka(table_name, topic_name)
            print(f'Completed streaming data from {table_name} to {topic_name}')
    except Exception as e:
        print(f'Error in streaming process: {e}')
    finally:
        producer.close()
        print('Data streaming complete.')
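One detail in the script above is worth a second look: DecimalEncoder converts every Decimal to a float, which can alter values with many digits. A minimal sketch of an alternative follows, assuming you would rather send whole numbers as JSON integers and all other numbers as exact strings. The names `ExactDecimalEncoder` and `to_kafka_value` are hypothetical and not part of the original script.

```python
import json
from decimal import Decimal


class ExactDecimalEncoder(json.JSONEncoder):
    """Hypothetical alternative to DecimalEncoder that avoids float rounding."""

    def default(self, obj):
        if isinstance(obj, Decimal):
            # Whole numbers become JSON integers; anything else stays an exact string.
            if obj.is_finite() and obj == obj.to_integral_value():
                return int(obj)
            return str(obj)
        return super().default(obj)


def to_kafka_value(item):
    """Serialize one DynamoDB item to the bytes a Kafka producer would send."""
    return json.dumps(item, cls=ExactDecimalEncoder).encode("utf-8")


# Sample item shaped like a row of the orders table (values are made up).
sample_item = {"o_orderkey": Decimal("4200001"), "o_totalprice": Decimal("173665.47")}
encoded = to_kafka_value(sample_item)
print(encoded)
```

To try it in the script, the lambda passed as `value_serializer` could be swapped for `to_kafka_value`. Whether strings or floats suit you better depends on the column types you load into.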
Ingesting data from Kafka to SingleStore
Finally, we'll create pipelines in SingleStore to ingest data from Kafka topics into SingleStore tables. This step showcases the power and simplicity of SingleStore's real-time data pipelines.
SingleStore table DDLs and pipeline creation
CREATE DATABASE tpch PARTITIONS 8;
USE tpch;

-- pipeline and table ddls:
-- Drop tables if they exist
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS lineitem;
DROP TABLE IF EXISTS customer;

CREATE TABLE orders (
  o_orderkey text NOT NULL,
  o_custkey text NOT NULL,
  o_orderstatus text NOT NULL,
  o_totalprice text NOT NULL,
  o_orderdate text NOT NULL,
  o_orderpriority text NOT NULL,
  o_clerk text NOT NULL,
  o_shippriority text NOT NULL,
  o_comment text NOT NULL,
  UNIQUE KEY pk (o_orderkey) UNENFORCED RELY,
  SHARD KEY __SHARDKEY (o_orderkey),
  SORT KEY o_orderkey (o_orderkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

CREATE TABLE lineitem (
  l_orderkey text NOT NULL,
  l_partkey text NOT NULL,
  l_suppkey text NOT NULL,
  l_linenumber text NOT NULL,
  l_quantity text NOT NULL,
  l_extendedprice text NOT NULL,
  l_discount text NOT NULL,
  l_tax text NOT NULL,
  l_returnflag text NOT NULL,
  l_linestatus text NOT NULL,
  l_shipdate text NOT NULL,
  l_commitdate text NOT NULL,
  l_receiptdate text NOT NULL,
  l_shipinstruct text NOT NULL,
  l_shipmode text NOT NULL,
  l_comment text NOT NULL,
  UNIQUE KEY pk (l_orderkey, l_linenumber) UNENFORCED RELY,
  SHARD KEY __SHARDKEY (l_orderkey),
  SORT KEY l_orderkey (l_orderkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

CREATE TABLE customer (
  c_custkey text NOT NULL,
  c_name text NOT NULL,
  c_address text NOT NULL,
  c_nationkey text NOT NULL,
  c_phone text NOT NULL,
  c_acctbal text NOT NULL,
  c_mktsegment text NOT NULL,
  c_comment text NOT NULL,
  UNIQUE KEY pk (c_custkey) UNENFORCED RELY,
  SHARD KEY __SHARDKEY (c_custkey),
  SORT KEY c_custkey (c_custkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

-- Create pipeline for orders table
CREATE OR REPLACE PIPELINE orders_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/orders_topic'
INTO TABLE orders
(
  o_orderstatus <- o_orderstatus, o_clerk <- o_clerk,
  o_orderdate <- o_orderdate, o_shippriority <- o_shippriority,
  o_custkey <- o_custkey, o_totalprice <- o_totalprice,
  o_orderkey <- o_orderkey, o_comment <- o_comment,
  o_orderpriority <- o_orderpriority
)
FORMAT JSON;

-- Create pipeline for lineitem table
CREATE OR REPLACE PIPELINE lineitem_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/lineitem_topic'
INTO TABLE lineitem
(
  l_orderkey <- l_orderkey,
  l_partkey <- l_partkey, l_suppkey <- l_suppkey,
  l_linenumber <- l_linenumber, l_quantity <- l_quantity,
  l_extendedprice <- l_extendedprice,
  l_discount <- l_discount, l_tax <- l_tax,
  l_returnflag <- l_returnflag, l_linestatus <- l_linestatus,
  l_shipdate <- l_shipdate,
  l_commitdate <- l_commitdate, l_receiptdate <- l_receiptdate,
  l_shipinstruct <- l_shipinstruct, l_shipmode <- l_shipmode,
  l_comment <- l_comment
)
FORMAT JSON;

-- Create pipeline for customer table
CREATE OR REPLACE PIPELINE customer_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/customer_topic'
INTO TABLE customer
(
  c_custkey <- c_custkey,
  c_name <- c_name,
  c_address <- c_address,
  c_nationkey <- c_nationkey,
  c_phone <- c_phone,
  c_acctbal <- c_acctbal,
  c_mktsegment <- c_mktsegment,
  c_comment <- c_comment
)
FORMAT JSON;

START PIPELINE orders_pipeline;
START PIPELINE customer_pipeline;
START PIPELINE lineitem_pipeline;
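The three pipelines above share one pattern: each JSON key is mapped to a column of the same name. As a rough sketch, that pattern could be generated instead of typed by hand. The helper name `build_kafka_pipeline_sql` is an assumption for illustration, and the broker address is the same placeholder used above. The helper only builds the statement text, which you would still run through your usual SQL client.

```python
def build_kafka_pipeline_sql(table, topic, columns, broker="<your_kafka_machine_ip>:9092"):
    """Return a CREATE PIPELINE statement mapping each JSON key to a same-named column."""
    mapping = ",\n".join(f"    {col} <- {col}" for col in columns)
    return (
        f"CREATE OR REPLACE PIPELINE {table}_pipeline AS\n"
        f"LOAD DATA KAFKA '{broker}/{topic}'\n"
        f"INTO TABLE {table}\n"
        f"(\n{mapping}\n)\n"
        f"FORMAT JSON;"
    )


# Column names taken from the customer table definition.
customer_columns = [
    "c_custkey", "c_name", "c_address", "c_nationkey",
    "c_phone", "c_acctbal", "c_mktsegment", "c_comment",
]
customer_sql = build_kafka_pipeline_sql("customer", "customer_topic", customer_columns)
print(customer_sql)
```

The table and column names are interpolated directly into the SQL text, so this sketch is only suitable for names you control, never for user-supplied input.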
You can use the following query to check the timings for your pipeline.
select
   pipeline_name PipelineName
  ,max(start_time) StartTime
  ,format(sum(rows_streamed),0) RowsStreamed
  ,format(sum(mb_streamed),2) MBStreamed
  ,format(sum(batch_time),0) BatchTimeSec
  ,format(sum(batch_time) / 60, 2) BatchTimeMin
  ,format(sum(rows_streamed) / sum(batch_time),0) RowsperSec
  ,format(sum(mb_streamed) / sum(batch_time),2) MBperSec
from
  information_schema.pipelines_batches_summary
where
  database_name = 'tpch' -- replace with your database name
  and pipeline_name = 'orders_pipeline' -- replace with your pipeline name
group by
  pipeline_name
order by
  sum(rows_streamed) / sum(batch_time) desc;
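To make the arithmetic in that query explicit, here is a small sketch that computes the same ratios from a list of per-batch figures. The sample numbers are invented purely for illustration and are not measurements, and the function name `summarize_batches` is hypothetical. It assumes, as the query's column aliases suggest, that batch_time is measured in seconds.

```python
def summarize_batches(batches):
    """Aggregate (rows_streamed, mb_streamed, batch_time_seconds) tuples like the query does."""
    total_rows = sum(rows for rows, _, _ in batches)
    total_mb = sum(mb for _, mb, _ in batches)
    total_seconds = sum(seconds for _, _, seconds in batches)
    return {
        "rows_streamed": total_rows,
        "mb_streamed": total_mb,
        "batch_time_min": total_seconds / 60,
        # Throughput divides the totals; it is not the average of per-batch rates.
        "rows_per_sec": total_rows / total_seconds,
        "mb_per_sec": total_mb / total_seconds,
    }


# Made-up sample batches: (rows_streamed, mb_streamed, batch_time in seconds)
sample_batches = [(50_000, 12.5, 2.0), (50_000, 12.5, 3.0), (20_000, 5.0, 1.0)]
summary = summarize_batches(sample_batches)
print(summary)
```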
Batch load Rockset data
Now that your real-time streams are set up, you will want to bulk load the rest of your data from Rockset. Since you’re already familiar with SingleStore pipelines by now, this part is super easy!
To move your data directly from Rockset to SingleStore using SingleStore Pipelines, follow these simple steps:
Step 1
Select the data from Rockset into S3 (or other object store). The following example shows the export of data into S3 in PARQUET format. You can also specify JSON format.
INSERT INTO 's3://analyticsdata/query1'
  INTEGRATION = 's3export'
  FORMAT = (TYPE='PARQUET', INCLUDE_QUERY_ID=true)
SELECT * FROM commons.analytics
Step 2
Once the data is exported to S3 using the preceding command, you can set up the SingleStore pipeline directly from S3. Check out our documentation for a more detailed reference.
CREATE PIPELINE <Pipeline_name> AS
LOAD DATA S3 's3://<bucket_name>/<filename>.parquet'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
              "aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE keypairs
FORMAT PARQUET
(`key` <- keypairs::`key`,
`value` <- keypairs::`value`);

START PIPELINE <Pipeline_name>;
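Hard-coding AWS keys in a SQL script makes them easy to leak. One possible approach, sketched below, is to assemble the statement in Python and read the keys from the environment. The function name `build_s3_pipeline_sql`, the pipeline name, and the sample bucket path are assumptions for illustration; the clause layout simply mirrors the statement above.

```python
import json
import os


def build_s3_pipeline_sql(pipeline, s3_path, table, mapping, region="us-west-2", env=os.environ):
    """Build a CREATE PIPELINE ... LOAD DATA S3 statement for Parquet files."""
    config = json.dumps({"region": region})
    credentials = json.dumps({
        "aws_access_key_id": env["AWS_ACCESS_KEY_ID"],
        "aws_secret_access_key": env["AWS_SECRET_ACCESS_KEY"],
    })
    # Each entry maps a table column to a path inside the Parquet record.
    columns = ",\n".join(f"`{col}` <- {path}" for col, path in mapping.items())
    return (
        f"CREATE PIPELINE {pipeline} AS\n"
        f"LOAD DATA S3 '{s3_path}'\n"
        f"CONFIG '{config}'\n"
        f"CREDENTIALS '{credentials}'\n"
        f"INTO TABLE {table}\n"
        f"FORMAT PARQUET\n"
        f"({columns});"
    )


# Placeholder values for demonstration only; real keys would come from os.environ.
demo_env = {"AWS_ACCESS_KEY_ID": "XXXXXXXX", "AWS_SECRET_ACCESS_KEY": "XXXXXXXX"}
s3_sql = build_s3_pipeline_sql(
    "keypairs_pipeline",
    "s3://<bucket_name>/<path>",
    "keypairs",
    {"key": "keypairs::`key`", "value": "keypairs::`value`"},
    env=demo_env,
)
print(s3_sql)
```

Leaving out the `env` argument makes the function read `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` from the process environment and raise a `KeyError` if either is missing.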
Once the pipeline loading is complete, we are done with the snapshot load stage of the migration.
Alternatives
We have more ways to replicate DynamoDB data to SingleStore:
- DynamoDB to SingleStore via AWS DMS. We have a few customers (like Particl) doing this in production today.
- DynamoDB to SingleStore via AWS SQS and Lambda functions. Lambda functions can transform messages in batches and generate SQL insert or update queries that can be run directly against the database.
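As a rough illustration of the SQS and Lambda option, the sketch below turns one batch of SQS messages into a single parameterized multi-row INSERT. It assumes each message body is a JSON object whose keys match the column names. The function names, the `customer` table, and the column subset are hypothetical, and executing the statement through a database driver is left out.

```python
import json

COLUMNS = ["c_custkey", "c_name", "c_acctbal"]  # hypothetical subset of columns


def build_batch_insert(records, table="customer", columns=COLUMNS):
    """Turn SQS records into one multi-row INSERT plus a flat parameter list."""
    rows = [json.loads(record["body"]) for record in records]
    row_placeholder = "(" + ", ".join(["%s"] * len(columns)) + ")"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
        + ", ".join([row_placeholder] * len(rows))
    )
    # Values travel separately from the SQL text so the driver can escape them.
    params = [row.get(col) for row in rows for col in columns]
    return sql, params


def lambda_handler(event, context):
    """Lambda entry point: SQS delivers a batch of messages under event['Records']."""
    sql, params = build_batch_insert(event["Records"])
    # A real function would execute (sql, params) with a MySQL-compatible driver here.
    return {"statement": sql, "parameter_count": len(params)}


# Made-up event with two messages, shaped like an SQS-triggered invocation.
sample_event = {"Records": [
    {"body": json.dumps({"c_custkey": "1", "c_name": "Customer#1", "c_acctbal": "711.56"})},
    {"body": json.dumps({"c_custkey": "2", "c_name": "Customer#2", "c_acctbal": "121.65"})},
]}
result = lambda_handler(sample_event, None)
print(result["statement"])
```

Batching the rows into one statement keeps the number of round trips per invocation low, which is the main reason to transform messages in batches rather than one at a time.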
Conclusion
Migrating from Rockset to SingleStore is a seamless process, thanks to SingleStore's robust and efficient real-time data pipelines. By following the steps outlined in this blog, you can easily integrate your existing DynamoDB system, ensuring minimal downtime and maximum efficiency.
SingleStore not only simplifies the migration process but also offers superior performance and real-time analytics database capabilities, making it an ideal choice for Rockset customers looking to enhance their data management solutions.
Remember, with SingleStore, you're not just migrating your data — you're upgrading your entire data infrastructure to a more powerful and scalable solution.