SingleStore Notebooks

Kafka Pipelines and Query Tuning

Ingesting real-time data from the International Space Station (ISS)

1. Drop the database if it exists, create a new database, switch to it, and then create a table.

Database Name

In the following cell, you will enter your email address to use as the database name. However, you will need to replace every character that is not alphanumeric or an underscore with an underscore.

Example:

If your email address is lorrin.smith-bates@singlestore.com, you would use lorrin_smith_bates_singlestore_com.

In [1]:

email_address = "<< enter your email address >>"

Remove characters that can't be used in a database name.

In [2]:

import re

modified_email_address = re.sub(r'[^A-Za-z0-9]', '_', email_address)
modified_email_address

In [3]:

%%sql
DROP DATABASE IF EXISTS {{ modified_email_address }};
CREATE DATABASE {{ modified_email_address }};
USE {{ modified_email_address }};
CREATE TABLE iss_location(
    name varchar(10),
    id int,
    latitude float,
    longitude float,
    velocity float,
    visibility varchar(20),
    footprint float,
    timestamp bigint,
    daynum float,
    solar_lat float,
    solar_lon float,
    units varchar(20),
    url varchar(255)
);

2. Create a SingleStore pipeline to ingest ISS data from a Kafka topic.

In [4]:

%%sql
CREATE OR REPLACE PIPELINE iss_pipeline AS
    LOAD DATA kafka '100.25.125.23/iss'
    INTO TABLE iss_location
    FORMAT JSON;
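To confirm the pipeline was created, you can query the pipeline metadata. The sketch below assumes the information_schema.PIPELINES view with PIPELINE_NAME, STATE, and DATABASE_NAME columns; check the column names against your SingleStore version.

%%sql
-- A sketch, assuming information_schema.PIPELINES exposes these columns.
SELECT PIPELINE_NAME, STATE
    FROM information_schema.PIPELINES
    WHERE DATABASE_NAME = '{{ modified_email_address }}';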

3. Test the pipeline.

In [5]:

%%sql
TEST PIPELINE iss_pipeline;
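TEST PIPELINE reads from the Kafka topic without writing anything to iss_location, so it is a safe way to check connectivity and the JSON mapping. If you only want to preview a single record, a LIMIT clause can be added; this is a sketch, assuming your SingleStore version supports LIMIT on TEST PIPELINE.

%%sql
-- Preview one record from the topic without inserting it into the table.
TEST PIPELINE iss_pipeline LIMIT 1;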

4. Start the pipeline.

In [6]:

%%sql
START PIPELINE iss_pipeline;
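Once the pipeline is running, you can watch its batches as they are processed. This is a sketch that assumes the information_schema.PIPELINES_BATCHES_SUMMARY view and its BATCH_ID, BATCH_STATE, and BATCH_TIME columns; the exact columns may differ between SingleStore versions.

%%sql
-- A sketch: inspect the most recent batches processed by the pipeline.
SELECT PIPELINE_NAME, BATCH_ID, BATCH_STATE, BATCH_TIME
    FROM information_schema.PIPELINES_BATCHES_SUMMARY
    WHERE PIPELINE_NAME = 'iss_pipeline'
    ORDER BY BATCH_ID DESC
    LIMIT 5;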

5. Get the count of records. Run this a few times to see the number of records ingested.

In [7]:

%%sql
SELECT COUNT(*) FROM iss_location;

6. Get the latest location record. Click the link to see the location of the ISS in Google Maps.

In [8]:

%%sql
SELECT timestamp, url
    FROM iss_location
    ORDER BY timestamp DESC
    LIMIT 1;
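The timestamp column holds a raw Unix epoch value, which is hard to read at a glance. As a small sketch (assuming the value is in seconds), FROM_UNIXTIME converts it to a datetime alongside the map link:

%%sql
-- Assumes timestamp stores Unix epoch seconds.
SELECT FROM_UNIXTIME(timestamp) AS observed_at, url
    FROM iss_location
    ORDER BY timestamp DESC
    LIMIT 1;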

7. Stop the pipeline and delete the data from the iss_location table.

In [9]:

%%sql
STOP PIPELINE iss_pipeline;
DELETE FROM iss_location;

8. Change the pipeline offsets and interval. BATCH_INTERVAL is specified in milliseconds (30000 = 30 seconds), and OFFSETS LATEST tells the pipeline to start from the newest messages in the topic instead of re-reading earlier ones.

In [10]:

%%sql
ALTER PIPELINE iss_pipeline
  SET BATCH_INTERVAL 30000
  SET OFFSETS LATEST;
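To confirm the new settings took effect, you can print the pipeline's definition. This is a sketch, assuming SHOW CREATE PIPELINE is available in your SingleStore version; information_schema.PIPELINES also exposes the configured batch interval.

%%sql
-- A sketch: show the pipeline definition, including the interval and offsets set above.
SHOW CREATE PIPELINE iss_pipeline;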

9. Start the pipeline again.

In [11]:

%%sql
START PIPELINE iss_pipeline;

10. Count the records and notice how they are populated now after altering the pipeline.

In [12]:

%%sql
SELECT COUNT(*) FROM iss_location;

11. Stop the pipeline.

In [13]:

%%sql
STOP PIPELINE iss_pipeline;

Query Optimization

1. Restore the 'employees' database that has been backed up into a public S3 bucket

For the database name, we'll again use the modified email address, this time prefixed with employees_.

In [14]:

%%sql
RESTORE DATABASE employees AS employees_{{ modified_email_address }}
  FROM S3 'train.memsql.com/employee'
  CONFIG '{"region":"us-east-1"}'
  CREDENTIALS '{}';

2. Switch to the Employees database

In [15]:

%%sql
USE employees_{{ modified_email_address }};
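A quick sanity check lists the tables that came back with the restore:

%%sql
-- List the tables restored from the employees backup.
SHOW TABLES;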

3. Run a query that joins four tables and orders by four columns across three tables.

In [16]:

%%sql
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
  FROM employees e
  INNER JOIN dept_emp de ON e.emp_no=de.emp_no
  INNER JOIN departments d ON de.dept_no=d.dept_no
  INNER JOIN titles t ON e.emp_no=t.emp_no
  ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
  LIMIT 10;

4. Examine the query execution plan using EXPLAIN.

In [17]:

%%sql
EXPLAIN SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
  FROM employees e
  INNER JOIN dept_emp de ON e.emp_no=de.emp_no
  INNER JOIN departments d ON de.dept_no=d.dept_no
  INNER JOIN titles t ON e.emp_no=t.emp_no
  ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
  LIMIT 10;

5. Profile the query using PROFILE.

In [18]:

%%sql
PROFILE SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
  FROM employees e
  INNER JOIN dept_emp de ON e.emp_no=de.emp_no
  INNER JOIN departments d ON de.dept_no=d.dept_no
  INNER JOIN titles t ON e.emp_no=t.emp_no
  ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
  LIMIT 10;

6. Run SHOW PROFILE to view statistics from an actual run of the query.

In [19]:

%%sql
SHOW PROFILE;
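If you want the full per-operator detail in a machine-readable form (for example, to feed the visual profiler in the next step), SHOW PROFILE also has a JSON variant; this sketch assumes the JSON option is available in your SingleStore version.

%%sql
-- A sketch: emit the last collected profile as JSON for detailed inspection.
SHOW PROFILE JSON;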

7. Run Visual Profile to see the profile in a GUI format.

Query/Schema Tuning Exercise

Now that we've visualized our query execution plan, let's address some of the issues we've uncovered.

1. Create a reference table for departments. Reference tables are replicated to every node, so joins against departments_ref can be resolved locally instead of moving department rows across the cluster.

In [20]:

%%sql
CREATE REFERENCE TABLE departments_ref(
  dept_no CHAR(4) not null,
  dept_name varchar(40) not null,
  primary key (dept_no),
  key(dept_name)
);

INSERT INTO departments_ref (SELECT * FROM departments);

2. Profile the old and new queries.

In [21]:

%%sql
-- CONTROL. Here is the original query. We can use this as our control in our experiment.
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
  FROM employees e
  INNER JOIN dept_emp de ON e.emp_no=de.emp_no
  INNER JOIN departments d ON de.dept_no=d.dept_no
  INNER JOIN titles t ON e.emp_no=t.emp_no
  ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
  LIMIT 10;

-- IMPROVED. Here is the slightly improved query that uses the departments_ref table.
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
  FROM employees e
  INNER JOIN dept_emp de ON e.emp_no=de.emp_no
  INNER JOIN departments_ref d ON de.dept_no=d.dept_no
  INNER JOIN titles t ON e.emp_no=t.emp_no
  ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
  LIMIT 10;

-- PROFILE them both and observe the differences.
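The cell above only runs the two queries; to actually compare them, wrap each one in PROFILE and then read the statistics with SHOW PROFILE, as in step 5. A minimal sketch for the improved query:

%%sql
-- Profile the improved query, then display the collected runtime statistics.
PROFILE SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
  FROM employees e
  INNER JOIN dept_emp de ON e.emp_no=de.emp_no
  INNER JOIN departments_ref d ON de.dept_no=d.dept_no
  INNER JOIN titles t ON e.emp_no=t.emp_no
  ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
  LIMIT 10;

SHOW PROFILE;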

3. Create a titles table with sort and shard keys defined on emp_no, the join column. Sharding on the join key lets the join run locally on each partition instead of repartitioning rows across the cluster.

In [22]:

%%sql
CREATE TABLE titles_sharded (
  emp_no INT NOT NULL,
  title VARCHAR(50) NOT NULL,
  from_date DATE NOT NULL,
  to_date DATE,
  SORT KEY (emp_no),
  SHARD KEY (emp_no)
);

INSERT INTO titles_sharded (SELECT * FROM titles);

4. Add shard and sort keys to the dept_emp table, then rerun the join using the tuned tables.

In [23]:

%%sql
CREATE TABLE dept_emp_sharded(
  emp_no int not null,
  dept_no char(4) not null,
  from_date date not null,
  to_date date not null,
  SORT KEY (dept_no),
  SHARD KEY(emp_no),
  KEY (dept_no)
);

INSERT INTO dept_emp_sharded (SELECT * FROM dept_emp);

In [24]:

%%sql
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
  FROM employees e
  INNER JOIN dept_emp_sharded de ON e.emp_no=de.emp_no
  INNER JOIN departments_ref d ON de.dept_no=d.dept_no
  INNER JOIN titles_sharded t ON e.emp_no=t.emp_no
  ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
  LIMIT 10;

5. Add shard and sort keys to the employees table, then rerun the query one last time with all of the tuned tables.

In [25]:

%%sql
CREATE TABLE employees_sharded (
    emp_no INT NOT NULL,
    birth_date DATE NOT NULL,
    first_name VARCHAR(14) NOT NULL,
    last_name VARCHAR(16) NOT NULL,
    hire_date DATE NOT NULL,
    SORT KEY (emp_no),
    SHARD KEY (emp_no)
);

INSERT INTO employees_sharded (SELECT * FROM employees);

In [26]:

%%sql
SELECT e.first_name, e.last_name, d.dept_name, t.title, t.from_date, t.to_date
  FROM employees_sharded e
  INNER JOIN dept_emp_sharded de ON e.emp_no=de.emp_no
  INNER JOIN departments_ref d ON de.dept_no=d.dept_no
  INNER JOIN titles_sharded t ON e.emp_no=t.emp_no
  ORDER BY e.first_name, e.last_name, d.dept_name, t.from_date
  LIMIT 10;

Details


About this Template

Create a SingleStore pipeline to track the International Space Station and adjust queries & schema to optimize performance.

This Notebook can be run in Standard and Enterprise deployments.

Tags

beginner, kafka, pipeline, query, tuning

License

This Notebook has been released under the Apache 2.0 open source license.
