Real-Time Anomaly Detection

In this notebook, we embark on a cutting-edge exploration of real-time anomaly detection in IoT sensor data, harnessing the robust capabilities of SingleStoreDB and advanced analytical techniques. Our journey begins with the efficient ingestion of sensor data into SingleStoreDB, setting the stage for dynamic and insightful analysis. The heart of this notebook lies in its innovative approach to handling and interpreting sensor data. We utilize the power of vector embeddings, generated through the UMAP library, to transform high-dimensional sensor readings into a format ripe for anomaly detection. These embeddings, capturing the essence of weather parameters like wind, rain, and temperature, are then seamlessly integrated into SingleStoreDB.

Our focus intensifies as we apply SingleStoreDB's dot_product function to these embeddings, unearthing anomalies in real time. This not only provides swift identification of irregularities but also paints a vivid picture of sensor data behavior over time. We don't just stop at detection; the notebook further enriches the analysis with a visually engaging, real-time dashboard. This dashboard, crafted using the Plotly and Rich libraries, offers an interactive and constantly updated view of the anomalies, allowing users to monitor and respond to sensor data trends as they happen. Join us in this exciting venture as we blend SQL, SingleStoreDB, and Python to unlock new possibilities in real-time anomaly detection. Whether you're a data scientist, an IoT enthusiast, or simply intrigued by the power of real-time analytics, this notebook is your gateway to understanding and leveraging the full potential of IoT sensor data.
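
Since the notebook L2-normalizes embeddings before storing them, the dot product of two embeddings equals their cosine similarity: values near 1 mean "very similar readings", while lower values suggest an outlier. A minimal, self-contained sketch of that idea (illustrative numpy only, not part of the notebook's pipeline):

import numpy as np

# Two toy embedding vectors standing in for sensor readings (illustrative values)
a = np.array([0.2, 0.9, 0.4])
b = np.array([0.25, 0.85, 0.35])

# L2-normalize them, as the notebook later does with sklearn's normalize(..., norm='l2')
a = a / np.linalg.norm(a)
b = b / np.linalg.norm(b)

# For unit-length vectors the dot product is the cosine similarity
print(float(np.dot(a, b)))  # close to 1.0 -> similar readings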

Architecture Diagram

Database Setup

Overview

This section focuses on the initial setup of the database iot_sensor_db in SingleStore, specifically designed for handling IoT sensor data. It includes creating the necessary tables to store both historical and real-time sensor data, along with vector embeddings for anomaly detection.

SQL Script Description

The provided SQL script performs the following operations:

  1. Database Initialization

    • DROP DATABASE IF EXISTS iot_sensor_db; Ensures a clean slate by dropping the iot_sensor_db database if it already exists.

    • CREATE DATABASE iot_sensor_db; Creates a new database named iot_sensor_db.

    • USE iot_sensor_db; Sets iot_sensor_db as the current database for subsequent operations.

  2. Table Creation

    • CREATE TABLE sensor_data_with_vectors This table is designed to store processed sensor data along with vector embeddings for anomaly detection.

      • Columns:

        • date: Timestamp of the sensor data.

        • city, longitude, latitude: Location information of the sensor.

        • vent, pluie, temp: Sensor readings for wind (vent), rain (pluie), and temperature (temp).

        • anomaly: Flag indicating whether the data point is an anomaly.

        • embeddings: Text column for storing vector embeddings (see the note on an alternative packed layout after this list).

    • CREATE TABLE sensor_data_stage Serves as a staging area for raw sensor data before processing.

      • Columns: Similar to sensor_data_with_vectors, but without the anomaly column; used for staging raw data before processing.
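
Note that embeddings are stored here as TEXT and packed into a vector at query time with JSON_ARRAY_PACK. An alternative layout (not used in this notebook) is to pack once at write time and store the result in a BLOB column, so queries can apply dot_product to the stored value directly. A hedged sketch of that variant, with illustrative table name and values:

CREATE TABLE sensor_embeddings_packed (
    date DATETIME,
    city VARCHAR(50),
    embeddings BLOB
);
INSERT INTO sensor_embeddings_packed
VALUES ('2024-01-01 00:00:00', 'Paris', JSON_ARRAY_PACK('[0.12, 0.34, 0.56]'));
SELECT city, dot_product(embeddings, JSON_ARRAY_PACK('[0.12, 0.34, 0.56]')) AS similarity
FROM sensor_embeddings_packed;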

In [1]:

%%sql
DROP DATABASE IF EXISTS iot_sensor_db;
CREATE DATABASE iot_sensor_db;
USE iot_sensor_db;

CREATE TABLE sensor_data_with_vectors (
    date DATETIME,
    city VARCHAR(50),
    longitude VARCHAR(50),
    latitude VARCHAR(50),
    vent FLOAT(8,2),
    pluie FLOAT(8,2),
    temp FLOAT(8,2),
    anomaly VARCHAR(10),
    embeddings TEXT
);

CREATE TABLE sensor_data_stage (
    city VARCHAR(50),
    longitude VARCHAR(50),
    latitude VARCHAR(50),
    vent FLOAT(8,2),
    pluie FLOAT(8,2),
    temp FLOAT(8,2),
    embeddings TEXT,
    date DATETIME
);

Setting Up and Initiating the Sensor Data Pipeline

Overview

This section details the setup and initiation of two pipelines in SingleStore: sensor_data_pipeline for historical data load and sensor_realtime_data_pipeline for real-time data analysis. Both pipelines stream and ingest IoT sensor data from S3 buckets into respective tables in SingleStore.

SQL Script Description

Historical Data Load Pipeline

  1. Pipeline Creation

    • CREATE OR REPLACE PIPELINE sensor_data_pipeline AS Creates or replaces a pipeline named sensor_data_pipeline.

    • Configuration:

      • Source: S3 bucket path s3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/with_cities_embeddings.csv.

      • Target: Table sensor_data_with_vectors.

      • Data Format: CSV with specific delimiters and header line ignored.

  2. Pipeline Activation

    • START PIPELINE sensor_data_pipeline FOREGROUND; Initiates the pipeline for data ingestion.

  3. Data Verification

    • SELECT * FROM sensor_data_with_vectors limit 2; Fetches the first two rows from sensor_data_with_vectors to verify data ingestion.

Real-Time Data Analysis Pipeline

  1. Pipeline Creation

    • CREATE OR REPLACE PIPELINE sensor_realtime_data_pipeline AS Establishes a new pipeline named sensor_realtime_data_pipeline.

    • Configuration:

      • Source: S3 bucket path s3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/demo_day_data2.csv.

      • Target: Table sensor_data_stage.

      • Data Format: CSV with specific delimiters and header line ignored.

      • Additional Setting: SET date = NOW(); assigns the current timestamp to the date column.

  2. Pipeline Activation

    • START PIPELINE sensor_realtime_data_pipeline FOREGROUND; Activates the pipeline for real-time data ingestion.

  3. Data Verification

    • SELECT * FROM sensor_data_stage limit 1; Retrieves the first row from sensor_data_stage to confirm data ingestion.

Usage

The establishment of these pipelines is essential for the real-time and historical analysis of IoT sensor data. sensor_data_pipeline facilitates the ingestion of historical data for retrospective analysis, while sensor_realtime_data_pipeline caters to ongoing, real-time data analysis needs.
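
If a pipeline ever appears stalled or ingests no rows, SingleStore exposes pipeline metadata that can be inspected from a SQL cell before re-running the START statements below. A quick sanity check might look like the following (the information_schema table names follow SingleStore's documentation; adjust as needed for your version):

%%sql
SHOW PIPELINES;
SELECT * FROM information_schema.PIPELINES_FILES WHERE PIPELINE_NAME = 'sensor_data_pipeline';
SELECT * FROM information_schema.PIPELINES_ERRORS LIMIT 5;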

In [2]:

%%sql
CREATE OR REPLACE PIPELINE sensor_data_pipeline AS
LOAD DATA S3 's3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/with_cities_embeddings.csv'
INTO TABLE `sensor_data_with_vectors`
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES;

In [3]:

%%sql
START PIPELINE sensor_data_pipeline FOREGROUND;

In [4]:

%%sql
SELECT * FROM sensor_data_with_vectors limit 2;

In [5]:

%%sql
CREATE OR REPLACE PIPELINE sensor_realtime_data_pipeline AS
LOAD DATA S3 's3://s2db-demos-pub-bucket/real-time-anomaly-detection-demo/demothon/demo_day_data2.csv'
INTO TABLE `sensor_data_stage`
(city, longitude, latitude, vent, pluie, temp, embeddings)
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES
SET date = NOW();

In [6]:

%%sql
START PIPELINE sensor_realtime_data_pipeline FOREGROUND;

In [7]:

%%sql
SELECT * FROM sensor_data_stage limit 1;

Data Preparation for Analysis

Overview

This section covers the necessary steps to prepare IoT sensor data for analysis. It involves installing a required library, data retrieval from the sensor_data_stage table, and preprocessing to ensure data quality.

Python Script Description

  1. Library Installation

    • !pip install umap-learn --quiet Installs the umap-learn library quietly without verbose output. UMAP (Uniform Manifold Approximation and Projection) is used for dimensionality reduction.

  2. Import Statements

    • Note: It's advised to restart the Python Kernel before importing umap to ensure the library is properly loaded.

    • Imports various libraries including umap, normalize from sklearn.preprocessing, sqlalchemy, create_engine, json, and pandas.

  3. Database Connection

    • engine = create_engine(connection_url) Establishes a connection to the database using connection_url, which is predefined in SingleStore notebooks (a sketch for building it manually follows this list).

  4. Data Retrieval and Preprocessing

    • df = pd.read_sql('select * from sensor_data_stage', engine) Retrieves data from the sensor_data_stage table into a pandas DataFrame.

    • df = df.bfill(axis=0) Fills null values in the DataFrame by propagating non-null values backward.

    • df = df.dropna() Drops any remaining rows with null values to ensure the dataset is clean for analysis.
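
Note that connection_url is provided automatically inside SingleStore's managed notebook environment. If you run this notebook elsewhere, you would need to construct it yourself; a minimal sketch, where the driver, credentials, and host are placeholders to replace with your own workspace details:

from sqlalchemy import create_engine

# Placeholder credentials and host -- substitute your own SingleStore workspace details.
# Any SQLAlchemy-compatible MySQL driver works; pymysql is installed later in this notebook.
connection_url = "mysql+pymysql://admin:your_password@your-workspace-host:3306/iot_sensor_db"
engine = create_engine(connection_url)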

In [8]:

!pip install umap-learn --quiet

Note

Restart the kernel if importing umap gives an error.

In [9]:

import umap
from sklearn.preprocessing import normalize
import sqlalchemy
from sqlalchemy import create_engine
import json
import pandas as pd

In [10]:

# Load staged data and fill null values using bfill()
engine = create_engine(connection_url)
df = pd.read_sql('select * from sensor_data_stage', engine)
df = df.bfill(axis=0)
df = df.dropna()

Generating Vector Embeddings using UMAP Library

Overview

This section focuses on creating vector embeddings from the sensor data using the UMAP library. UMAP maps each reading's wind, rain, and temperature values into a common embedding space while preserving the structure of the data, so that readings can be compared efficiently during anomaly detection.

Python Script Description

  1. Data Selection for Embedding Generation

    • new_df1 = df.iloc[50:100].copy() Creates a copy of rows 50 through 99 of the DataFrame df. This subset is used for generating vector embeddings.

  2. Feature Selection

    • features = new_df1[['vent', 'pluie', 'temp']] Selects the columns vent, pluie, and temp from new_df1 as features for the embedding process. These represent sensor readings for wind, rain, and temperature.

  3. UMAP Reducer Initialization and Transformation

    • reducer = umap.UMAP(n_components=15) Initializes a UMAP reducer that maps the selected features into a 15-component embedding space.

    • embeddings = reducer.fit_transform(features) Fits UMAP on the selected features and transforms them, generating one embedding vector per sensor reading.

  4. Normalization and Embedding Storage

    • normalized_embeddings = normalize(embeddings, norm='l2') Normalizes the generated embeddings using L2 norm, ensuring uniform scale.

    • new_df1['embeddings'] = list(normalized_embeddings) Stores the normalized embeddings in the embeddings column of new_df1, replacing the values loaded from the staging table.

  5. Displaying Results

    • new_df1.head() Displays the first few rows of new_df1 to verify the embedding generation and integration process.

In [11]:

# Generate embeddings for the real-time data
new_df1 = df.iloc[50:100].copy()
features = new_df1[['vent', 'pluie', 'temp']]
reducer = umap.UMAP(n_components=15)
embeddings = reducer.fit_transform(features)

In [12]:

normalized_embeddings = normalize(embeddings, norm='l2')
new_df1['embeddings'] = list(normalized_embeddings)

In [13]:

new_df1.head()
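
Note that this section fits a fresh UMAP reducer on each batch, so embeddings are directly comparable only to other rows projected by the same reducer. For embeddings that remain comparable to the historical table over time, you would typically fit the reducer once and reuse it for incoming batches. A minimal sketch of that approach, assuming joblib is available and reducer has been fitted as above (the file name is arbitrary):

import joblib

# Persist the fitted reducer once
joblib.dump(reducer, 'umap_reducer.joblib')

# Later, reuse the same projection for a new batch of raw readings
reducer = joblib.load('umap_reducer.joblib')
new_features = new_df1[['vent', 'pluie', 'temp']]
new_embeddings = normalize(reducer.transform(new_features), norm='l2')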

Anomaly Detection and Data Integration

Anomaly Detection Using SingleStore dot_product Function

  • Anomaly Detection Loop:

    • Iterates over each row in new_df.

    • Extracts the embeddings string from the current row and parses it into a list with json.loads.

    • Constructs an SQL query using SingleStore's dot_product function to measure similarity between the current row's embeddings and those stored in the sensor_data_with_vectors table.

    • The query takes the 20 most similar historical records and returns their anomaly labels with counts; the most frequent label is assigned to the current reading (a nearest-neighbour majority vote).

    • SQL query execution: result = pd.read_sql_query(query, con=engine).

    • The resulting anomaly label is written back to new_df, or set to a default value ('none') if no similar records are found.

Data Appending to Historical Table

  • Data Type Casting:

    • Ensures appropriate data types for columns in new_df (e.g., converting date to datetime, city, longitude, latitude to strings, etc.).

  • Appending to SQL Table:

    • new_df.to_sql('sensor_data_with_vectors', con=engine, if_exists='append', index=False) appends the processed data in new_df to the sensor_data_with_vectors table in the database.

In [14]:

new_df = df.iloc[50:70].copy()

# Iterate over each row in the new DataFrame
for index, row in new_df.iterrows():
    # Get the embeddings JSON string from the current row and parse it into a list
    embeddings = row['embeddings']
    embeddings_json = json.loads(embeddings)
    # Build the query: fetch the anomaly labels of the 20 most similar historical rows
    query = f"""
    SELECT anomaly, COUNT(anomaly) AS count
    FROM (
        SELECT anomaly, dot_product(
            JSON_ARRAY_PACK('{embeddings_json}'),
            JSON_ARRAY_PACK(sensor_data_with_vectors.embeddings)
        ) AS similarity
        FROM sensor_data_with_vectors
        ORDER BY similarity DESC
        LIMIT 20
    ) AS subquery
    GROUP BY anomaly
    ORDER BY count DESC;
    """
    # Execute the query
    result = pd.read_sql_query(query, con=engine)
    if not result.empty:
        # Assign the most frequent anomaly label to the current row
        new_df.loc[index, 'anomaly'] = result['anomaly'].values[0]
    else:
        # No similar records found; fall back to a default value
        new_df.loc[index, 'anomaly'] = 'none'

In [15]:

new_df.head()

In [16]:

# Append the new DataFrame to the main table: sensor_data_with_vectors
new_df['date'] = pd.to_datetime(new_df['date'])
new_df['city'] = new_df['city'].astype(str)
new_df['longitude'] = new_df['longitude'].astype(str)
new_df['latitude'] = new_df['latitude'].astype(str)
new_df['vent'] = new_df['vent'].astype(float)
new_df['pluie'] = new_df['pluie'].astype(float)
new_df['temp'] = new_df['temp'].astype(float)
new_df['anomaly'] = new_df['anomaly'].astype(str)
new_df['embeddings'] = new_df['embeddings'].astype(str)
# Append data to SQL table
new_df.to_sql('sensor_data_with_vectors', con=engine, if_exists='append', index=False)

Dashboard for Monitoring Anomalies over Time

Data Visualization Setup

  • Library Imports: pandas, plotly.express, and sqlalchemy.

  • Database Connection: Establishes connection to the database using create_engine(connection_url).

Data Retrieval and Processing

  • SQL Query for Data Fetching: Retrieves anomaly data from sensor_data_with_vectors table, excluding entries with 'none' in the anomaly field.

  • Data Preparation:

    • Converts date column to datetime format and extracts date part into a new column date_only.

    • Groups data by date_only and anomaly, counting occurrences to prepare for visualization.

Plotting Anomalies over Time

  • Overall Anomaly Trends:

    • Utilizes plotly.express to create a histogram representing anomaly counts over time.

    • Each anomaly type is color-coded for distinction.

  • City-Specific Anomaly Trends:

    • Further groups data by city along with date_only and anomaly.

    • Loops through a predefined list of cities to create separate histograms for each city, showcasing city-specific anomaly trends.

In [17]:

import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine
engine = create_engine(connection_url)

In [18]:

# df = pd.read_sql('select * from sensor_data_with_vectors limit 50000;', engine)
df = pd.read_sql("select * from sensor_data_with_vectors where anomaly <> 'none' limit 50000;", engine)

In [19]:

df['date'] = pd.to_datetime(df['date'])
df['date_only'] = df['date'].dt.date
# Group data by date and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'anomaly']).size().reset_index(name='counts')
# Create a histogram of anomaly counts over time with Plotly
fig = px.histogram(grouped_df, x='date_only', y='counts', color='anomaly',
                   title='Anomalies over Time', labels={'date_only': 'Date', 'counts': 'Anomaly Count'})
# Show plot
fig.show()

In [20]:

# Group data by date, city and anomaly, then count the instances
grouped_df = df.groupby(['date_only', 'city', 'anomaly']).size().reset_index(name='counts')

# List your cities
cities = ['Washington DC', 'New York', 'Los Angeles']  # Add or change according to your DataFrame

# Create a separate plot for each city
for city in cities:
    city_df = grouped_df[grouped_df['city'] == city]
    fig = px.histogram(city_df, x='date_only', y='counts', color='anomaly',
                       title=f'Anomalies over Time for {city}', labels={'date_only': 'Date', 'counts': 'Anomaly Count'})
    fig.show()

Real-Time Anomaly Detection Dashboard

Environment Setup and Library Imports

  • Library Installation: Installs tabulate, pymysql, Ipython, and rich libraries.

  • Imports: Includes libraries such as time, os, shutil, pymysql, rich.console, rich.table, IPython.display, sqlalchemy, and pandas.

Dashboard Function Definition

  • Function display_table_contents:

    • Establishes a database connection using create_engine(connection_url).

    • Executes an SQL query to fetch initial data from sensor_data_with_vectors with specific columns (date, vent, pluie, temp, anomaly).

    • Enters a loop to continuously update and display the dashboard.

Dashboard Display Mechanics

  • Console and Table Setup:

    • Clears the console output and creates a console instance with rich.console.

    • Determines the terminal width and sets up a dynamic table layout.

    • Adds columns with specific styles and alignments for better readability.

  • Data Display and Refresh Loop:

    • Adds the top 50 rows from the fetched data to the table.

    • Styles rows based on the anomaly type (e.g., different colors for different anomaly types).

    • Refreshes the display every 10 seconds, fetching updated data from the database.

In [21]:

!pip install tabulate pymysql Ipython rich --quiet

In [22]:

import time
import os
import shutil
import pymysql
from rich.console import Console
from rich.table import Table
from rich import box
from IPython.display import clear_output

In [23]:

from sqlalchemy import create_engine
import pandas as pd
def display_table_contents():
    # Create a database engine
    engine = create_engine(connection_url)

    # Execute query to fetch initial table contents
    query = 'SELECT date, vent, pluie, temp, anomaly FROM sensor_data_with_vectors ORDER BY date DESC'
    table_data = pd.read_sql_query(query, engine)

    while True:
        # Clear console output
        clear_output(wait=True)

        # Create a console instance
        console = Console()

        # Get the terminal width
        terminal_width = shutil.get_terminal_size().columns

        # Print the title with centered alignment
        title = "[bold magenta]REAL TIME ANALYTICS DASHBOARD[/bold magenta]"
        console.print(title.center(terminal_width))

        # Create a table instance
        table = Table(show_header=True, header_style="bold", box=None)

        # Add columns to the table
        table.add_column("Date", justify="center", style="cyan", width=terminal_width // 5 + 5)
        table.add_column("Vent", justify="center", style="magenta", width=terminal_width // 5)
        table.add_column("Pluie", justify="center", style="yellow", width=terminal_width // 5)
        table.add_column("Temp", justify="center", style="green", width=terminal_width // 5)
        table.add_column("Anomaly", justify="center", width=terminal_width // 5)

        # Add the 50 most recent rows to the table
        for row in table_data.head(50).itertuples(index=False):
            # Convert each cell to a string before adding it to the table
            formatted_row = [str(cell) for cell in row]
            # Check the anomaly type
            anomaly_type = formatted_row[4]
            # Determine the row style based on the anomaly type
            if anomaly_type == 'pluie':
                style = "bold blue"
            elif anomaly_type == 'vent':
                style = "bold magenta"
            elif anomaly_type == 'temp':
                style = "bold green"
            else:
                style = ""
            # Add the row with the appropriate style
            table.add_row(*formatted_row, style=style)

        # Print the table
        console.print(table)

        # Wait for 10 seconds before refreshing
        time.sleep(10)

        # Execute query to fetch updated table contents and refresh the display data
        table_data = pd.read_sql_query(query, engine)

# Call the function to start displaying the table contents
display_table_contents()

Details

Tags

#vectordb #realtime

License

This Notebook has been released under the Apache 2.0 open source license.