IT Threat Detection, Part 3
Note
This tutorial is meant for Standard and Premium Workspaces. You can't run this with a Free Starter Workspace due to storage restrictions. Create a Workspace using + group in the left nav and select Standard for this notebook. Gallery notebooks tagged with "Starter" are suitable to run on a Free Starter Workspace.
Get pipeline data from Confluent (Kafka)
We recommend using an S1 or larger workspace size for this step.
Action Required
Make sure to select the siem_log_kafka_demo database from the drop-down menu at the top of this notebook. It updates the connection_url, which is used by the %%sql magic command and SQLAlchemy to connect to the selected database.
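As an optional sanity check (not part of the original flow), you can confirm which database the notebook is connected to before running the steps below:

%%sql
-- Optional check: should return siem_log_kafka_demo if the drop-down selection took effect.
SELECT DATABASE();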
In [1]:
%%sql
DROP PIPELINE IF EXISTS `siem_log_real`;
DROP TABLE IF EXISTS `siem_log_real`;
We start by creating a simple table to load the logs into a JSON column.
In [2]:
%%sql
CREATE TABLE IF NOT EXISTS `siem_log_real` (
    `logs` JSON COLLATE utf8_bin,
    SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=PERIODIC AUTOSTATS_HISTOGRAM_MODE=CREATE SQL_MODE='STRICT_ALL_TABLES';
We create a pipeline from the Confluent cluster with a batch interval of 20 ms.
In [3]:
%%sql
CREATE PIPELINE `siem_log_real`
AS LOAD DATA KAFKA 'pkc-p11xm.us-east-1.aws.confluent.cloud:9092/singlestore_topic'
CONFIG '{\"sasl.username\": \"WTIVCYPLUAIMIAYQ\",\n \"sasl.mechanism\": \"PLAIN\",\n \"security.protocol\": \"SASL_SSL\",\n \"ssl.ca.location\": \"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem\"}'
CREDENTIALS '{"sasl.password": "/qIOhlTFEK8RNNCc1qSOnpNj4mqhXfudBlQQFgRfc0qBEjfm99VcyvEuwPILBcnv"}'
BATCH_INTERVAL 20
DISABLE OFFSETS METADATA GC
INTO TABLE `siem_log_real`
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY '';
Let's start the pipeline
In [4]:
%%sql
START PIPELINE siem_log_real;
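Before building the dashboards, you may want to confirm that the pipeline is ingesting data. A minimal optional check (not part of the original flow) is:

%%sql
-- Optional check: the pipeline should be in the Running state and the row count should grow over time.
SHOW PIPELINES;
SELECT COUNT(*) FROM siem_log_real;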
We extract a few elements from the JSON column, such as the timestamp, Log_ID, and the vector, which is stored in blob format. Because these are persisted computed columns, the values are extracted as soon as data is written to the table.
In [5]:
%%sql
ALTER TABLE siem_log_real
    ADD COLUMN Timestamp AS JSON_EXTRACT_STRING(`logs`, 'Timestamp') PERSISTED datetime,
    ADD COLUMN model_res_blob AS JSON_ARRAY_PACK_F32(JSON_EXTRACT_STRING(`logs`, 'model_res')) PERSISTED BLOB,
    ADD COLUMN Log_ID AS JSON_EXTRACT_BIGINT(`logs`, 'Log_ID') PERSISTED bigint;
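To spot-check that the computed columns are being populated, an optional query (assuming the pipeline has already ingested rows) is:

%%sql
-- Optional check: show the extracted columns and the byte length of the packed vector for the newest rows.
SELECT Timestamp, Log_ID, LENGTH(model_res_blob) AS vector_bytes
FROM siem_log_real
ORDER BY Timestamp DESC
LIMIT 5;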
Install libraries for real-time dashboarding with Perspective
In [6]:
%pip install perspective-python --quiet
In [7]:
import perspective
import threading
import random
import time
from datetime import datetime, date
from perspective import Table, PerspectiveWidget
import warnings

warnings.filterwarnings('ignore')
We set up the dashboard with a refresh rate of 500 ms. We use two modes, run and stop, to control whether a dashboard keeps retrieving results from the database.
In [8]:
def loop():
    while mode != 'stop':
        while mode == 'run':
            table.update(data_source())
            time.sleep(0.5)
Track Real-Time Connections
In [9]:
def data_source():
    result = %sql select Timestamp, count(*) as count_connections from siem_log_real group by Timestamp order by Timestamp desc limit 100
    result2 = list(result.dicts())
    return result2

SCHEMA = {
    "Timestamp": datetime,
    "count_connections": int
}
In [10]:
mode = 'run'
table = perspective.Table(SCHEMA, limit=100)
threading.Thread(target=loop).start()
In [11]:
perspective.PerspectiveWidget(
    table,
    title="Track Real-Time Connections",
    group_by=["Timestamp"],
    plugin="Y Line",
    columns=["count_connections"]
)
In [12]:
mode = 'stop'
Monitor and Infer IT Threats using Semantic Search over Real-Time Data
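The query below scores each incoming log by its Euclidean distance to the reference vectors stored in the model_results table. As a minimal sketch of that nearest-neighbor lookup (using only the tables and columns referenced in the query below), the closest reference vectors for the most recent log can be retrieved with:

%%sql
-- Sketch: for the latest log, find the 5 closest reference vectors in model_results.
SELECT m.id, EUCLIDEAN_DISTANCE(s.model_res_blob, m.Model_Results) AS score
FROM (SELECT model_res_blob FROM siem_log_real ORDER BY Timestamp DESC LIMIT 1) AS s
CROSS JOIN model_results AS m
ORDER BY score ASC
LIMIT 5;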
In [13]:
def data_source():
    result = %sql WITH test_sql AS (SELECT Log_ID, TIMESTAMP, id, EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) AS score, ROW_NUMBER() OVER(PARTITION BY Log_ID ORDER BY EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) ASC) AS rn FROM (SELECT TIMESTAMP, Log_ID, model_res_blob FROM siem_log_real ORDER BY TIMESTAMP DESC LIMIT 20) CROSS JOIN model_results WHERE score IS NOT NULL), label_table AS (SELECT Log_ID, TIMESTAMP, MAX(CASE WHEN id LIKE 'Bru%' OR id LIKE 'SQL%' THEN 'Malicious' ELSE 'Benign' END) AS log_status FROM test_sql WHERE rn <= 100 GROUP BY Log_ID, TIMESTAMP ORDER BY TIMESTAMP DESC) SELECT log_status, COUNT(Log_ID) AS count_connections FROM label_table GROUP BY log_status;
    result2 = list(result.dicts())
    return result2

SCHEMA = {
    "log_status": str,
    "count_connections": int
}
In [14]:
mode = 'run'
table = perspective.Table(SCHEMA, limit=100)
threading.Thread(target=loop).start()
In [15]:
perspective.PerspectiveWidget(
    table,
    title="Monitor Threat Inference",
    split_by=["log_status"],
    plugin="Y Line",
    columns=["count_connections"]
)
In [16]:
mode = 'stop'
Track Latest Connections with Threat Inference by Log ID
In [17]:
def data_source():
    result = %sql WITH test_sql AS (SELECT Log_ID, TIMESTAMP, id, EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) AS score, ROW_NUMBER() OVER(PARTITION BY Log_ID ORDER BY EUCLIDEAN_DISTANCE(model_res_blob, Model_Results) ASC) AS rn FROM (SELECT TIMESTAMP, Log_ID, model_res_blob FROM siem_log_real ORDER BY TIMESTAMP DESC LIMIT 20) CROSS JOIN model_results WHERE score IS NOT NULL), label_table AS (SELECT Log_ID, TIMESTAMP, MAX(CASE WHEN id LIKE 'Bru%' OR id LIKE 'SQL%' THEN 'Malicious' ELSE 'Benign' END) AS log_status FROM test_sql WHERE rn <= 100 GROUP BY Log_ID, TIMESTAMP ORDER BY TIMESTAMP DESC) SELECT * FROM label_table;
    result2 = list(result.dicts())
    return result2

SCHEMA = {
    "Log_ID": str,
    "TIMESTAMP": datetime,
    "log_status": str
}
In [18]:
mode = 'run'
table = perspective.Table(SCHEMA, limit=20)
threading.Thread(target=loop).start()
In [19]:
perspective.PerspectiveWidget(
    table,
    title="Latest Connections",
    group_by=["TIMESTAMP"],
    plugin="Datagrid",
    columns=["Log_ID", "log_status"]
)
In [20]:
mode = 'stop'
Details
About this Template
Part 3 of real-time threat detection: integrate with Kafka, then run and visualize threat detection on incoming logs. This notebook requires adjustments (such as your own Kafka credentials) before it will run out of the box.
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.