Real-Time Event Monitoring Dataset From Kafka
Notebook
Note
This notebook can be run on a Free Starter Workspace. To create a Free Starter Workspace, navigate to Start in the left nav. You can also use your existing Standard or Premium workspace with this notebook.
Introduction
The Real-Time Event Monitoring use case illustrates how to leverage SingleStore's capabilities to process and analyze streaming data from a Kafka data source. This demo showcases the ability to ingest real-time events, such as application logs or user activities, and perform immediate analysis to gain actionable insights. By working through this example, new users will learn how to set up a Kafka data pipeline, ingest streaming data into SingleStore, and execute real-time queries to monitor event types, user activity patterns, and detect anomalies. This use case highlights the power of SingleStore in providing timely and relevant information for decision-making in dynamic environments.
How to use this notebook
Create a database (You can skip this Step if you are using Free Starter Tier)
We need to create a database to work with in the following examples.
In [1]:
shared_tier_check = %sql show variables like 'is_shared_tier'
if not shared_tier_check or shared_tier_check[0][1] == 'OFF':
    %sql DROP DATABASE IF EXISTS EventAnalysis;
    %sql CREATE DATABASE EventAnalysis;
Action Required
If you have a Free Starter Workspace deployed already, select the database from the drop-down menu at the top of this notebook. This updates the connection_url to connect to that database.
Create Table
In [2]:
%%sql
CREATE TABLE IF NOT EXISTS eventsdata /* Creating table for sample data. */ (
    `user_id` varchar(120) DEFAULT NULL,
    `event_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    `advertiser` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    `campaign` varchar(110) DEFAULT NULL,
    `gender` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    `income` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    `page_url` varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    `region` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    `country` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL
);
Load Data using Pipeline
In [3]:
%%sql
CREATE PIPELINE IF NOT EXISTS eventsdata /* Creating pipeline for sample data. */
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
ENABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `eventsdata`
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY ''
(
    `events`.`user_id`,
    `events`.`event_name`,
    `events`.`advertiser`,
    `events`.`campaign`,
    `events`.`gender`,
    `events`.`income`,
    `events`.`page_url`,
    `events`.`region`,
    `events`.`country`
);
START PIPELINE `eventsdata`;
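Each Kafka message on the `ad_events` topic is one tab-delimited line whose fields arrive in the column order listed in the pipeline above. As a minimal sketch of that mapping (the sample record below is invented for illustration, not real topic data):

```python
# Column order matches the pipeline's field list above.
COLUMNS = [
    "user_id", "event_name", "advertiser", "campaign",
    "gender", "income", "page_url", "region", "country",
]

def parse_event(line: str) -> dict:
    """Split one tab-delimited Kafka message into a column -> value dict."""
    values = line.rstrip("\n").split("\t")
    return dict(zip(COLUMNS, values))

# Hypothetical sample record, for illustration only.
sample = "97\tDownstream Conversion\tSubway\tcampaign_17\tFemale\t50k - 75k\t/2014/04/cats\tTX\tUS"
event = parse_event(sample)
print(event["advertiser"], event["country"])
```

This is exactly the split the pipeline's `FIELDS TERMINATED BY '\t'` clause performs server-side before inserting each row into `eventsdata`.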
In [4]:
%%sql
SELECT COUNT(*) FROM `eventsdata`;
Sample Queries
Events by Region
In [5]:
%%sql
SELECT
    events.country AS `events.country`,
    COUNT(events.country) AS `events.countofevents`
FROM eventsdata AS events
GROUP BY 1
ORDER BY 2 DESC
LIMIT 5;
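The query above is a plain GROUP BY / ORDER BY / LIMIT aggregation. The same logic can be sketched in pure Python over a handful of made-up country values (not actual pipeline data):

```python
from collections import Counter

# Hypothetical country values standing in for rows of eventsdata.
countries = ["US", "US", "DE", "IN", "US", "DE", "BR"]

# Equivalent of: COUNT(*) ... GROUP BY country ORDER BY 2 DESC LIMIT 5.
top5 = Counter(countries).most_common(5)
print(top5)
```

`Counter.most_common(5)` mirrors the SQL's descending count ordering with a limit of 5; here it would surface "US" first with a count of 3.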
Events by Top 5 Advertisers
In [6]:
%%sql
SELECT
    events.advertiser AS `events.advertiser`,
    COUNT(*) AS `events.count`
FROM eventsdata AS events
WHERE
    (events.advertiser LIKE '%Subway%'
     OR events.advertiser LIKE '%McDonalds%'
     OR events.advertiser LIKE '%Starbucks%'
     OR events.advertiser LIKE '%Dollar General%'
     OR events.advertiser LIKE '%YUM! Brands%')
GROUP BY 1
ORDER BY 2 DESC;
Ad visitors by gender and income
In [7]:
%%sql
SELECT * FROM (
    SELECT *,
        DENSE_RANK() OVER (ORDER BY z___min_rank) AS z___pivot_row_rank,
        RANK() OVER (PARTITION BY z__pivot_col_rank ORDER BY z___min_rank) AS z__pivot_col_ordering,
        CASE WHEN z___min_rank = z___rank THEN 1 ELSE 0 END AS z__is_highest_ranked_cell
    FROM (
        SELECT *,
            MIN(z___rank) OVER (PARTITION BY `events.income`) AS z___min_rank
        FROM (
            SELECT *,
                RANK() OVER (ORDER BY
                    CASE WHEN z__pivot_col_rank = 1 THEN (CASE WHEN `events.count` IS NOT NULL THEN 0 ELSE 1 END) ELSE 2 END,
                    CASE WHEN z__pivot_col_rank = 1 THEN `events.count` ELSE NULL END DESC,
                    `events.count` DESC,
                    z__pivot_col_rank,
                    `events.income`) AS z___rank
            FROM (
                SELECT *,
                    DENSE_RANK() OVER (ORDER BY CASE WHEN `events.gender` IS NULL THEN 1 ELSE 0 END, `events.gender`) AS z__pivot_col_rank
                FROM (
                    SELECT
                        events.gender AS `events.gender`,
                        events.income AS `events.income`,
                        COUNT(*) AS `events.count`
                    FROM eventsdata AS events
                    WHERE (events.income <> 'unknown' OR events.income IS NULL)
                    GROUP BY 1, 2
                ) ww
            ) bb
            WHERE z__pivot_col_rank <= 16384
        ) aa
    ) xx
) zz
WHERE (z__pivot_col_rank <= 50 OR z__is_highest_ranked_cell = 1)
    AND (z___pivot_row_rank <= 500 OR z__pivot_col_ordering = 1)
ORDER BY z___pivot_row_rank;
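The deeply nested query above is machine-generated pivot SQL: at its core it counts events grouped by gender and income, then ranks and cross-tabulates the result with income as rows and gender as columns. The end result is equivalent to a simple crosstab, sketched here with invented counts (not real data):

```python
from collections import defaultdict

# Hypothetical (gender, income, count) aggregates, standing in for
# the inner "GROUP BY gender, income" result of the query above.
rows = [
    ("Female", "50k - 75k", 120),
    ("Male",   "50k - 75k", 95),
    ("Female", "25k - 50k", 80),
    ("Male",   "25k - 50k", 110),
]

# Pivot: income becomes the row key, gender the column key.
pivot = defaultdict(dict)
for gender, income, count in rows:
    pivot[income][gender] = count

for income, by_gender in pivot.items():
    print(income, by_gender)
```

The window functions (`DENSE_RANK`, `RANK`, `MIN ... OVER`) only add row/column ordering and limits on top of this pivot; the business logic is the innermost SELECT.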
The pipeline will keep ingesting data from the Kafka topic. Once your data is loaded, you can stop the pipeline with the command below.
In [8]:
%%sql
STOP PIPELINE eventsdata;
Conclusion
We have shown how to connect to Kafka using Pipelines
and ingest data into SingleStoreDB. These techniques should enable you to
integrate your own Kafka topics with SingleStoreDB.
Clean up
Remove the '#' to uncomment and execute the queries below to clean up the pipeline and table created above.
Drop the pipeline using the command below.
In [9]:
%%sql
#DROP PIPELINE eventsdata;
In [10]:
#shared_tier_check = %sql show variables like 'is_shared_tier'
#if not shared_tier_check or shared_tier_check[0][1] == 'OFF':
#    %sql DROP DATABASE IF EXISTS EventAnalysis;
#else:
#    %sql DROP TABLE eventsdata;
Details
About this Template
The Real-Time Event Monitoring use case illustrates how to leverage SingleStore's capabilities to process and analyze streaming data from a Kafka data source.
This Notebook can be run in Shared Tier, Standard and Enterprise deployments.
Tags
License
This Notebook has been released under the Apache 2.0 open source license.