
Demonstrate ML function Anomaly Detect


Note

You can use your existing Standard or Premium workspace with this Notebook.

This feature is currently in Private Preview. Please reach out to support@singlestore.com to confirm whether this feature can be enabled for your organization.

This Jupyter notebook will help you:

  1. Download the bank transaction dataset from Kaggle for anomaly detection

  2. Store the data in a SingleStore table

  3. Use ML Functions for training and predictions

  4. Visualize the results of anomaly detection on the test dataset

Prerequisites: Ensure ML Functions are installed on your deployment (AI > AI & ML Functions).

Step 1: Import the Necessary Libraries

In [1]:

!pip install -q kagglehub

In [2]:

import os
import pandas as pd
import singlestoredb as s2
import json
import kagglehub
import getpass
from singlestoredb import create_engine
from IPython.display import display

import plotly.graph_objects as go
from plotly.subplots import make_subplots

Step 2: Test Connection to SingleStore

In [3]:

# Ensure that you have selected a SingleStoreDB connection (workspace) before running this cell
try:
    conn = s2.connect()
    conn.autocommit(True)  # Set autocommit for notebook simplicity
    print("Connection successful!")
except Exception as e:
    print(f"Connection failed: {e}")

Step 3: Create Database and Tables

In [4]:

# Create the database
db_name = "temp"

try:
    conn.cursor().execute(f"CREATE DATABASE IF NOT EXISTS {db_name};")
    conn.cursor().execute(f"USE {db_name};")
    print(f"Database '{db_name}' is ready.")
except Exception as e:
    print(e)

Step 4: Prepare and Load Data

We will use a publicly available Kaggle bank transactions dataset. We'll download the data into a Pandas DataFrame and then load it into SingleStore.

Note

You may need to add the Kaggle server URL to your firewall allowlist so that this notebook instance can access it. If the notebook doesn't have access, a toast message will appear when you run the following code cell, guiding you to add the URL.

In [5]:

1print("Downloading dataset from Kaggle...")2path = kagglehub.dataset_download("valakhorasani/bank-transaction-dataset-for-fraud-detection")3print(f"Dataset downloaded to: {path}")4
5# Read the CSV file6df = pd.read_csv(f"{path}/bank_transactions_data_2.csv")7df["TransactionDate"] = pd.to_datetime(df["TransactionDate"], format="%Y-%m-%d %H:%M:%S", errors="coerce")8df["PreviousTransactionDate"] = pd.to_datetime(df["PreviousTransactionDate"], format="%Y-%m-%d %H:%M:%S", errors="coerce")9# Display dataset info10print(f"\nDataset shape: {df.shape}")11print(f"Columns: {list(df.columns)}")12print("\nFirst few rows:")13df.head()
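Since errors="coerce" silently converts unparseable timestamps to NaT, it can be worth checking how many rows were affected before loading the data. A minimal optional check (not part of the original notebook cells) might look like this:

# Optional sketch: count timestamps that failed to parse (errors="coerce" turns them into NaT)
print(df["TransactionDate"].isna().sum(), "unparseable TransactionDate values")
print(df["PreviousTransactionDate"].isna().sum(), "unparseable PreviousTransactionDate values")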

In [6]:

try:
    # Create a SQLAlchemy engine
    engine = s2.create_engine(database=db_name)

    # Use pandas.to_sql to load the complete data to a table
    df.to_sql("metrics", con=engine, if_exists='append', index=False)

    print("Data loading complete.")
except Exception as e:
    print(f"Error loading data: {e}")

Create train/test splits from the complete data

Rows belonging to the same sequence must be grouped into the same split. The following SQL query takes care of this.

In [7]:

1%%sql2USE "temp";3
4DROP TABLE IF EXISTS training_data;5DROP TABLE IF EXISTS test_data;6
7-- Training set8CREATE TABLE training_data AS9SELECT *10FROM (11    SELECT12        *,13        ROW_NUMBER() OVER (ORDER BY TransactionDate) AS rn,14        COUNT(*) OVER () AS total_rows15    FROM temp.metrics16) t17WHERE rn <= 0.8 * total_rows18ORDER BY TransactionDate;19
20-- Test set21CREATE TABLE test_data AS22SELECT *23FROM (24    SELECT25        *,26        ROW_NUMBER() OVER (ORDER BY TransactionDate) AS rn,27        COUNT(*) OVER () AS total_rows28    FROM temp.metrics29) t30WHERE rn > 0.8 * total_rows31ORDER BY TransactionDate;

Out [7]:

2009 rows affected.
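Optionally, sanity-check the 80/20 split before moving on. A small sketch using the connection from Step 2 (not part of the original cells):

# Optional sketch: compare the row counts of the two splits (assumes `conn` from Step 2)
cur = conn.cursor()
cur.execute(
    "SELECT (SELECT COUNT(*) FROM temp.training_data) AS train_rows, "
    "(SELECT COUNT(*) FROM temp.test_data) AS test_rows;"
)
print(cur.fetchone())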

Pre-process the test data set to include recent context rows

  1. Combine the N (e.g., 20) most recent rows from training_data with all rows from test_data, forming a continuous chronological dataset.

  2. The last N rows help provide recent context for model evaluation.

  3. These 20 context rows are later excluded from the final results, since they are used only for observing short-term trends.

In [8]:

%%sql
USE temp;

-- Drop old context table if it exists
DROP TABLE IF EXISTS test_data_with_history;

-- Create a new test_data_with_history table:
-- includes the last 20 rows from training_data + all rows from test_data
CREATE TABLE test_data_with_history AS
SELECT *
FROM (
    SELECT *
    FROM (
        SELECT *
        FROM training_data
        ORDER BY TransactionDate DESC
        LIMIT 20
    ) recent_training
    UNION ALL
    SELECT *
    FROM test_data
)
ORDER BY TransactionDate ASC;

Out [8]:

523 rows affected.

Step 5: Train the Anomaly Detection Model

Now we'll use the %%s2ml train cell magic to train our AnomalyDetection model.

In [9]:

%s2ml list

Optionally, delete any previous model with the same name

In [10]:

%s2ml delete --model cc_anomaly_model_v1 --f

Out [10]:

{'status': 'deleted',
 'modelID': '174365d5-0135-47af-bf4f-2a582a03b5ba',
 'model': 'cc_anomaly_model_v1'}

In [11]:

%%s2ml train as anomaly_model
task: AnomalyDetection
model: cc_anomaly_model_v1
db: temp
input_table: training_data
target_column: TransactionAmount
target_time_column: TransactionDate
target_series_column: Channel
description: "Training an anomaly detection model"
runtime: cpu-small
selected_features: {"mode":"*","features":null}
force: True

Step 6: Check Model Status

Model training is an asynchronous job. We can use the %s2ml status magic to check on its progress. The model is ready to use when the status shows Completed.

(This may take a minute or two. Re-run the cell to refresh the status.)

In [12]:

%s2ml status --model cc_anomaly_model_v1
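If you prefer to wait programmatically rather than re-running the cell, a rough polling sketch is shown below. It assumes the %s2ml status magic returns a dictionary containing a status key, as the %s2ml delete magic does above; adjust the key and the terminal states to match your actual output.

import time
from IPython import get_ipython

# Rough sketch: poll the training job until it reaches a terminal state.
# Assumes %s2ml status returns a dict with a 'status' key (as %s2ml delete does above);
# adjust if your deployment returns a different structure.
ip = get_ipython()
for _ in range(30):
    result = ip.run_line_magic("s2ml", "status --model cc_anomaly_model_v1")
    status = result.get("status", "") if isinstance(result, dict) else str(result)
    print("Current status:", status)
    if str(status).lower() in ("completed", "failed", "error"):
        break
    time.sleep(10)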

In [13]:

print(json.dumps(anomaly_model, indent=4))

Step 7: Run Predictions

Once the model is COMPLETED, you can use it directly in SQL via the ML_ANOMALY_DETECT() function.

Here is an example prediction for a single row.
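A minimal sketch of such a single-row call (not one of the original cells; it reuses the ML_ANOMALY_DETECT query shown later in this step, limited to one row, together with the conn connection from Step 2):

# Optional sketch: score a single row with ML_ANOMALY_DETECT (assumes the model has finished training)
single_row_sql = """
    SELECT cluster.ML_ANOMALY_DETECT('cc_anomaly_model_v1', TO_JSON(t.*)) AS is_anomaly
    FROM (SELECT * FROM temp.test_data_with_history ORDER BY TransactionDate ASC LIMIT 1) AS t;
"""
cur = conn.cursor()
cur.execute(single_row_sql)
print(cur.fetchone()[0])

Before scoring the full test set, check that the expected tables are in place: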

In [14]:

%%sql
show tables in temp;

Out [14]:

4 rows affected.

Run the model on a subset of data to view the results format

In [15]:

%%sql
SELECT COUNT(*) as test_count FROM test_data_with_history;

Out [15]:

1 rows affected.

In [16]:

%%sql
SELECT * FROM test_data_with_history ORDER BY TransactionDate ASC LIMIT 10;

Out [16]:

10 rows affected.

In [17]:

%%sql
SELECT cluster.ML_ANOMALY_DETECT('cc_anomaly_model_v1', TO_JSON(selected_data.*)) AS is_anomaly
FROM (SELECT * FROM temp.test_data_with_history ORDER BY TransactionDate ASC) AS selected_data;

Out [17]:

1025 rows affected.

Step 8: Visualize Anomalies

In [18]:

# Count the rows in the test set
cursor = conn.cursor()

test_data_count = """
    SELECT count(*) FROM temp.test_data;
"""
cursor.execute(test_data_count)

QUERY_LIMIT = cursor.fetchall()[0][0]  # Number of records to analyze

In [19]:

# Run Predictions
predictions = """
    SELECT cluster.ML_ANOMALY_DETECT('cc_anomaly_model_v1', TO_JSON(selected_data.*)) AS is_anomaly
    FROM (SELECT * FROM temp.test_data_with_history ORDER BY TransactionDate ASC) AS selected_data;
"""

cursor.execute(predictions)

# Parse and prepare data for visualization
column_names = [desc[0] for desc in cursor.description]
df = pd.DataFrame(list(cursor.fetchall()), columns=column_names)

print(f"✓ Retrieved {len(df)} records from database")

df.head()

In [20]:

# ============================================
# AI generated code to visualize Predictions
# ============================================

SERIES_COLORS = {
    'ATM': '#1f77b4',
    'Branch': '#2ca02c',
    'Online': '#9467bd',
    'POS': '#d62728',
    'Mobile': '#ff7f0e'
}
DEFAULT_COLOR = '#8c564b'

# Parse JSON and filter out error rows
def safe_parse_json(x):
    try:
        parsed = json.loads(x)
        # Check if it's an error response
        if 'error' in parsed:
            return None
        return parsed
    except:
        return None

df['is_anomaly_parsed'] = df['is_anomaly'].apply(safe_parse_json)

# Filter out rows that failed to parse or had errors
df_valid = df[df['is_anomaly_parsed'].notna()].copy()
print(f"✓ Filtered out {len(df) - len(df_valid)} error/invalid rows")
print(f"✓ Processing {len(df_valid)} valid prediction rows")

if len(df_valid) == 0:
    print("❌ No valid predictions found. All rows contain errors.")
    print("\nSample error:")
    print(df['is_anomaly'].iloc[0])
else:
    # Check what keys are in the JSON
    print(f"✓ JSON keys found: {list(df_valid['is_anomaly_parsed'].iloc[0].keys())}")

    # Expand JSON into separate columns
    df_expanded = pd.json_normalize(df_valid['is_anomaly_parsed'])

    print(f"✓ Expanded columns: {list(df_expanded.columns)}")

    # Combine dataframes
    df_final = pd.concat([df_valid.reset_index(drop=True), df_expanded], axis=1)

    # Handle potential case sensitivity in column names
    # Check for different possible timestamp column names
    timestamp_col = None
    for possible_name in ['TS', 'ts', 'Ts', 'timestamp', 'Timestamp']:
        if possible_name in df_final.columns:
            timestamp_col = possible_name
            break

    if timestamp_col is None:
        print("Available columns:", list(df_final.columns))
        raise ValueError("Could not find timestamp column. Please check the column names above.")

    # Convert timestamp to datetime
    df_final['TS'] = pd.to_datetime(df_final[timestamp_col])
    print("Before drop:", len(df_final))

    # --- DROP WARM-UP CONTEXT ROWS ---
    warmup_count = 20
    df_final = df_final.sort_values('TS').reset_index(drop=True)
    if len(df_final) > warmup_count:
        df_final = df_final.iloc[warmup_count:].reset_index(drop=True)
    print(f"✓ Ignored the first {warmup_count} warm-up rows used for context.")
    print("After drop:", len(df_final))

    # Handle potential case sensitivity for Series column
    series_col = None
    for possible_name in ['Series', 'series', 'SERIES']:
        if possible_name in df_final.columns:
            series_col = possible_name
            break

    if series_col is None:
        raise ValueError("Could not find Series column")

    # Standardize column name
    if series_col != 'Series':
        df_final['Series'] = df_final[series_col]

    # Filter out any null Series values before sorting
    df_final = df_final[df_final['Series'].notna()].copy()

    # Handle potential case sensitivity for other columns
    column_mapping = {
        'Actual': ['Actual', 'actual'],
        'Forecast': ['Forecast', 'forecast'],
        'Lower bound': ['Lower bound', 'lower bound', 'lower_bound'],
        'Upper bound': ['Upper bound', 'upper bound', 'upper_bound'],
        'is_Anomaly': ['is_Anomaly', 'is_anomaly', 'isAnomaly']
    }

    for standard_name, possible_names in column_mapping.items():
        for possible_name in possible_names:
            if possible_name in df_final.columns:
                if possible_name != standard_name:
                    df_final[standard_name] = df_final[possible_name]
                break

    print(f"✓ Data preparation complete")

    # STEP 3: Dynamic series configuration
    series_types = sorted([s for s in df_final['Series'].unique() if isinstance(s, str)])
    num_series = len(series_types)

    print(f"✓ Found {num_series} unique series types: {', '.join(series_types)}")

    subplot_titles = [f"{series} Transactions" for series in series_types]
    row_heights = [1.0 / num_series] * num_series

    # STEP 4: Create optimized Plotly visualization
    fig = make_subplots(
        rows=num_series,
        cols=1,
        subplot_titles=tuple(subplot_titles),
        vertical_spacing=0.08,
        row_heights=row_heights
    )

    for idx, series_type in enumerate(series_types, start=1):
        series_data = df_final[df_final['Series'] == series_type].sort_values('TS')
        series_color = SERIES_COLORS.get(series_type, DEFAULT_COLOR)

        # Add confidence interval (upper bound)
        fig.add_trace(go.Scatter(
            x=series_data['TS'],
            y=series_data['Upper bound'],
            mode='lines',
            line=dict(width=0),
            showlegend=False,
            hoverinfo='skip',
            name='Upper Bound'
        ), row=idx, col=1)

        # Add confidence interval (lower bound with fill)
        fig.add_trace(go.Scatter(
            x=series_data['TS'],
            y=series_data['Lower bound'],
            mode='lines',
            line=dict(width=0),
            fillcolor='rgba(68, 168, 129, 0.12)',
            fill='tonexty',
            showlegend=False,
            hoverinfo='skip',
            name='Lower Bound'
        ), row=idx, col=1)

        # Add forecast line
        fig.add_trace(go.Scatter(
            x=series_data['TS'],
            y=series_data['Forecast'],
            mode='lines',
            name='Forecast',
            line=dict(color='gray', dash='dash', width=1.5),
            showlegend=(idx==1),
            hovertemplate='<b>Forecast</b><br>%{x}<br>$%{y:.2f}<extra></extra>'
        ), row=idx, col=1)

        # Add actual values - Use Scattergl for performance
        fig.add_trace(go.Scattergl(
            x=series_data['TS'],
            y=series_data['Actual'],
            mode='lines',
            name='Actual',
            line=dict(color=series_color, width=2),
            showlegend=(idx==1),
            hovertemplate='<b>Actual</b><br>%{x}<br>$%{y:.2f}<extra></extra>'
        ), row=idx, col=1)

        # Highlight anomalies
        anomalies = series_data[series_data['is_Anomaly'] == True]
        if len(anomalies) > 0:
            fig.add_trace(go.Scatter(
                x=anomalies['TS'],
                y=anomalies['Actual'],
                mode='markers',
                name='Anomaly',
                marker=dict(
                    size=12,
                    color='red',
                    symbol='x',
                    line=dict(width=2, color='darkred')
                ),
                showlegend=(idx==1),
                hovertemplate='<b>⚠️ ANOMALY</b><br>%{x}<br>$%{y:.2f}<extra></extra>'
            ), row=idx, col=1)

        # Update axes
        fig.update_xaxes(title_text="Time", row=idx, col=1, showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.1)')
        fig.update_yaxes(title_text="Amount ($)", row=idx, col=1, showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.1)')

    # STEP 5: Optimized layout
    plot_height = max(900, 280 * num_series)

    fig.update_layout(
        height=plot_height,
        width=1400,
        title={
            'text': f"Credit Card Transaction Anomaly Detection<br><sub>ML_ANOMALY_DETECT - {len(df_final):,} Records | {df_final['is_Anomaly'].sum()} Anomalies Detected</sub>",
            'x': 0.5,
            'xanchor': 'center',
            'y': 0.99,
            'yanchor': 'top',
            'font': {'size': 18}
        },
        hovermode='closest',
        showlegend=True,
        legend=dict(
            orientation="h",
            yanchor="top",
            y=-0.02,
            xanchor="center",
            x=0.5,
            bgcolor='rgba(255,255,255,0.9)',
            bordercolor='rgba(0,0,0,0.2)',
            borderwidth=1,
            font=dict(size=12)
        ),
        template='plotly_white',
        margin=dict(l=80, r=40, t=100, b=80),
    )

    fig.show()

    # STEP 6: Enhanced summary statistics
    print("\n" + "="*75)
    print(f"{'ANOMALY DETECTION SUMMARY':^75}")
    print("="*75)
    print(f"Total records analyzed: {len(df_final):,}")
    print(f"Date range: {df_final['TS'].min().strftime('%Y-%m-%d')} to {df_final['TS'].max().strftime('%Y-%m-%d')}")
    print(f"Anomalies detected: {df_final['is_Anomaly'].sum()}")
    print(f"Overall anomaly rate: {(df_final['is_Anomaly'].sum() / len(df_final) * 100):.2f}%")
    print(f"\nUnique series types: {num_series}")
    print("\nBreakdown by Series:")
    print("-"*75)

    for series in series_types:
        series_df = df_final[df_final['Series'] == series]
        series_count = len(series_df)
        anomaly_count = series_df['is_Anomaly'].sum()
        anomaly_rate = (anomaly_count/series_count*100) if series_count > 0 else 0
        series_avg = series_df['Actual'].mean()
        series_max = series_df['Actual'].max()

        print(f"  {series:12s}: {anomaly_count:3d}/{series_count:4d} anomalies ({anomaly_rate:5.2f}%) | "
              f"Avg: ${series_avg:7.2f} | Max: ${series_max:8.2f}")

    print("="*75)
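Beyond the plot, you may want to look at the individual flagged transactions. A short follow-up sketch (assuming df_final and the standardized columns from the cell above are in scope):

# Optional sketch: list the transactions flagged as anomalies
flagged = df_final[df_final['is_Anomaly'] == True].sort_values('TS')
display(flagged[['TS', 'Series', 'Actual', 'Forecast', 'Lower bound', 'Upper bound']].head(20))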

Step 9: Cleanup

Run the following commands to clean up all the created resources.

In [21]:

%%sql
USE temp;

-- Drop all tables created during the demo
DROP TABLE IF EXISTS metrics;
DROP TABLE IF EXISTS training_data;
DROP TABLE IF EXISTS test_data;
DROP TABLE IF EXISTS test_data_with_history;

-- Drop the database
DROP DATABASE IF EXISTS temp;

Out [21]:

3 rows affected.

Details


About this Template

Learn how to train an ML Anomaly Detect model and run it to flag anomalies in a set of time series inputs.

This Notebook can be run in Standard and Enterprise deployments.

Tags

advanced, notebooks, python

License

This Notebook has been released under the Apache 2.0 open source license.
