
Real-Time Event Monitoring Dataset From Kafka


Note

This notebook can be run on a Free Starter Workspace. To create a Free Starter Workspace, navigate to Start using the left nav. You can also use an existing Standard or Premium workspace with this notebook.

Introduction

The Real-Time Event Monitoring use case illustrates how to leverage SingleStore's capabilities to process and analyze streaming data from a Kafka data source. This demo showcases the ability to ingest real-time events, such as application logs or user activities, and perform immediate analysis to gain actionable insights. By working through this example, new users will learn how to set up a Kafka data pipeline, ingest streaming data into SingleStore, and execute real-time queries to monitor event types and user activity patterns and to detect anomalies. This use case highlights the power of SingleStore in providing timely, relevant information for decision-making in dynamic environments.

How to use this notebook

Create a database (you can skip this step if you are using the Free Starter Tier)

We need to create a database to work with in the following examples.

In [1]:

shared_tier_check = %sql show variables like 'is_shared_tier'
if not shared_tier_check or shared_tier_check[0][1] == 'OFF':
    %sql DROP DATABASE IF EXISTS EventAnalysis;
    %sql CREATE DATABASE EventAnalysis;
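
If you would like to confirm the database exists before moving on (an optional sanity check, not part of the original flow; on the Free Starter Tier the attached database may have a different name), a standard SHOW DATABASES pattern works:

%%sql
SHOW DATABASES LIKE 'EventAnalysis';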

Action Required

If you have a Free Starter Workspace deployed already, select the database from the drop-down menu at the top of this notebook. Doing so updates the connection_url used to connect to that database.

Create Table

In [2]:

%%sql
CREATE TABLE IF NOT EXISTS eventsdata /* Creating table for sample data. */ (
`user_id` varchar(120) DEFAULT NULL,
`event_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`advertiser` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`campaign` varchar(110) DEFAULT NULL,
`gender` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`income` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`page_url` varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`region` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
`country` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL
)
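
As a quick check that the table was created with the expected schema (optional, not in the original notebook):

%%sql
DESCRIBE eventsdata;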

Load Data using Pipeline

In [3]:

%%sql
CREATE PIPELINE IF NOT EXISTS eventsdata /* Creating pipeline for sample data. */
AS LOAD DATA KAFKA 'public-kafka.memcompute.com:9092/ad_events'
ENABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `eventsdata`
FIELDS TERMINATED BY '\t' ENCLOSED BY '' ESCAPED BY '\\'
LINES TERMINATED BY '\n' STARTING BY ''
(
    `user_id`,
    `event_name`,
    `advertiser`,
    `campaign`,
    `gender`,
    `income`,
    `page_url`,
    `region`,
    `country`
);
START PIPELINE `eventsdata`;
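
Before querying, you can verify the pipeline is running; if rows never arrive, SingleStore records per-batch ingest errors in information_schema.PIPELINES_ERRORS (included here commented out as a troubleshooting aid, not part of the original walkthrough):

%%sql
SHOW PIPELINES;
-- SELECT * FROM information_schema.PIPELINES_ERRORS;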

In [4]:

%%sql
SELECT COUNT(*) FROM `eventsdata`
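
Because the pipeline ingests continuously, re-running this count should show the number climbing. For per-batch detail, SingleStore's information_schema.PIPELINES_BATCHES_SUMMARY view (availability and column names assumed from the pipelines documentation) summarizes recent batches:

%%sql
SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY
ORDER BY BATCH_ID DESC LIMIT 5;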

Sample Queries

Events by Country

In [5]:

%%sql
SELECT
    events.country AS `events.country`,
    COUNT(events.country) AS `events.countofevents`
FROM eventsdata AS events
GROUP BY 1 ORDER BY 2 DESC LIMIT 5;
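
The table also carries a region column, so the same query shape answers the question at the region level (a simple variant, not in the source notebook):

%%sql
SELECT
    events.region AS `events.region`,
    COUNT(*) AS `events.countofevents`
FROM eventsdata AS events
GROUP BY 1 ORDER BY 2 DESC LIMIT 5;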

Events by Top 5 Advertisers

In [6]:

%%sql
SELECT
    events.advertiser AS `events.advertiser`,
    COUNT(*) AS `events.count`
FROM eventsdata AS events
WHERE events.advertiser LIKE '%Subway%'
    OR events.advertiser LIKE '%McDonalds%'
    OR events.advertiser LIKE '%Starbucks%'
    OR events.advertiser LIKE '%Dollar General%'
    OR events.advertiser LIKE '%YUM! Brands%'
GROUP BY 1
ORDER BY 2 DESC;
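
If you would rather not hardcode the advertiser list, the overall top five falls out of a plain GROUP BY (a variant sketch, not in the original):

%%sql
SELECT
    events.advertiser AS `events.advertiser`,
    COUNT(*) AS `events.count`
FROM eventsdata AS events
GROUP BY 1 ORDER BY 2 DESC LIMIT 5;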

Ad visitors by gender and income

In [7]:

%%sql
SELECT * FROM (
SELECT *, DENSE_RANK() OVER (ORDER BY z___min_rank) as z___pivot_row_rank, RANK() OVER (PARTITION BY z__pivot_col_rank ORDER BY z___min_rank) as z__pivot_col_ordering, CASE WHEN z___min_rank = z___rank THEN 1 ELSE 0 END AS z__is_highest_ranked_cell FROM (
SELECT *, MIN(z___rank) OVER (PARTITION BY `events.income`) as z___min_rank FROM (
SELECT *, RANK() OVER (ORDER BY CASE WHEN z__pivot_col_rank=1 THEN (CASE WHEN `events.count` IS NOT NULL THEN 0 ELSE 1 END) ELSE 2 END, CASE WHEN z__pivot_col_rank=1 THEN `events.count` ELSE NULL END DESC, `events.count` DESC, z__pivot_col_rank, `events.income`) AS z___rank FROM (
SELECT *, DENSE_RANK() OVER (ORDER BY CASE WHEN `events.gender` IS NULL THEN 1 ELSE 0 END, `events.gender`) AS z__pivot_col_rank FROM (
SELECT
events.gender AS `events.gender`,
events.income AS `events.income`,
COUNT(*) AS `events.count`
FROM eventsdata AS events
WHERE
(events.income <> 'unknown' OR events.income IS NULL)
GROUP BY 1,2) ww
) bb WHERE z__pivot_col_rank <= 16384
) aa
) xx
) zz
WHERE (z__pivot_col_rank <= 50 OR z__is_highest_ranked_cell = 1) AND (z___pivot_row_rank <= 500 OR z__pivot_col_ordering = 1) ORDER BY z___pivot_row_rank;
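
The query above is machine-generated pivot SQL (BI-tool-style ranking scaffolding). If you only need gender counts per income bracket, conditional aggregation yields the same pivot far more readably. This is a sketch that assumes the gender column holds 'Male' and 'Female' labels; adjust to whatever values the topic actually emits:

%%sql
SELECT
    events.income AS `events.income`,
    SUM(events.gender = 'Male') AS `male`,     -- assumes 'Male'/'Female' labels
    SUM(events.gender = 'Female') AS `female`,
    COUNT(*) AS `total`
FROM eventsdata AS events
WHERE events.income <> 'unknown' OR events.income IS NULL
GROUP BY 1
ORDER BY `total` DESC;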

The pipeline will keep pulling data from the Kafka topic. Once you have loaded enough data, you can stop the pipeline with the command below.

In [8]:

%%sql
STOP PIPELINE eventsdata
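
To confirm the pipeline has stopped (optional; the STATE column name is taken from SingleStore's information_schema.PIPELINES view):

%%sql
SELECT PIPELINE_NAME, STATE
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = 'eventsdata';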

Conclusion

We have shown how to connect to Kafka using Pipelines and ingest data into SingleStoreDB. These techniques should enable you to integrate your own Kafka topics with SingleStoreDB.

Clean up

Remove the '#' to uncomment, then execute the queries below to clean up the pipeline and table created above.

Drop the pipeline using the command below.

In [9]:

%%sql
#DROP PIPELINE eventsdata

In [10]:

#shared_tier_check = %sql show variables like 'is_shared_tier'
#if not shared_tier_check or shared_tier_check[0][1] == 'OFF':
#    %sql DROP DATABASE IF EXISTS EventAnalysis;
#else:
#    %sql DROP TABLE eventsdata;

Details


About this Template

The Real-Time Event Monitoring use case illustrates how to leverage SingleStore's capabilities to process and analyze streaming data from a Kafka data source.

This Notebook can be run in Shared Tier, Standard and Enterprise deployments.

Tags

starter, loaddata, kafka

License

This Notebook has been released under the Apache 2.0 open source license.