Integrating DynamoDB + SingleStore

3 min read

Jun 27, 2024

Welcome, DynamoDB x Rockset developers! This technical blog will show you how to integrate your DynamoDB database with SingleStore.

Integrating DynamoDB + SingleStore

The combination of DynamoDB and SingleStore is a powerful architecture that leverages the former as a fast, NoSQL OLTP layer and the latter as a scalable, multi-model OLAP layer. This augmentation architecture is very common among SingleStore developers; some also consolidate their DynamoDB workloads directly into SingleStore.

Let’s state the facts: no other analytical database offers the native DynamoDB integration that Rockset has. At SingleStore, we aim to deliver the simplest alternative at the lowest cost.

In this walkthrough, we will integrate DynamoDB with SingleStore via Apache Kafka, using SingleStore Pipelines. We cover every step, from initial setup to final data validation, and explain why SingleStore offers a compelling alternative for Rockset customers.

If this integration option is not quite what you need, feel free to scroll down to the Alternatives section, where we detail other approaches.

In this architecture, data moves efficiently from DynamoDB to SingleStore, leveraging Kafka for seamless, scalable streaming and SingleStore's real-time pipelines for ingestion. The approach keeps downtime to a minimum, making it an ideal path for customers migrating from Rockset to SingleStore.

Streaming data from DynamoDB to Kafka

Since you already have data in DynamoDB, the first step is streaming this data to Kafka. This allows us to seamlessly transfer data to SingleStore.

Kafka setup and topic creation

bin/kafka-topics.sh --create --topic customer_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --create --topic lineitem_topic --bootstrap-server localhost:9092 --partitions 8
bin/kafka-topics.sh --create --topic orders_topic --bootstrap-server localhost:9092 --partitions 8

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
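
Once the producer below starts sending, you can spot-check that messages are actually landing on a topic with Kafka's stock console consumer. A quick sanity check, using the broker address and a topic name from the setup above (--max-messages simply limits the output):

bin/kafka-console-consumer.sh --topic customer_topic --from-beginning --bootstrap-server localhost:9092 --max-messages 5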

Python script to stream data to Kafka
The following sample code streams data from DynamoDB to Kafka. It can be housed in a SingleStore Notebook, or scheduled through the SingleStore Job Service.

import boto3
import json
from kafka import KafkaProducer
from decimal import Decimal

# Initialize DynamoDB resource
dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

# Custom JSON encoder to handle Decimal types
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super(DecimalEncoder, self).default(obj)

# Initialize Kafka producer with custom JSON encoder
producer = KafkaProducer(
    bootstrap_servers='<your_kafka_machine_ip>:9092',
    value_serializer=lambda v: json.dumps(v, cls=DecimalEncoder).encode('utf-8')
)

# Function to get the record count from a DynamoDB table
def get_table_count(table_name):
    table = dynamodb.Table(table_name)
    response = table.scan(Select='COUNT')
    return response['Count']

# Function to read data from DynamoDB and stream to Kafka
def stream_table_to_kafka(table_name, topic_name):
    table = dynamodb.Table(table_name)
    record_count = 0  # Count of records streamed to Kafka

    try:
        response = table.scan()
        items = response['Items']

        for item in items:
            producer.send(topic_name, item)
            record_count += 1
            if record_count % 1000 == 0:
                print(f'{record_count} records streamed from {table_name} to {topic_name}')

        # Continue scanning if there are more items
        while 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
            items = response['Items']
            for item in items:
                producer.send(topic_name, item)
                record_count += 1
                if record_count % 1000 == 0:
                    print(f'{record_count} records streamed from {table_name} to {topic_name}')

        print(f'Total records streamed from {table_name} to {topic_name}: {record_count}')

        # Validate the record count
        expected_count = get_table_count(table_name)
        if record_count == expected_count:
            print(f'Successfully streamed all records from {table_name} to {topic_name}.')
        else:
            print(f'Warning: Mismatch in record count for {table_name}. Expected: {expected_count}, Streamed: {record_count}')

    except Exception as e:
        print(f'Error streaming data from {table_name} to {topic_name}: {e}')
    finally:
        producer.flush()  # Ensure all messages are sent before exiting

if __name__ == '__main__':
    # Define table names and corresponding Kafka topics
    tables_and_topics = {
        'customer': 'customer_topic',
        'lineitem': 'lineitem_topic',
        'orders': 'orders_topic'
    }

    try:
        # Stream data from each table to the corresponding Kafka topic, one at a time
        for table_name, topic_name in tables_and_topics.items():
            print(f'Starting to stream data from {table_name} to {topic_name}')
            stream_table_to_kafka(table_name, topic_name)
            print(f'Completed streaming data from {table_name} to {topic_name}')
    except Exception as e:
        print(f'Error in streaming process: {e}')
    finally:
        producer.close()
        print('Data streaming complete.')

Ingesting data from Kafka to SingleStore

Finally, we'll create pipelines in SingleStore to ingest data from Kafka topics into SingleStore tables. This step showcases the power and simplicity of SingleStore's real-time data pipelines.

SingleStore table DDLs and pipeline creation

CREATE DATABASE tpch PARTITIONS 8;

-- Pipeline and table DDLs:
-- Drop tables if they exist
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS lineitem;
DROP TABLE IF EXISTS customer;

CREATE TABLE orders (
    o_orderkey text NOT NULL,
    o_custkey text NOT NULL,
    o_orderstatus text NOT NULL,
    o_totalprice text NOT NULL,
    o_orderdate text NOT NULL,
    o_orderpriority text NOT NULL,
    o_clerk text NOT NULL,
    o_shippriority text NOT NULL,
    o_comment text NOT NULL,
    UNIQUE KEY pk (o_orderkey) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (o_orderkey),
    SORT KEY o_orderkey (o_orderkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

CREATE TABLE lineitem (
    l_orderkey text NOT NULL,
    l_partkey text NOT NULL,
    l_suppkey text NOT NULL,
    l_linenumber text NOT NULL,
    l_quantity text NOT NULL,
    l_extendedprice text NOT NULL,
    l_discount text NOT NULL,
    l_tax text NOT NULL,
    l_returnflag text NOT NULL,
    l_linestatus text NOT NULL,
    l_shipdate text NOT NULL,
    l_commitdate text NOT NULL,
    l_receiptdate text NOT NULL,
    l_shipinstruct text NOT NULL,
    l_shipmode text NOT NULL,
    l_comment text NOT NULL,
    UNIQUE KEY pk (l_orderkey, l_linenumber) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (l_orderkey),
    SORT KEY l_orderkey (l_orderkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

CREATE TABLE customer (
    c_custkey text NOT NULL,
    c_name text NOT NULL,
    c_address text NOT NULL,
    c_nationkey text NOT NULL,
    c_phone text NOT NULL,
    c_acctbal text NOT NULL,
    c_mktsegment text NOT NULL,
    c_comment text NOT NULL,
    UNIQUE KEY pk (c_custkey) UNENFORCED RELY,
    SHARD KEY __SHARDKEY (c_custkey),
    SORT KEY c_custkey (c_custkey)
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
  AUTOSTATS_HISTOGRAM_MODE=CREATE
  AUTOSTATS_SAMPLING=ON
  SQL_MODE='STRICT_ALL_TABLES';

-- Create pipeline for orders table
CREATE OR REPLACE PIPELINE orders_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/orders_topic'
INTO TABLE orders
(
    o_orderstatus <- o_orderstatus,
    o_clerk <- o_clerk,
    o_orderdate <- o_orderdate,
    o_shippriority <- o_shippriority,
    o_custkey <- o_custkey,
    o_totalprice <- o_totalprice,
    o_orderkey <- o_orderkey,
    o_comment <- o_comment,
    o_orderpriority <- o_orderpriority
)
FORMAT JSON;

-- Create pipeline for lineitem table
CREATE OR REPLACE PIPELINE lineitem_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/lineitem_topic'
INTO TABLE lineitem
(
    l_orderkey <- l_orderkey,
    l_partkey <- l_partkey,
    l_suppkey <- l_suppkey,
    l_linenumber <- l_linenumber,
    l_quantity <- l_quantity,
    l_extendedprice <- l_extendedprice,
    l_discount <- l_discount,
    l_tax <- l_tax,
    l_returnflag <- l_returnflag,
    l_linestatus <- l_linestatus,
    l_shipdate <- l_shipdate,
    l_commitdate <- l_commitdate,
    l_receiptdate <- l_receiptdate,
    l_shipinstruct <- l_shipinstruct,
    l_shipmode <- l_shipmode,
    l_comment <- l_comment
)
FORMAT JSON;

-- Create pipeline for customer table
CREATE OR REPLACE PIPELINE customer_pipeline AS
LOAD DATA KAFKA '<your_kafka_machine_ip>:9092/customer_topic'
INTO TABLE customer
(
    c_custkey <- c_custkey,
    c_name <- c_name,
    c_address <- c_address,
    c_nationkey <- c_nationkey,
    c_phone <- c_phone,
    c_acctbal <- c_acctbal,
    c_mktsegment <- c_mktsegment,
    c_comment <- c_comment
)
FORMAT JSON;

START PIPELINE orders_pipeline;
START PIPELINE customer_pipeline;
START PIPELINE lineitem_pipeline;

You can use the following query to check the timings for your pipeline.

select
   pipeline_name PipelineName
   ,max(start_time) StartTime
   ,format(sum(rows_streamed),0) RowsStreamed
   ,format(sum(mb_streamed),2) MBStreamed
   ,format(sum(batch_time),0) BatchTimeSec
   ,format(sum(batch_time) / 60, 2) BatchTimeMin
   ,format(sum(rows_streamed) / sum(batch_time),0) RowsperSec
   ,format(sum(mb_streamed) / sum(batch_time),2) MBperSec
from
   information_schema.pipelines_batches_summary
where
   database_name = 'tpch' -- replace with your database name
   and pipeline_name = 'orders_pipeline' -- replace with your pipeline name
group by
   pipeline_name
order by
   sum(rows_streamed) / sum(batch_time) desc;

Batch load Rockset data

Now that your real-time streams are set up, you will want to bulk load the rest of your data from Rockset. Since you’re already familiar with SingleStore Pipelines, this part is easy!

To move your data directly from Rockset to SingleStore using SingleStore Pipelines, follow these simple steps:

Step 1
Export the data from Rockset to S3 (or another object store). The following example exports data to S3 in PARQUET format. You can also specify JSON format.

INSERT INTO 's3://analyticsdata/query1'
  INTEGRATION = 's3export'
  FORMAT = (TYPE='PARQUET', INCLUDE_QUERY_ID=true)
SELECT * FROM commons.analytics

Step 2
Once the data is exported to S3 using the preceding command, you can set up the SingleStore pipeline directly from S3.
Check out our documentation for a more detailed reference.

CREATE PIPELINE <Pipeline_name> AS
LOAD DATA S3 's3://<bucket_name>/<filename>.parquet'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
              "aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE keypairs
FORMAT PARQUET
(`key` <- keypairs::`key`,
 `value` <- keypairs::`value`);

START PIPELINE <Pipeline_name>;

Once the pipeline loading is complete, we are done with the snapshot load stage of the migration.
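
Before cutting applications over, a quick validation pass is worthwhile. The following is a minimal sketch using the table and pipeline names from this walkthrough; SHOW PIPELINES and the information_schema views are SingleStore's standard monitoring surfaces, though exact column sets can vary slightly between versions:

-- Confirm each pipeline is running and has no recent errors
SHOW PIPELINES;

SELECT pipeline_name, error_message
FROM information_schema.pipelines_errors
WHERE database_name = 'tpch';

-- Spot-check row counts against the counts reported by the streaming script
SELECT 'customer' AS table_name, COUNT(*) AS row_count FROM customer
UNION ALL
SELECT 'orders', COUNT(*) FROM orders
UNION ALL
SELECT 'lineitem', COUNT(*) FROM lineitem;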

Alternatives

There are other ways to replicate DynamoDB data into SingleStore:

  1. DynamoDB to SingleStore via AWS DMS. We have a few customers (like Particl) doing this in production today.
  2. DynamoDB to SingleStore via AWS SQS and Lambda functions. Lambda functions can transform messages in batches and generate SQL insert or update statements that run directly against the database (see the sketch after this list).
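
To make option 2 concrete, here is a minimal sketch of a Lambda handler that drains DynamoDB change messages from SQS and writes them into SingleStore over the MySQL wire protocol. The plumbing that lands DynamoDB changes in SQS, the environment variable names, the pymysql dependency and the reduced orders column list are all illustrative assumptions, not part of the walkthrough above:

import json
import os

import pymysql  # SingleStore is MySQL wire-protocol compatible

# Connection details supplied via environment variables (illustrative names)
conn = pymysql.connect(
    host=os.environ['SINGLESTORE_HOST'],
    user=os.environ['SINGLESTORE_USER'],
    password=os.environ['SINGLESTORE_PASSWORD'],
    database=os.environ['SINGLESTORE_DB'],
    autocommit=False,
)

def handler(event, context):
    """Batch-apply DynamoDB change records delivered through SQS."""
    rows = []
    for record in event['Records']:
        # Each SQS message body is assumed to carry one DynamoDB item as JSON
        item = json.loads(record['body'])
        # Only a few orders columns shown; map the rest the same way
        rows.append((item['o_orderkey'], item['o_custkey'], item['o_totalprice']))

    with conn.cursor() as cur:
        # Plain inserts for brevity; deduplication/upsert handling is left out
        cur.executemany(
            'INSERT INTO orders (o_orderkey, o_custkey, o_totalprice) VALUES (%s, %s, %s)',
            rows,
        )
    conn.commit()
    return {'inserted': len(rows)}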

Conclusion

Migrating from Rockset to SingleStore is a seamless process, thanks to SingleStore's robust and efficient real-time data pipelines. By following the steps outlined in this blog, you can easily integrate your existing DynamoDB system, ensuring minimal downtime and maximum efficiency.

SingleStore not only simplifies the migration process but also offers superior performance and real-time analytics capabilities, making it an ideal choice for Rockset customers looking to enhance their data management solutions.

Remember, with SingleStore, you're not just migrating your data — you're upgrading your entire data infrastructure to a more powerful and scalable solution.

