Demonstrate ML function Anomaly Detect
Notebook
Note
You can use your existing Standard or Premium workspace with this Notebook.
This feature is currently in Private Preview. Please reach out to support@singlestore.com to confirm whether this feature can be enabled in your organization.
This Jupyter notebook will help you:
Download the bank transaction dataset from Kaggle for anomaly detection
Store the data in a SingleStore table
Use ML Functions for training and predictions
Visualize the results of anomaly detection on the test dataset
Prerequisites: Ensure ML Functions are installed on your deployment (AI > AI & ML Functions).
Step 1: Import Necessary Libraries
In [1]:
pip install -q kagglehub
In [2]:
import os
import pandas as pd
import singlestoredb as s2
import json
import kagglehub
import getpass
from singlestoredb import create_engine
from IPython.display import display

import plotly.graph_objects as go
from plotly.subplots import make_subplots
Step 2: Test Connection to SingleStore
In [3]:
# Ensure that you have selected a SingleStore connection (workspace) before running this cell
try:
    conn = s2.connect()
    conn.autocommit(True)  # Set autocommit for notebook simplicity
    print("Connection successful!")
except Exception as e:
    print(f"Connection failed: {e}")
Step 3: Create Database and Tables
In [4]:
# Create the database
db_name = "temp"

try:
    conn.cursor().execute(f"CREATE DATABASE IF NOT EXISTS {db_name};")
    conn.cursor().execute(f"USE {db_name};")
    print(f"Database '{db_name}' is ready.")
except Exception as e:
    print(e)
Step 4: Prepare and Load Data
We will use a publicly available Kaggle bank transactions dataset. We'll download the data into a Pandas DataFrame and then load it into a SingleStore table.
Note
You may need to update your firewall allowlist so that this notebook instance can reach the Kaggle server URL. If the notebook doesn't have access, a toast message will appear when you run the following code cell, guiding you to add the URL.
In [5]:
1print("Downloading dataset from Kaggle...")2path = kagglehub.dataset_download("valakhorasani/bank-transaction-dataset-for-fraud-detection")3print(f"Dataset downloaded to: {path}")4 5# Read the CSV file6df = pd.read_csv(f"{path}/bank_transactions_data_2.csv")7df["TransactionDate"] = pd.to_datetime(df["TransactionDate"], format="%Y-%m-%d %H:%M:%S", errors="coerce")8df["PreviousTransactionDate"] = pd.to_datetime(df["PreviousTransactionDate"], format="%Y-%m-%d %H:%M:%S", errors="coerce")9# Display dataset info10print(f"\nDataset shape: {df.shape}")11print(f"Columns: {list(df.columns)}")12print("\nFirst few rows:")13df.head()
In [6]:
try:
    # Create a SQLAlchemy engine
    engine = s2.create_engine(database=db_name)

    # Use pandas.to_sql to load the complete data to a table
    df.to_sql("metrics", con=engine, if_exists='append', index=False)

    print("Data loading complete.")
except Exception as e:
    print(f"Error loading data: {e}")
Create train/test splits from the complete data
Ensure that rows belonging to the same sequence stay in chronological order within each split. The SQL query in the next cell performs an 80/20 split ordered by TransactionDate; the pandas sketch below shows the same idea for intuition.
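For intuition only, here is roughly what the same chronological 80/20 split would look like in pandas. This is a sketch, not part of the notebook's pipeline: the actual split is done in SQL in the next cell, and `df` refers to the DataFrame loaded in Step 4.

# Sketch: chronological 80/20 split in pandas (illustration only; the real split
# happens in the SQL cell below). `df` is the DataFrame loaded in Step 4.
df_sorted = df.sort_values("TransactionDate").reset_index(drop=True)
cutoff = int(len(df_sorted) * 0.8)
train_df = df_sorted.iloc[:cutoff]   # earliest 80% of rows -> training_data
test_df = df_sorted.iloc[cutoff:]    # latest 20% of rows  -> test_data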
In [7]:
%%sql
USE temp;

DROP TABLE IF EXISTS training_data;
DROP TABLE IF EXISTS test_data;

-- Training set: earliest 80% of rows by TransactionDate
CREATE TABLE training_data AS
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (ORDER BY TransactionDate) AS rn,
        COUNT(*) OVER () AS total_rows
    FROM temp.metrics
) t
WHERE rn <= 0.8 * total_rows
ORDER BY TransactionDate;

-- Test set: latest 20% of rows by TransactionDate
CREATE TABLE test_data AS
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (ORDER BY TransactionDate) AS rn,
        COUNT(*) OVER () AS total_rows
    FROM temp.metrics
) t
WHERE rn > 0.8 * total_rows
ORDER BY TransactionDate;
Out [7]:
Pre-process the test dataset to include recent context rows
Combine the last N (e.g., 20) most recent rows from train_data with all rows from test_data, forming a continuous chronological dataset. These N rows provide recent context for model evaluation.
The most recent 20 training rows are later excluded from the final results, since they are used only for observing short-term trends (see the pandas sketch below for intuition).
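Continuing the pandas sketch from the previous step for intuition only (the actual table is built by the SQL cell below; `train_df` and `test_df` are the hypothetical DataFrames from that earlier sketch):

# Sketch: prepend the last N training rows to the test rows for context
# (illustration only; `train_df` / `test_df` come from the earlier sketch,
# and the real table is built by the SQL cell below).
N = 20
recent_context = train_df.sort_values("TransactionDate").tail(N)
test_with_history = (
    pd.concat([recent_context, test_df])
      .sort_values("TransactionDate")
      .reset_index(drop=True)
)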
In [8]:
%%sql
USE temp;

-- Drop old context table if it exists
DROP TABLE IF EXISTS test_data_with_history;

-- Create a new test_data_with_history table:
-- includes the last 20 rows from training_data + all rows from test_data
CREATE TABLE test_data_with_history AS
SELECT *
FROM (
    SELECT *
    FROM (
        SELECT *
        FROM training_data
        ORDER BY TransactionDate DESC
        LIMIT 20
    ) recent_training
    UNION ALL
    SELECT *
    FROM test_data
) combined
ORDER BY TransactionDate ASC;
Out [8]:
Step 5: Train the Anomaly Detection Model
Now we'll use the %%s2ml train cell magic to train our AnomalyDetection model.
In [9]:
%s2ml list
Optionally, delete any previous model with the same name before retraining.
In [10]:
%s2ml delete --model cc_anomaly_model_v1 --f
Out [10]:
{'status': 'deleted',
'modelID': '174365d5-0135-47af-bf4f-2a582a03b5ba',
'model': 'cc_anomaly_model_v1'}
In [11]:
%%s2ml train as anomaly_model
task: AnomalyDetection
model: cc_anomaly_model_v1
db: temp
input_table: training_data
target_column: TransactionAmount
target_time_column: TransactionDate
target_series_column: Channel
description: "Training an anomaly detection model"
runtime: cpu-small
selected_features: {"mode":"*","features":null}
force: True
Step 6: Check Model Status
Model training is an asynchronous job. We can use the %s2ml status magic to check on its progress. The model is ready to use when the status shows Completed.
(This may take a minute or two. Re-run the cell to refresh the status.)
In [12]:
%s2ml status --model cc_anomaly_model_v1
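If you prefer to poll programmatically instead of re-running the cell by hand, here is a minimal sketch. It assumes the %s2ml status magic returns a dictionary with a 'status' field (as the %s2ml delete output above suggests); adjust the key and values to match the output you actually see.

# Minimal polling sketch (assumption: %s2ml status returns a dict with a
# 'status' key such as 'Completed' or 'Failed'; adjust to your actual output).
import time
from IPython import get_ipython

ip = get_ipython()
while True:
    status = ip.run_line_magic("s2ml", "status --model cc_anomaly_model_v1")
    print(status)
    if isinstance(status, dict) and str(status.get("status", "")).lower() in ("completed", "failed"):
        break
    time.sleep(30)  # wait 30 seconds between checks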
In [13]:
print(json.dumps(anomaly_model, indent=4))
Step 7: Run Predictions
Once the model is COMPLETED, you can use it directly in SQL via the ML_ANOMALY_DETECT() function.
Here is an example prediction for a single row (a minimal sketch follows); the cells after it confirm the available tables, preview the test data, and then run the model over the full test set.
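As a quick single-row illustration, the sketch below calls ML_ANOMALY_DETECT() from Python using the connection from Step 2. It assumes the temp database and the cc_anomaly_model_v1 model created above.

# Sketch: score one row with ML_ANOMALY_DETECT (assumes `conn` from Step 2,
# the `temp` database, and the trained `cc_anomaly_model_v1` model).
single_row_query = """
    SELECT cluster.ML_ANOMALY_DETECT('cc_anomaly_model_v1', TO_JSON(t.*)) AS is_anomaly
    FROM (
        SELECT * FROM temp.test_data_with_history
        ORDER BY TransactionDate ASC
        LIMIT 1
    ) AS t;
"""
cur = conn.cursor()
cur.execute(single_row_query)
print(cur.fetchone()[0])  # JSON payload describing the prediction for that row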
In [14]:
%%sql
show tables in temp;
Out [14]:
Run the model on a subset of the data to view the format of the results.
In [15]:
%%sql
SELECT COUNT(*) as test_count FROM test_data_with_history;
Out [15]:
In [16]:
%%sql
SELECT * FROM test_data_with_history ORDER BY TransactionDate ASC LIMIT 10;
Out [16]:
In [17]:
%%sql
SELECT cluster.ML_ANOMALY_DETECT('cc_anomaly_model_v1', TO_JSON(selected_data.*)) AS is_anomaly
FROM (SELECT * FROM temp.test_data_with_history ORDER BY TransactionDate ASC) AS selected_data;
Out [17]:
Step 8: Visualize Anomalies
In [18]:
# Count the number of test records to analyze
cursor = conn.cursor()

test_data_count = """
    SELECT count(*) FROM temp.test_data;
"""
cursor.execute(test_data_count)

QUERY_LIMIT = cursor.fetchall()[0][0]  # Number of records to analyze
In [19]:
# Run Predictions
predictions = """
    SELECT cluster.ML_ANOMALY_DETECT('cc_anomaly_model_v1', TO_JSON(selected_data.*)) AS is_anomaly
    FROM (SELECT * FROM temp.test_data_with_history ORDER BY TransactionDate ASC) AS selected_data;
"""

cursor.execute(predictions)

# Parse and prepare data for visualization
column_names = [desc[0] for desc in cursor.description]
df = pd.DataFrame(list(cursor.fetchall()), columns=column_names)

print(f"✓ Retrieved {len(df)} records from database")

df.head()
In [20]:
# ============================================
# AI generated code to visualize Predictions
# ============================================

SERIES_COLORS = {
    'ATM': '#1f77b4',
    'Branch': '#2ca02c',
    'Online': '#9467bd',
    'POS': '#d62728',
    'Mobile': '#ff7f0e'
}
DEFAULT_COLOR = '#8c564b'

# Parse JSON and filter out error rows
def safe_parse_json(x):
    try:
        parsed = json.loads(x)
        # Check if it's an error response
        if 'error' in parsed:
            return None
        return parsed
    except:
        return None

df['is_anomaly_parsed'] = df['is_anomaly'].apply(safe_parse_json)

# Filter out rows that failed to parse or had errors
df_valid = df[df['is_anomaly_parsed'].notna()].copy()
print(f"✓ Filtered out {len(df) - len(df_valid)} error/invalid rows")
print(f"✓ Processing {len(df_valid)} valid prediction rows")

if len(df_valid) == 0:
    print("❌ No valid predictions found. All rows contain errors.")
    print("\nSample error:")
    print(df['is_anomaly'].iloc[0])
else:
    # Check what keys are in the JSON
    print(f"✓ JSON keys found: {list(df_valid['is_anomaly_parsed'].iloc[0].keys())}")

    # Expand JSON into separate columns
    df_expanded = pd.json_normalize(df_valid['is_anomaly_parsed'])

    print(f"✓ Expanded columns: {list(df_expanded.columns)}")

    # Combine dataframes
    df_final = pd.concat([df_valid.reset_index(drop=True), df_expanded], axis=1)

    # Handle potential case sensitivity in column names
    # Check for different possible timestamp column names
    timestamp_col = None
    for possible_name in ['TS', 'ts', 'Ts', 'timestamp', 'Timestamp']:
        if possible_name in df_final.columns:
            timestamp_col = possible_name
            break

    if timestamp_col is None:
        print("Available columns:", list(df_final.columns))
        raise ValueError("Could not find timestamp column. Please check the column names above.")

    # Convert timestamp to datetime
    df_final['TS'] = pd.to_datetime(df_final[timestamp_col])
    print("Before drop:", len(df_final))

    # --- DROP WARM-UP CONTEXT ROWS ---
    warmup_count = 20
    df_final = df_final.sort_values('TS').reset_index(drop=True)
    if len(df_final) > warmup_count:
        df_final = df_final.iloc[warmup_count:].reset_index(drop=True)
        print(f"✓ Ignored the first {warmup_count} warm-up rows used for context.")
    print("After drop:", len(df_final))

    # Handle potential case sensitivity for Series column
    series_col = None
    for possible_name in ['Series', 'series', 'SERIES']:
        if possible_name in df_final.columns:
            series_col = possible_name
            break

    if series_col is None:
        raise ValueError("Could not find Series column")

    # Standardize column name
    if series_col != 'Series':
        df_final['Series'] = df_final[series_col]

    # Filter out any null Series values before sorting
    df_final = df_final[df_final['Series'].notna()].copy()

    # Handle potential case sensitivity for other columns
    column_mapping = {
        'Actual': ['Actual', 'actual'],
        'Forecast': ['Forecast', 'forecast'],
        'Lower bound': ['Lower bound', 'lower bound', 'lower_bound'],
        'Upper bound': ['Upper bound', 'upper bound', 'upper_bound'],
        'is_Anomaly': ['is_Anomaly', 'is_anomaly', 'isAnomaly']
    }

    for standard_name, possible_names in column_mapping.items():
        for possible_name in possible_names:
            if possible_name in df_final.columns:
                if possible_name != standard_name:
                    df_final[standard_name] = df_final[possible_name]
                break

    print(f"✓ Data preparation complete")

    # STEP 3: Dynamic series configuration
    series_types = sorted([s for s in df_final['Series'].unique() if isinstance(s, str)])
    num_series = len(series_types)

    print(f"✓ Found {num_series} unique series types: {', '.join(series_types)}")

    subplot_titles = [f"{series} Transactions" for series in series_types]
    row_heights = [1.0 / num_series] * num_series

    # STEP 4: Create optimized Plotly visualization
    fig = make_subplots(
        rows=num_series,
        cols=1,
        subplot_titles=tuple(subplot_titles),
        vertical_spacing=0.08,
        row_heights=row_heights
    )

    for idx, series_type in enumerate(series_types, start=1):
        series_data = df_final[df_final['Series'] == series_type].sort_values('TS')
        series_color = SERIES_COLORS.get(series_type, DEFAULT_COLOR)

        # Add confidence interval (upper bound)
        fig.add_trace(go.Scatter(
            x=series_data['TS'],
            y=series_data['Upper bound'],
            mode='lines',
            line=dict(width=0),
            showlegend=False,
            hoverinfo='skip',
            name='Upper Bound'
        ), row=idx, col=1)

        # Add confidence interval (lower bound with fill)
        fig.add_trace(go.Scatter(
            x=series_data['TS'],
            y=series_data['Lower bound'],
            mode='lines',
            line=dict(width=0),
            fillcolor='rgba(68, 168, 129, 0.12)',
            fill='tonexty',
            showlegend=False,
            hoverinfo='skip',
            name='Lower Bound'
        ), row=idx, col=1)

        # Add forecast line
        fig.add_trace(go.Scatter(
            x=series_data['TS'],
            y=series_data['Forecast'],
            mode='lines',
            name='Forecast',
            line=dict(color='gray', dash='dash', width=1.5),
            showlegend=(idx == 1),
            hovertemplate='<b>Forecast</b><br>%{x}<br>$%{y:.2f}<extra></extra>'
        ), row=idx, col=1)

        # Add actual values - Use Scattergl for performance
        fig.add_trace(go.Scattergl(
            x=series_data['TS'],
            y=series_data['Actual'],
            mode='lines',
            name='Actual',
            line=dict(color=series_color, width=2),
            showlegend=(idx == 1),
            hovertemplate='<b>Actual</b><br>%{x}<br>$%{y:.2f}<extra></extra>'
        ), row=idx, col=1)

        # Highlight anomalies
        anomalies = series_data[series_data['is_Anomaly'] == True]
        if len(anomalies) > 0:
            fig.add_trace(go.Scatter(
                x=anomalies['TS'],
                y=anomalies['Actual'],
                mode='markers',
                name='Anomaly',
                marker=dict(
                    size=12,
                    color='red',
                    symbol='x',
                    line=dict(width=2, color='darkred')
                ),
                showlegend=(idx == 1),
                hovertemplate='<b>⚠️ ANOMALY</b><br>%{x}<br>$%{y:.2f}<extra></extra>'
            ), row=idx, col=1)

        # Update axes
        fig.update_xaxes(title_text="Time", row=idx, col=1, showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.1)')
        fig.update_yaxes(title_text="Amount ($)", row=idx, col=1, showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.1)')

    # STEP 5: Optimized layout
    plot_height = max(900, 280 * num_series)

    fig.update_layout(
        height=plot_height,
        width=1400,
        title={
            'text': f"Credit Card Transaction Anomaly Detection<br><sub>ML_ANOMALY_DETECT - {len(df_final):,} Records | {df_final['is_Anomaly'].sum()} Anomalies Detected</sub>",
            'x': 0.5,
            'xanchor': 'center',
            'y': 0.99,
            'yanchor': 'top',
            'font': {'size': 18}
        },
        hovermode='closest',
        showlegend=True,
        legend=dict(
            orientation="h",
            yanchor="top",
            y=-0.02,
            xanchor="center",
            x=0.5,
            bgcolor='rgba(255,255,255,0.9)',
            bordercolor='rgba(0,0,0,0.2)',
            borderwidth=1,
            font=dict(size=12)
        ),
        template='plotly_white',
        margin=dict(l=80, r=40, t=100, b=80),
    )

    fig.show()

    # STEP 6: Enhanced summary statistics
    print("\n" + "=" * 75)
    print(f"{'ANOMALY DETECTION SUMMARY':^75}")
    print("=" * 75)
    print(f"Total records analyzed: {len(df_final):,}")
    print(f"Date range: {df_final['TS'].min().strftime('%Y-%m-%d')} to {df_final['TS'].max().strftime('%Y-%m-%d')}")
    print(f"Anomalies detected: {df_final['is_Anomaly'].sum()}")
    print(f"Overall anomaly rate: {(df_final['is_Anomaly'].sum() / len(df_final) * 100):.2f}%")
    print(f"\nUnique series types: {num_series}")
    print("\nBreakdown by Series:")
    print("-" * 75)

    for series in series_types:
        series_df = df_final[df_final['Series'] == series]
        series_count = len(series_df)
        anomaly_count = series_df['is_Anomaly'].sum()
        anomaly_rate = (anomaly_count / series_count * 100) if series_count > 0 else 0
        series_avg = series_df['Actual'].mean()
        series_max = series_df['Actual'].max()

        print(f"  {series:12s}: {anomaly_count:3d}/{series_count:4d} anomalies ({anomaly_rate:5.2f}%) | "
              f"Avg: ${series_avg:7.2f} | Max: ${series_max:8.2f}")

    print("=" * 75)
Step 9: Cleanup
Run the following commands to clean up all of the resources created by this notebook.
In [21]:
%%sql
USE temp;

-- Drop all tables created during the demo
DROP TABLE IF EXISTS metrics;
DROP TABLE IF EXISTS training_data;
DROP TABLE IF EXISTS test_data;
DROP TABLE IF EXISTS test_data_with_history;

-- Drop the database
DROP DATABASE IF EXISTS temp;
Out [21]:
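Optionally, you can also remove the trained model itself. A minimal sketch that reuses the %s2ml delete magic shown in Step 5:

# Optional sketch: delete the trained model created in this demo by invoking
# the %s2ml delete line magic programmatically.
from IPython import get_ipython

get_ipython().run_line_magic("s2ml", "delete --model cc_anomaly_model_v1 --f")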

Details
About this Template
Learn how to train an ML Anomaly Detect model and run it to flag anomalous points in a set of time series inputs.
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.