Real-Time Anomaly Detection
Notebook
In this notebook, we embark on a cutting-edge exploration of real-time anomaly detection in IoT sensor data, harnessing the robust capabilities of SingleStoreDB and advanced analytical techniques. Our journey begins with the efficient ingestion of sensor data into SingleStoreDB, setting the stage for dynamic and insightful analysis. The heart of this notebook lies in its innovative approach to handling and interpreting sensor data. We utilize the power of vector embeddings, generated through the UMAP library, to transform high-dimensional sensor readings into a format ripe for anomaly detection. These embeddings, capturing the essence of weather parameters like wind, rain, and temperature, are then seamlessly integrated into SingleStoreDB.
Our focus intensifies as we apply SingleStoreDB's dot_product function to these embeddings, unearthing anomalies in real-time. This not only provides a swift identification of irregularities but also paints a vivid picture of sensor data behavior over time. We don’t just stop at detection; the notebook further enriches the data analysis with a visually engaging, real-time dashboard. This dashboard, crafted using Plotly and Rich libraries, offers an interactive and constantly updated view of the anomalies, allowing users to monitor and respond to sensor data trends as they happen. Join us in this exciting venture as we blend SQL, SingleStoreDB, and Python to unlock new possibilities in real-time anomaly detection. Whether you're a data scientist, an IoT enthusiast, or simply intrigued by the power of real-time analytics, this notebook is your gateway to understanding and leveraging the full potential of IoT sensor data.
Database Setup
Overview
This section focuses on the initial setup of the database iot_sensor_db
in SingleStore, specifically designed for handling IoT sensor data. It includes creating the necessary tables to store both historical and real-time sensor data, along with vector embeddings for anomaly detection.
SQL Script Description
The provided SQL script performs the following operations:
Database Initialization
DROP DATABASE IF EXISTS iot_sensor_db;
Ensures a clean slate by dropping the iot_sensor_db database if it already exists.
CREATE DATABASE iot_sensor_db;
Creates a new database named iot_sensor_db.
USE iot_sensor_db;
Sets iot_sensor_db as the current database for subsequent operations.
Table Creation
CREATE TABLE sensor_data_with_vectors
This table is designed to store processed sensor data along with vector embeddings for anomaly detection.
Columns:
date: Timestamp of the sensor data.
city, longitude, latitude: Location information of the sensor.
vent, pluie, temp: Sensor readings for wind (vent), rain (pluie), and temperature (temp).
anomaly: Flag indicating whether the data point is an anomaly.
embeddings: Text column for storing vector embeddings.
CREATE TABLE sensor_data_stage
Serves as a staging area for raw sensor data before processing.
Columns: Similar to sensor_data_with_vectors, but used for staging raw data.
In [1]:
%%sql
DROP DATABASE IF EXISTS iot_sensor_db;

CREATE DATABASE iot_sensor_db;

USE iot_sensor_db;

CREATE TABLE sensor_data_with_vectors (
    date DATETIME,
    city VARCHAR(50),
    longitude VARCHAR(50),
    latitude VARCHAR(50),
    vent FLOAT(8,2),
    pluie FLOAT(8,2),
    temp FLOAT(8,2),
    anomaly VARCHAR(10),
    embeddings TEXT
);

CREATE TABLE sensor_data_stage (
    city VARCHAR(50),
    longitude VARCHAR(50),
    latitude VARCHAR(50),
    vent FLOAT(8,2),
    pluie FLOAT(8,2),
    temp FLOAT(8,2),
    embeddings TEXT,
    date DATETIME
);
Setting Up and Initiating the Sensor Data Pipeline
Overview
This section details the setup and initiation of two pipelines in SingleStore: sensor_data_pipeline
for historical data load and sensor_realtime_data_pipeline
for real-time data analysis. Both pipelines stream and ingest IoT sensor data from S3 buckets into respective tables in SingleStore.
SQL Script Description
Historical Data Load Pipeline
Pipeline Creation
CREATE OR REPLACE PIPELINE sensor_data_pipeline AS
Creates or replaces a pipeline named sensor_data_pipeline.
Configuration:
Source: S3 bucket path s3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/with_cities_embeddings.csv.
Target: Table sensor_data_with_vectors.
Data Format: CSV with specific delimiters and the header line ignored.
Pipeline Activation
START PIPELINE sensor_data_pipeline FOREGROUND;
Initiates the pipeline for data ingestion.
Data Verification
SELECT * FROM sensor_data_with_vectors LIMIT 2;
Fetches the first two rows from sensor_data_with_vectors to verify data ingestion.
Real-Time Data Analysis Pipeline
Pipeline Creation
CREATE OR REPLACE PIPELINE sensor_realtime_data_pipeline AS
Establishes a new pipeline named sensor_realtime_data_pipeline.
Configuration:
Source: S3 bucket path s3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/demo_day_data2.csv.
Target: Table sensor_data_stage.
Data Format: CSV with specific delimiters and the header line ignored.
Additional Setting: SET date = NOW(); assigns the current timestamp to the date column.
Pipeline Activation
START PIPELINE sensor_realtime_data_pipeline FOREGROUND;
Activates the pipeline for real-time data ingestion.
Data Verification
SELECT * FROM sensor_data_stage LIMIT 1;
Retrieves the first row from sensor_data_stage to confirm data ingestion.
Usage
The establishment of these pipelines is essential for the real-time and historical analysis of IoT sensor data. sensor_data_pipeline
facilitates the ingestion of historical data for retrospective analysis, while sensor_realtime_data_pipeline
caters to ongoing, real-time data analysis needs.
In [2]:
%%sql
CREATE OR REPLACE PIPELINE sensor_data_pipeline AS
LOAD DATA S3 's3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/with_cities_embeddings.csv'
INTO TABLE `sensor_data_with_vectors`
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES;
In [3]:
%%sql
START PIPELINE sensor_data_pipeline FOREGROUND;
In [4]:
%%sql
SELECT * FROM sensor_data_with_vectors LIMIT 2;
In [5]:
%%sql
CREATE OR REPLACE PIPELINE sensor_realtime_data_pipeline AS
LOAD DATA S3 's3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/demo_day_data2.csv'
INTO TABLE `sensor_data_stage`
(city, longitude, latitude, vent, pluie, temp, embeddings)
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES
SET date = NOW();
In [6]:
%%sql
START PIPELINE sensor_realtime_data_pipeline FOREGROUND;
In [7]:
%%sql
SELECT * FROM sensor_data_stage LIMIT 1;
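Before moving on, it can be useful to confirm that both pipelines ran and that rows actually landed in their target tables. The sketch below is one optional way to do that from Python; it assumes the connection_url variable that the later cells of this notebook also use, and that the SHOW PIPELINES statement is available in your SingleStoreDB deployment.

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(connection_url)

# List the pipelines defined in the current database and their state
print(pd.read_sql("SHOW PIPELINES", engine))

# Row counts ingested so far into each target table
for table in ("sensor_data_with_vectors", "sensor_data_stage"):
    count = pd.read_sql(f"SELECT COUNT(*) AS n FROM {table}", engine)["n"][0]
    print(f"{table}: {count} rows")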
Data Preparation for Analysis
Overview
This section covers the necessary steps to prepare IoT sensor data for analysis. It involves installing a required library, data retrieval from the sensor_data_stage
table, and preprocessing to ensure data quality.
Python Script Description
Library Installation
!pip install umap-learn --quiet
Installs the umap-learn library quietly, without verbose output. UMAP (Uniform Manifold Approximation and Projection) is used for dimensionality reduction.
Import Statements
Note: It's advised to restart the Python kernel before importing umap to ensure the library is properly loaded.
Imports various libraries, including umap, normalize from sklearn.preprocessing, sqlalchemy, create_engine, json, and pandas.
Database Connection
engine = create_engine(connection_url)
Establishes a connection to the database using connection_url.
Data Retrieval and Preprocessing
df = pd.read_sql('select * from sensor_data_stage', engine)
Retrieves data from the sensor_data_stage table into a pandas DataFrame.
df = df.bfill(axis=0)
Fills null values in the DataFrame by propagating the next non-null value backward within each column.
df = df.dropna()
Drops any remaining rows with null values to ensure the dataset is clean for analysis.
In [8]:
!pip install umap-learn --quiet
Note
Restart the kernel if importing umap raises an error.
In [9]:
import umap
from sklearn.preprocessing import normalize
import sqlalchemy
from sqlalchemy import create_engine
import json
import pandas as pd
In [10]:
# Filling null values using bfill(), then dropping any rows that are still null
engine = create_engine(connection_url)
df = pd.read_sql('select * from sensor_data_stage', engine)
df = df.bfill(axis=0)
df = df.dropna()
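To make the preprocessing concrete, here is a small, self-contained illustration (toy data, unrelated to the sensor tables) of what bfill followed by dropna does to a frame with missing readings:

import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "vent":  [3.2, np.nan, 4.1],
    "pluie": [np.nan, 0.0, 1.5],
    "temp":  [21.0, 22.5, np.nan],
})

# bfill(axis=0) replaces each NaN with the next non-null value below it in the same column
filled = toy.bfill(axis=0)

# rows whose NaNs had no later value to borrow from (here the last row's temp) are dropped
clean = filled.dropna()
print(clean)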
Generating Vector Embeddings using UMAP Library
Overview
This section focuses on creating vector embeddings from sensor data using the UMAP library. The embeddings are generated to reduce the dimensionality of the data while preserving its structure, aiding in efficient analysis.
Python Script Description
Data Selection for Embedding Generation
new_df1 = df.iloc[50:100]
Creates a subset of the DataFrame df, selecting rows 50 through 99. This subset is used for generating vector embeddings.
Feature Selection
features = new_df1[['vent', 'pluie', 'temp']]
Selects the columns vent, pluie, and temp from new_df1 as features for the embedding process. These represent sensor readings for wind, rain, and temperature.
UMAP Reducer Initialization and Transformation
reducer = umap.UMAP(n_components=15)
Initializes a UMAP reducer that maps the selected features into a 15-dimensional embedding space.
embeddings = reducer.fit_transform(features)
Fits the UMAP model to the selected features and transforms them, generating the embeddings used for similarity comparisons.
Normalization and Embedding Storage
normalized_embeddings = normalize(embeddings, norm='l2')
Normalizes the generated embeddings with the L2 norm so that each vector has unit length, which makes the dot product behave like cosine similarity in the queries that follow.
new_df1['embeddings'] = list(normalized_embeddings)
Appends the normalized embeddings as a new column to new_df1.
Displaying Results
new_df1.head()
Displays the first few rows of new_df1 to verify the embedding generation and integration process.
In [11]:
# code to generate embeddings for real-time data
new_df1 = df.iloc[50:100]
features = new_df1[['vent', 'pluie', 'temp']]
reducer = umap.UMAP(n_components=15)
embeddings = reducer.fit_transform(features)
In [12]:
normalized_embeddings = normalize(embeddings, norm='l2')
new_df1['embeddings'] = list(normalized_embeddings)
In [13]:
new_df1.head()
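Before moving on to the similarity queries, it may help to see why the L2 normalization above matters and how embeddings end up as text that SingleStoreDB's JSON_ARRAY_PACK can consume. The short sketch below is illustrative only: it reuses the normalized_embeddings array from the previous cells, and json.dumps is simply one way (an assumption, not taken from the original notebook) to serialize a vector into the JSON-array text that JSON_ARRAY_PACK expects.

import json
import numpy as np

# For unit-length (L2-normalized) vectors, the dot product equals cosine similarity,
# which is what the dot_product queries in the next section rely on.
a, b = normalized_embeddings[0], normalized_embeddings[1]
dot = float(np.dot(a, b))
cosine = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
print(dot, cosine)  # effectively equal, since ||a|| = ||b|| = 1

# One way to store a vector in the embeddings TEXT column so that
# JSON_ARRAY_PACK(embeddings) can read it back as a JSON array of numbers.
embedding_as_text = json.dumps([float(x) for x in a])
print(embedding_as_text[:80])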
Anomaly Detection and Data Integration
Anomaly Detection Using SingleStore dot_product Function
Anomaly Detection Loop:
Iterates over each row in new_df.
Parses the embeddings stored as JSON text in the current row into a Python list.
Constructs an SQL query using SingleStore's dot_product function to measure similarity between the current row's embeddings and those in the sensor_data_with_vectors table.
The query fetches the anomaly status based on the highest similarity scores.
SQL query execution: result = pd.read_sql_query(query, con=engine).
The most frequent anomaly label among the nearest matches is appended to new_df, or a default value is used if no similar records are found.
Data Appending to Historical Table
Data Type Casting:
Ensures appropriate data types for the columns in new_df (e.g., converting date to datetime, and city, longitude, and latitude to strings).
Appending to SQL Table:
new_df.to_sql('sensor_data_with_vectors', con=engine, if_exists='append', index=False)
appends the processed data in new_df to the sensor_data_with_vectors table in the database.
In [14]:
new_df = df.iloc[50:70].copy()

# iterate over each row in the new DataFrame
for index, row in new_df.iterrows():
    # get the embeddings from the current row
    embeddings = row['embeddings']

    # parse the JSON text stored in the embeddings column into a Python list
    embeddings_json = json.loads(embeddings)

    # create the query string
    query = f"""
    SELECT anomaly, COUNT(anomaly) as count
    FROM (
        SELECT anomaly, dot_product(
            JSON_ARRAY_PACK('{embeddings_json}'),
            JSON_ARRAY_PACK(sensor_data_with_vectors.embeddings)
        ) AS similarity
        FROM sensor_data_with_vectors
        ORDER BY similarity DESC
        LIMIT 20
    ) AS subquery
    GROUP BY anomaly
    ORDER BY count DESC;
    """

    # execute the query
    result = pd.read_sql_query(query, con=engine)

    # check if the result is empty
    if not result.empty:
        # append the result to the current row in the new DataFrame
        new_df.loc[index, 'anomaly'] = result['anomaly'].values[0]
    else:
        # set anomaly to None or some default value
        new_df.loc[index, 'anomaly'] = 'none'
In [15]:
new_df.head()
In [16]:
# appending the new dataframe to the main table: sensor_data_with_vectors
new_df['date'] = pd.to_datetime(new_df['date'])
new_df['city'] = new_df['city'].astype(str)
new_df['longitude'] = new_df['longitude'].astype(str)
new_df['latitude'] = new_df['latitude'].astype(str)
new_df['vent'] = new_df['vent'].astype(float)
new_df['pluie'] = new_df['pluie'].astype(float)
new_df['temp'] = new_df['temp'].astype(float)
new_df['anomaly'] = new_df['anomaly'].astype(str)
new_df['embeddings'] = new_df['embeddings'].astype(str)

# Append data to SQL table
new_df.to_sql('sensor_data_with_vectors', con=engine, if_exists='append', index=False)
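As an optional check (not part of the original notebook flow, and assuming the engine created earlier in this notebook is still available), a quick aggregate query confirms that the newly scored rows were appended and shows how the anomaly labels are distributed:

# Optional verification: count rows per anomaly label after the append
appended = pd.read_sql(
    "SELECT anomaly, COUNT(*) AS n FROM sensor_data_with_vectors GROUP BY anomaly ORDER BY n DESC",
    engine,
)
print(appended)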
Dashboard for Monitoring Anomalies over Time
Data Visualization Setup
Library Imports: pandas, plotly.express, and sqlalchemy.
Database Connection: Establishes a connection to the database using create_engine(connection_url).
Data Retrieval and Processing
SQL Query for Data Fetching: Retrieves anomaly data from the sensor_data_with_vectors table, excluding entries with 'none' in the anomaly field.
Data Preparation:
Converts the date column to datetime format and extracts the date part into a new column, date_only.
Groups data by date_only and anomaly, counting occurrences to prepare for visualization.
Plotting Anomalies over Time
Overall Anomaly Trends:
Uses plotly.express to create a histogram of anomaly counts over time.
Each anomaly type is color-coded for distinction.
City-Specific Anomaly Trends:
Further groups the data by city along with date_only and anomaly.
Loops through a predefined list of cities to create a separate histogram for each city, showcasing city-specific anomaly trends.
In [17]:
import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine

engine = create_engine(connection_url)
In [18]:
# df = pd.read_sql('select * from sensor_data_with_vectors limit 50000;', engine)
df = pd.read_sql("select * from sensor_data_with_vectors where anomaly <> 'none' limit 50000;", engine)
In [19]:
df['date'] = pd.to_datetime(df['date'])
df['date_only'] = df['date'].dt.date

# Group data by date and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'anomaly']).size().reset_index(name='counts')

# Create a histogram with Plotly
fig = px.histogram(grouped_df, x='date_only', y='counts', color='anomaly',
                   title='Anomalies over Time', labels={'date_only': 'Date', 'counts': 'Anomaly Count'})

# Show plot
fig.show()
In [20]:
# Group data by date, city and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'city', 'anomaly']).size().reset_index(name='counts')

# List your cities
cities = ['Washington DC', 'New York', 'Los Angeles']  # Add or change according to your DataFrame

# Create a separate plot for each city
for city in cities:
    city_df = grouped_df[grouped_df['city'] == city]
    fig = px.histogram(city_df, x='date_only', y='counts', color='anomaly',
                       title=f'Anomalies over Time for {city}', labels={'date_only': 'Date', 'counts': 'Anomaly Count'})
    fig.show()
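The cities list in the cell above is hardcoded. If you would rather plot every city present in the data, one small variant (an adjustment on top of the original code, not part of it) is to derive the list from the grouped DataFrame:

# Variant: derive the city list from the data instead of hardcoding it
cities = sorted(grouped_df['city'].unique())

for city in cities:
    city_df = grouped_df[grouped_df['city'] == city]
    fig = px.histogram(city_df, x='date_only', y='counts', color='anomaly',
                       title=f'Anomalies over Time for {city}',
                       labels={'date_only': 'Date', 'counts': 'Anomaly Count'})
    fig.show()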
Real-Time Anomaly Detection Dashboard
Environment Setup and Library Imports
Library Installation: Installs the tabulate, pymysql, IPython, and rich libraries.
Imports: Includes libraries such as time, os, shutil, pymysql, rich.console, rich.table, IPython.display, sqlalchemy, and pandas.
Dashboard Function Definition
Function display_table_contents:
Establishes a database connection using create_engine(connection_url).
Executes an SQL query to fetch initial data from sensor_data_with_vectors with specific columns (date, vent, pluie, temp, anomaly).
Enters a loop to continuously update and display the dashboard.
Dashboard Display Mechanics
Console and Table Setup:
Clears the console output and creates a console instance with rich.console.
Determines the terminal width and sets up a dynamic table layout.
Adds columns with specific styles and alignments for better readability.
Data Display and Refresh Loop:
Adds the top 50 rows from the fetched data to the table.
Styles rows based on the anomaly type (e.g., different colors for different anomaly types).
Refreshes the display every 10 seconds, fetching updated data from the database.
In [21]:
!pip install tabulate pymysql Ipython rich --quiet
In [22]:
import time
import os
import shutil
import pymysql
from rich.console import Console
from rich.table import Table
from rich import box
from IPython.display import clear_output
In [23]:
from sqlalchemy import create_engine
import pandas as pd

def display_table_contents():
    # Create a database engine
    engine = create_engine(connection_url)

    # Execute query to fetch initial table contents
    query = 'SELECT date, vent, pluie, temp, anomaly FROM sensor_data_with_vectors ORDER BY date DESC'
    table_data = pd.read_sql_query(query, engine)

    while True:
        # Clear console output
        clear_output(wait=True)

        # Create a console instance
        console = Console()

        # Get the terminal width
        terminal_width = shutil.get_terminal_size().columns

        # Print the title with centered alignment
        title = "[bold magenta]REAL TIME ANALYTICS DASHBOARD[/bold magenta]"
        console.print(title.center(terminal_width))

        # Create a table instance
        table = Table(show_header=True, header_style="bold", box=None)

        # Add columns to the table
        table.add_column("Date", justify="center", style="cyan", width=terminal_width // 5 + 5)
        table.add_column("Vent", justify="center", style="magenta", width=terminal_width // 5)
        table.add_column("Pluie", justify="center", style="yellow", width=terminal_width // 5)
        table.add_column("Temp", justify="center", style="green", width=terminal_width // 5)
        table.add_column("Anomaly", justify="center", width=terminal_width // 5)

        # Add rows to the table
        for row in table_data.head(50).itertuples(index=False):
            # Convert datetime to string before adding to the table
            formatted_row = [str(cell) for cell in row]

            # Check the anomaly type
            anomaly_type = formatted_row[4]

            # Determine the style based on the anomaly type
            if anomaly_type == 'pluie':
                style = "bold blue"
            elif anomaly_type == 'vent':
                style = "bold magenta"
            elif anomaly_type == 'temp':
                style = "bold green"
            else:
                style = ""

            # Add the row with the appropriate style
            table.add_row(*formatted_row, style=style)

        # Print the table
        console.print(table)

        # Wait for 10 seconds before refreshing
        time.sleep(10)

        # Execute query to fetch updated table contents
        updated_data = pd.read_sql_query(query, engine)

        # Update the table_data with the fetched data
        table_data = updated_data

# Call the function to start displaying the table contents
display_table_contents()
Details
About this Template
Real-time anomaly detection in IoT sensor data, harnessing the robust capabilities of SingleStoreDB and advanced analytical techniques.
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.