Load JSON files with Pipeline from S3

Note

This tutorial is meant for Standard & Premium Workspaces. You cannot run it on a Free Starter Workspace due to storage restrictions. Create a Workspace using + group in the left nav and select Standard for this notebook. Gallery notebooks tagged with "Starter" are suitable to run on a Free Starter Workspace.

This notebook walks through different data ingestion scenarios for JSON files stored in an AWS S3 location:

  • Ingest JSON files in AWS S3 using wildcards with pre-defined schema

  • Ingest JSON files in AWS S3 using wildcards into a JSON column

Create a Pipeline from JSON files in AWS S3 using wildcards

In this example, we want to create a pipeline from two JSON files, actors1.json and actors2.json, stored in a public AWS S3 bucket called studiotutorials under the folder sample_dataset/json_files/wildcard_demo. This bucket is located in us-east-1.

Each file has the following shape with nested objects and arrays:

{
  "Actors": [
    {
      "name": "Tom Cruise",
      "age": 56,
      "Born At": "Syracuse, NY",
      "Birthdate": "July 3, 1962",
      "photo": "https://jsonformatter.org/img/tom-cruise.jpg",
      "wife": null,
      "weight": 67.5,
      "hasChildren": true,
      "hasGreyHair": false,
      "children": [
        "Suri",
        "Isabella Jane",
        "Connor"
      ]
    },
    {
      "name": "Robert Downey Jr.",
      "age": 53,
      "Born At": "New York City, NY",
      "Birthdate": "April 4, 1965",
      "photo": "https://jsonformatter.org/img/Robert-Downey-Jr.jpg",
      "wife": "Susan Downey",
      "weight": 77.1,
      "hasChildren": true,
      "hasGreyHair": false,
      "children": [
        "Indio Falconer",
        "Avri Roel",
        "Exton Elias"
      ]
    }
  ]
}

Create a Table

We first create a table called actors in the database demo_database.

In [1]:

%%sql
CREATE DATABASE IF NOT EXISTS demo_database;

Action Required

Make sure to select the demo_database database from the drop-down menu at the top of this notebook. It updates the connection_url to connect to that database.
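If you want to confirm which database the notebook is currently connected to, a quick check such as the one below should work (SELECT DATABASE() is MySQL-compatible syntax supported by SingleStore):

%%sql
SELECT DATABASE();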

In [2]:

%%sql
CREATE TABLE IF NOT EXISTS demo_database.actors (
name text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
age int NOT NULL,
born_at text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
Birthdate text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
photo text CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
wife text CHARACTER SET utf8 COLLATE utf8_general_ci,
weight float NOT NULL,
haschildren boolean,
hasGreyHair boolean,
children JSON COLLATE utf8_bin NOT NULL,
SHARD KEY ()
);
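Optionally, you can sanity-check the table definition before creating the pipeline. This is a small optional step using the MySQL-compatible DESCRIBE statement:

%%sql
DESCRIBE demo_database.actors;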

Create a pipeline

We then create a pipeline called actors in the database demo_database. Since these files are small, batch_interval matters less and the maximum number of partitions per batch is set to 1. For faster performance on larger datasets, we recommend increasing the maximum partitions per batch. Note that since the bucket is publicly accessible, you do not need to provide an access key and secret.

In [3]:

%%sql
CREATE PIPELINE IF NOT EXISTS demo_database.actors
AS LOAD DATA S3 'studiotutorials/sample_dataset/json_files/wildcard_demo/*.json'
CONFIG '{ \"region\": \"us-east-1\" }'
/*
CREDENTIALS '{"aws_access_key_id": "<Key to Enter>",
"aws_secret_access_key": "<Key to Enter>"}'
*/
BATCH_INTERVAL 2500
MAX_PARTITIONS_PER_BATCH 1
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
SKIP DUPLICATE KEY ERRORS
INTO TABLE `actors`
FORMAT JSON
(
actors.name <- name,
actors.age <- age,
actors.born_at <- `Born At`,
actors.Birthdate <- Birthdate,
actors.photo <- photo,
actors.wife <- wife,
actors.weight <- weight,
actors.haschildren <- hasChildren,
actors.hasGreyHair <- hasGreyHair,
actors.children <- children
);
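Before starting the pipeline, you can optionally dry-run it with TEST PIPELINE to preview how records are parsed and mapped without writing anything to the table. The LIMIT clause below is just to keep the preview to a single record:

%%sql
TEST PIPELINE demo_database.actors LIMIT 1;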

Start and monitor the pipeline

In [4]:

%%sql
START PIPELINE demo_database.actors;

If the pipeline is running without errors or warnings, the following query returns an empty result set.

In [5]:

%%sql
SELECT * FROM information_schema.pipelines_errors
WHERE pipeline_name = 'actors';
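You can also track batch-level progress, such as batch state and timing, through the pipelines_batches_summary view in information_schema. A minimal sketch (the exact set of columns may vary by SingleStore version):

%%sql
SELECT *
FROM information_schema.pipelines_batches_summary
WHERE pipeline_name = 'actors';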

Query the table

In [6]:

%%sql
SELECT * FROM demo_database.actors;
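Because children is stored as a JSON array, you can also query it with SingleStore's JSON functions. The query below is a sketch using JSON_LENGTH and JSON_EXTRACT_STRING to count each actor's children and pull out the first entry:

%%sql
SELECT name,
       JSON_LENGTH(children) AS num_children,
       JSON_EXTRACT_STRING(children, 0) AS first_child
FROM demo_database.actors;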

Clean up resources

In [7]:

%%sql
DROP PIPELINE IF EXISTS demo_database.actors;
DROP TABLE IF EXISTS demo_database.actors;

Ingest JSON files in AWS S3 using wildcards into a JSON column

As the schema of your files might change, you may want the flexibility of ingesting each record into a single JSON column, which we name json_data. The table we create is named actors_json.

Create Table

In [8]:

%%sql
CREATE TABLE IF NOT EXISTS demo_database.actors_json (
json_data JSON NOT NULL,
SHARD KEY ()
);

Create a pipeline

In [9]:

%%sql
CREATE PIPELINE IF NOT EXISTS demo_database.actors_json
AS LOAD DATA S3 'studiotutorials/sample_dataset/json_files/wildcard_demo/*.json'
CONFIG '{ \"region\": \"us-east-1\" }'
/*
CREDENTIALS '{"aws_access_key_id": "<Key to Enter>",
"aws_secret_access_key": "<Key to Enter>"}'
*/
BATCH_INTERVAL 2500
MAX_PARTITIONS_PER_BATCH 1
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
SKIP DUPLICATE KEY ERRORS
INTO TABLE `actors_json`
FORMAT JSON
(json_data <- %);

Start and monitor pipeline

In [10]:

%%sql
START PIPELINE demo_database.actors_json;

In [11]:

%%sql
-- Monitor and see if there is any error or warning
SELECT * FROM information_schema.pipelines_errors
WHERE pipeline_name = 'actors_json';
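To confirm that the pipeline is in the Running state, you can also check the information_schema.pipelines view, which exposes one row per pipeline:

%%sql
SELECT database_name, pipeline_name, state
FROM information_schema.pipelines
WHERE pipeline_name = 'actors_json';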

Query the table

In [12]:

%%sql
SELECT * FROM demo_database.actors_json;
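Since each ingested record lands in the json_data column, you can extract individual fields with SingleStore's JSON extraction operators (::$ for strings, ::% for doubles). The sketch below assumes each record is a single actor object, matching the per-field mappings used in the first pipeline:

%%sql
SELECT json_data::$name AS name,
       json_data::%age AS age
FROM demo_database.actors_json;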

Clean up resources

In [13]:

%%sql
DROP DATABASE IF EXISTS demo_database;

Details

Tags

#advanced #pipeline #json #s3

License

This Notebook has been released under the Apache 2.0 open source license.