New

IT Threat Detection, Part 3

Notebook


SingleStore Notebooks

IT Threat Detection, Part 3

Note

This tutorial is meant for Standard & Premium Workspaces. You can't run this with a Free Starter Workspace due to restrictions on Storage. Create a Workspace using +group in the left nav & select Standard for this notebook. Gallery notebooks tagged with "Starter" are suitable to run on a Free Starter Workspace

Get pipeline data from Confluent (Kafka)

Kafka_SingleStore.png

We recommend for that step to use a S1+ size workspace

Action Required

Make sure to select the siem_log_kafka_demo database from the drop-down menu at the top of this notebook. It updates the connection_url which is used by the %%sql magic command and SQLAlchemy to make connections to the selected database.

In [1]:

%%sql
DROP PIPELINE IF EXISTS `siem_log_real`;
DROP TABLE IF EXISTS `siem_log_real`;

We start creating a simple table to load the logs into a JSON column

In [2]:

%%sql
CREATE TABLE IF NOT EXISTS `siem_log_real` (
`logs` JSON COLLATE utf8_bin
, SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=PERIODIC AUTOSTATS_HISTOGRAM_MODE=CREATE SQL_MODE='STRICT_ALL_TABLES';

We create a pipeline from the Confluent Cluster with an interval of 20ms

In [3]:

%%sql
CREATE PIPELINE `siem_log_real`
AS LOAD DATA KAFKA 'pkc-p11xm.us-east-1.aws.confluent.cloud:9092/singlestore_topic'
CONFIG '{\"sasl.username\": \"WTIVCYPLUAIMIAYQ\",\n \"sasl.mechanism\": \"PLAIN\",\n \"security.protocol\": \"SASL_SSL\",\n \"ssl.ca.location\": \"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem\"}'
CREDENTIALS '{"sasl.password": "/qIOhlTFEK8RNNCc1qSOnpNj4mqhXfudBlQQFgRfc0qBEjfm99VcyvEuwPILBcnv"}'
BATCH_INTERVAL 20
DISABLE OFFSETS METADATA GC
INTO TABLE `siem_log_real`
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY '';

Let's start the pipeline

In [4]:

%%sql
START PIPELINE siem_log_real;

We extract a few elements from the JSON column such as timestamp, Log_ID, and the vector to be stored in a blob format. Data is extracted as soon as an update is made to the table

In [5]:

%%sql
ALTER TABLE siem_log_real
ADD COLUMN Timestamp as JSON_EXTRACT_STRING(`logs`,'Timestamp') PERSISTED datetime,
ADD COLUMN model_res_blob AS JSON_ARRAY_PACK_F32(JSON_EXTRACT_STRING(`logs`, 'model_res')) PERSISTED BLOB,
ADD COLUMN Log_ID AS JSON_EXTRACT_BIGINT(`logs`, 'Log_ID') PERSISTED bigint;

Install libraries for real-time dashboarding with Perspective

In [6]:

%pip install perspective-python --quiet

In [7]:

import perspective
import threading
import random
import time
from datetime import datetime, date
from perspective import Table, PerspectiveWidget
import warnings
warnings.filterwarnings('ignore')

We will set dashboard with a refresh rate of 500ms. We use two modes: stop and run to stop a dashboard retrieving results from the database.

In [8]:

def loop():
while mode != 'stop':
while mode == 'run':
table.update(data_source())
time.sleep(0.5)

Track Real-Time Connections

In [9]:

def data_source():
result = %sql select Timestamp, count(*) as count_connections from siem_log_real group by Timestamp order by Timestamp desc limit 100
result2 = list(result.dicts())
return result2
SCHEMA = {
"Timestamp": datetime,
"count_connections": int
}

In [10]:

mode = 'run'
table = perspective.Table(SCHEMA, limit=100)
threading.Thread(target=loop).start()

In [11]:

perspective.PerspectiveWidget(table,title = "Track Real-Time Connections", group_by=["Timestamp"],plugin="Y Line",columns=["count_connections"])

In [12]:

mode = 'stop'

Monitor and Infer IT Threats using Semantic Search over Real-Time Data

Semantic_searches.png

In [13]:

def data_source():
result = %sql WITH test_sql AS (SELECT Log_ID, TIMESTAMP, id, EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) AS score,ROW_NUMBER() OVER(PARTITION BY Log_ID ORDER BY EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) ASC) AS rn FROM (SELECT TIMESTAMP, Log_ID, model_res_blob FROM siem_log_real ORDER BY TIMESTAMP DESC LIMIT 20) CROSS JOIN model_results where score IS NOT NULL),label_table AS (SELECT Log_ID, TIMESTAMP,MAX(CASE WHEN id LIKE 'Bru%' OR id LIKE 'SQL%' THEN 'Malicious' ELSE 'Benign' END) as log_status FROM test_sql WHERE rn <= 100 GROUP BY Log_ID,TIMESTAMP order by TIMESTAMP DESC) SELECT log_status, count(Log_ID) as count_connections FROM label_table group by log_status;
result2 = list(result.dicts())
return result2
SCHEMA = {
"log_status": str,
"count_connections": int
}

In [14]:

mode = 'run'
table = perspective.Table(SCHEMA, limit=100)
threading.Thread(target=loop).start()

In [15]:

perspective.PerspectiveWidget(table,title = "Monitor Threat Inference", split_by=["log_status"],plugin="Y Line",columns=["count_connections"])

In [16]:

mode = 'stop'

Track latest connections with Inferences Threat Inference by Log IDs

In [17]:

def data_source():
result = %sql WITH test_sql AS (SELECT Log_ID, TIMESTAMP, id, EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) AS score,ROW_NUMBER() OVER(PARTITION BY Log_ID ORDER BY EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) ASC) AS rn FROM (SELECT TIMESTAMP, Log_ID, model_res_blob FROM siem_log_real ORDER BY TIMESTAMP DESC LIMIT 20) CROSS JOIN model_results where score IS NOT NULL),label_table AS (SELECT Log_ID, TIMESTAMP,MAX(CASE WHEN id LIKE 'Bru%' OR id LIKE 'SQL%' THEN 'Malicious' ELSE 'Benign' END) as log_status FROM test_sql WHERE rn <= 100 GROUP BY Log_ID,TIMESTAMP order by TIMESTAMP DESC) SELECT * FROM label_table;
result2 = list(result.dicts())
return result2
SCHEMA = {
"Log_ID": str,
"TIMESTAMP": datetime,
"log_status": str
}

In [18]:

mode = 'run'
table = perspective.Table(SCHEMA, limit=20)
threading.Thread(target=loop).start()

In [19]:

perspective.PerspectiveWidget(table,title = "Latest Connections", group_by=["TIMESTAMP"],plugin="Datagrid",columns=["count_attack"])

In [20]:

mode = 'stop'

Details


About this Template

Part 3 of Real-time threat Detection - Integrate with Kafka, run and visualize Threat Detection on incoming logs. This notebook requires adjustments to work out of the box.

Notebook Icon

This Notebook can be run in Standard and Enterprise deployments.

Tags

advancedcybersecurityvectordbiotai

License

This Notebook has been released under the Apache 2.0 open source license.