Vector search with AI Function EMBED_TEXT


Note

You can use your existing Standard or Premium workspace with this Notebook.

The AI functions are currently in Private Preview. Please reach out to support@singlestore.com to confirm if this feature can be enabled in your org.

What This Notebook Does

This Jupyter notebook demonstrates an end-to-end vector search pipeline using SingleStore:

  1. Data Ingestion — Downloads an example Amazon food reviews dataset and loads it into a SingleStore table

  2. Embedding Generation — Uses the SingleStore AI function EMBED_TEXT() to generate vector embeddings via a simple SQL call

  3. Vector Indexing — Creates an HNSW vector index on the embedding column for fast approximate nearest-neighbor (ANN) search

  4. Semantic Search — Performs vector similarity search using DOT_PRODUCT() to find reviews that are semantically similar to a natural-language query

  5. API Deployment — Deploys a FastAPI endpoint as a SingleStore Cloud Function for production-ready semantic search

Why SingleStore for Vector Search?

  • Simple SQL-based embeddings: EMBED_TEXT() is tightly integrated into the SQL layer — you call it like any other SQL function. No need to set up external embedding services, manage API keys for third-party providers, or write client-side embedding code. The embedding logic runs on SingleStore's Aura compute infrastructure, so model management is handled for you.

  • HNSW indexing: Approximate nearest-neighbor search reduces query time from O(n) full-scan to sub-linear, critical for large datasets

  • Unified platform: One database for transactional, analytical, and vector workloads — no need to sync data to a separate vector database

Prerequisites: Ensure AI Functions are installed on your deployment (AI > AI & ML Functions).

Step 1: Configuration

Define key parameters for the pipeline. These control dataset size, embedding model, and database targets.

| Parameter | Purpose |
|-----------|---------|
| SAMPLE_SIZE | Number of reviews to process. Start small (1,000) for testing, scale up for production. |
| EMBEDDING_MODEL | The model used by EMBED_TEXT(). titan-embed-text-v2-0 produces 1024-dimensional vectors optimized for semantic similarity. |
| MAX_EMBEDDING_TOKENS | Token limit per text. Embedding models have fixed context windows; truncating to 512 tokens ensures quality and avoids silent truncation by the model. |
| RECREATE_DB | Set to True to drop and rebuild everything from scratch. |

In [1]:

SAMPLE_SIZE = 10000
EMBEDDING_MODEL = 'titan-embed-text-v2-0'
VECTOR_DIMENSIONS = 1024
DATABASE_NAME = 'vector_search_demo'
TABLE_NAME = 'product_reviews'
RECREATE_DB = False

MAX_EMBEDDING_TOKENS = 512  # Optimal token limit for embedding model quality
APPROX_CHARS_PER_TOKEN = 4  # ~4 chars per token for English text

TEST_VECTOR_SEARCH = False  # Set to True to run the example vector search in Step 9

print(f"✓ Configuration: {SAMPLE_SIZE:,} rows | Model: {EMBEDDING_MODEL} | Max tokens: {MAX_EMBEDDING_TOKENS}")

Step 2: Install Dependencies & Import Libraries

We need:

  • kagglehub — to download the Amazon Fine Food Reviews dataset directly from Kaggle

  • singlestoredb — the official SingleStore Python SDK (provides connections, cursor API, and cloud function deployment via singlestoredb.apps)

  • fastapi / pydantic — to define the search API that will be deployed as a SingleStore Cloud Function

  • tqdm — for progress bars during batch operations

In [2]:

import time

# Track total workflow time from the very start
start_time = time.time()

# Track library installation time
install_start = time.time()
print("📦 Installing dependencies...")

!pip install -q kagglehub singlestoredb fastapi pydantic tqdm

install_time = time.time() - install_start
print(f"✓ Dependencies installed in {install_time:.1f}s")

# Import libraries
import_start = time.time()

import pandas as pd
import sqlalchemy as sa
import singlestoredb as s2
import kagglehub
import concurrent.futures
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, List
from tqdm.auto import tqdm

import_time = time.time() - import_start

print(f"✓ Libraries imported in {import_time:.2f}s")
print(f"✓ Total setup time: {(install_time + import_time):.1f}s")

Step 3: Check Current State (Idempotent Pipeline)

Before doing any work, we check what already exists in the database. This makes the notebook idempotent — you can re-run it safely without duplicating data or re-generating embeddings.

The state check inspects:

  • Whether the database and table exist

  • How many rows are loaded vs. needed

  • How many rows already have embeddings

  • Whether the vector index has been created

Only the steps that are actually needed will execute in subsequent cells.
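The decision logic behind this check can be isolated as a pure function, which makes it easy to reason about without a database connection. The sketch below is illustrative only: `PipelineState` and `plan_actions` are hypothetical names, not part of the notebook or the SingleStore SDK.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineState:
    """Snapshot of what already exists in the database (hypothetical helper)."""
    total_rows: int
    embedded_rows: int
    index_exists: bool


def plan_actions(state: PipelineState, sample_size: int) -> list[str]:
    """Return the pipeline steps that still need to run, in execution order."""
    actions = []
    if state.total_rows < sample_size:
        actions.append("ingest")
    if state.embedded_rows < sample_size:
        actions.append("embed")
    if not state.index_exists:
        actions.append("index")
    return actions


# A fresh database needs every step; a fully built one needs none.
print(plan_actions(PipelineState(0, 0, False), sample_size=10_000))
print(plan_actions(PipelineState(10_000, 10_000, True), sample_size=10_000))
```

Because each step is derived only from the observed state, re-running the notebook after a partial failure picks up where the previous run stopped.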

In [3]:

# Check what's already done using direct connection
state_check_start = time.time()

conn = s2.connect()

try:
    with conn.cursor() as cur:

        # Drop DB
        if RECREATE_DB:
            print(f"  ⚠️  Dropping existing database...")
            cur.execute(f"DROP DATABASE IF EXISTS {DATABASE_NAME}")

        # Check database exists
        cur.execute(f"""
            SELECT COUNT(*) FROM information_schema.schemata
            WHERE schema_name = '{DATABASE_NAME}'
        """)
        db_exists = cur.fetchone()[0] > 0

        # Initialize defaults
        table_exists = False
        total_rows = 0
        embedded_rows = 0
        index_exists = False

        if db_exists:
            try:
                # Try to use the database
                cur.execute(f"USE {DATABASE_NAME}")

                # Check table exists
                cur.execute(f"""
                    SELECT COUNT(*) FROM information_schema.tables
                    WHERE table_schema = '{DATABASE_NAME}' AND table_name = '{TABLE_NAME}'
                """)
                table_exists = cur.fetchone()[0] > 0

                if table_exists:
                    # Check data
                    cur.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                    total_rows = cur.fetchone()[0]

                    # Check embeddings
                    cur.execute(f"""
                        SELECT COUNT(*) FROM {TABLE_NAME} WHERE review_embedding IS NOT NULL
                    """)
                    embedded_rows = cur.fetchone()[0]

                    # Check vector index
                    cur.execute(f"""
                        SELECT COUNT(*) FROM information_schema.statistics
                        WHERE table_schema = '{DATABASE_NAME}'
                        AND table_name = '{TABLE_NAME}'
                        AND index_name = 'vindex_review_embedding'
                    """)
                    index_exists = cur.fetchone()[0] > 0

            except Exception as e:
                # If we can't USE the database, treat it as non-existent
                print(f"⚠️  Database exists but cannot be accessed: {e}")
                print(f"   Will recreate database: {DATABASE_NAME}")
                db_exists = False
                table_exists = False
                total_rows = 0
                embedded_rows = 0
                index_exists = False
finally:
    conn.close()

# Determine what needs to be done
needs_data = total_rows < SAMPLE_SIZE
needs_embedding = embedded_rows < SAMPLE_SIZE
needs_index = not index_exists

state_check_time = time.time() - state_check_start

print("=" * 70)
print("CURRENT STATE")
print("=" * 70)
print(f"Database exists:     {db_exists}")
print(f"Table exists:        {table_exists}")
print(f"Rows in table:       {total_rows:,}")
print(f"Embedded rows:       {embedded_rows:,}")
print(f"Vector index:        {'Yes' if index_exists else 'No'}")
print(f"State check time:    {state_check_time*1000:.2f} ms")
print("=" * 70)
print(f"\nACTIONS NEEDED:")
print(f"  {'✓' if not needs_data else '→'} Data ingestion {'' if not needs_data else f'(need {SAMPLE_SIZE - total_rows:,} more rows)'}")
print(f"  {'✓' if not needs_embedding else '→'} Embedding generation {'' if not needs_embedding else f'(need {SAMPLE_SIZE - embedded_rows:,} embeddings)'}")
print(f"  {'✓' if not needs_index else '→'} Vector index creation")
print("=" * 70)

Step 4: Create Database & Table

Creates the database and table if they don't already exist. Key design choices:

  • VECTOR(1024) column type stores fixed-dimension float vectors natively in SingleStore — no serialization/deserialization overhead

  • review_text_for_embedding stores the token-truncated text that was actually embedded (important for reproducibility)

  • SORT KEY (id) optimizes sequential scans during batch processing

  • Secondary keys on product_id and score enable fast filtered queries alongside vector search

In [4]:

if not db_exists or not table_exists:
    setup_start = time.time()
    print("\n📦 Setting up database and table...")

    conn = s2.connect()
    try:
        with conn.cursor() as cur:
            # Drop and recreate if there were access issues
            if db_exists and not table_exists:
                print(f"  ⚠️  Dropping existing inaccessible database...")
                cur.execute(f"DROP DATABASE IF EXISTS {DATABASE_NAME}")

            # Create database
            cur.execute(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
            cur.execute(f"USE {DATABASE_NAME}")

            # Create table
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
                    id BIGINT PRIMARY KEY,
                    product_id VARCHAR(50),
                    user_id VARCHAR(50),
                    profile_name VARCHAR(255),
                    score INT,
                    review_time BIGINT,
                    summary TEXT,
                    review_text TEXT,
                    full_review_text TEXT,
                    review_text_for_embedding TEXT,
                    review_embedding VECTOR({VECTOR_DIMENSIONS}) NULL,
                    KEY (product_id),
                    KEY (score),
                    SORT KEY (id)
                )
            """)
        conn.commit()
    finally:
        conn.close()

    setup_time = time.time() - setup_start
    print(f"✓ Database and table created in {setup_time:.2f}s")
else:
    print("\n✓ Database and table already exist - skipping creation")

Step 5: Download & Ingest Data

Downloads the Amazon Fine Food Reviews dataset from Kaggle and inserts it into SingleStore.

Token-aware truncation: Embedding models have a maximum input length (measured in tokens, not characters). Text beyond the limit is silently truncated by the model, which can degrade embedding quality. We pre-truncate at word boundaries to approximately MAX_EMBEDDING_TOKENS tokens (~4 characters per token for English) so we control exactly what gets embedded.
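As a worked example of the word-boundary rule, here is a condensed, standalone variant of the truncation helper with a deliberately tiny character budget. The name `truncate_at_word_boundary` and the 0.8 cutoff parameter name `min_fill` are illustrative; the 4-characters-per-token figure is only a rough heuristic for English text.

```python
def truncate_at_word_boundary(text: str, max_chars: int, min_fill: float = 0.8) -> str:
    """Cut text to at most max_chars, preferring the last space if it is not too far back."""
    text = str(text).strip()
    if len(text) <= max_chars:
        return text
    truncated = text[:max_chars]
    last_space = truncated.rfind(" ")
    # Only back up to the space when that keeps more than min_fill of the budget.
    if last_space > max_chars * min_fill:
        truncated = truncated[:last_space]
    return truncated.rstrip()


# With a 12-character budget the cut falls inside "gamma", so the helper
# backs up to the preceding space instead of emitting a partial word.
print(truncate_at_word_boundary("alpha beta gamma delta", 12))

# A long unbroken word has no nearby space, so the hard cut is kept.
print(truncate_at_word_boundary("supercalifragilistic word", 10))
```

In the notebook the budget is MAX_EMBEDDING_TOKENS * APPROX_CHARS_PER_TOKEN, that is 512 * 4 = 2048 characters.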

Data is inserted in batches of 1,000 rows for efficient network utilization without hitting memory limits.

In [5]:

if needs_data:
    data_start = time.time()
    rows_needed = SAMPLE_SIZE - total_rows
    print(f"\n📥 Downloading and inserting {rows_needed:,} rows...")

    # Download dataset
    download_start = time.time()
    path = kagglehub.dataset_download("snap/amazon-fine-food-reviews")
    df = pd.read_csv(f"{path}/Reviews.csv")
    download_time = time.time() - download_start
    print(f"  ✓ Downloaded in {download_time:.1f}s")

    # Prepare data
    prep_start = time.time()
    sample_df = df.iloc[total_rows:SAMPLE_SIZE].copy()
    sample_df['full_review_text'] = sample_df['Summary'].fillna('') + '. ' + sample_df['Text'].fillna('')

    # --- Token-aware truncation for embedding quality ---
    max_chars = MAX_EMBEDDING_TOKENS * APPROX_CHARS_PER_TOKEN  # e.g. 512 * 4 = 2048 chars

    def truncate_to_token_limit(text, max_chars):
        """Truncate text to approximate token limit, breaking at word boundaries."""
        text = str(text).strip()
        if len(text) <= max_chars:
            return text
        # Truncate at last word boundary before the limit
        truncated = text[:max_chars]
        last_space = truncated.rfind(' ')
        if last_space > max_chars * 0.8:  # Only break at word boundary if reasonable
            truncated = truncated[:last_space]
        return truncated.rstrip()

    sample_df['review_text_for_embedding'] = sample_df['full_review_text'].apply(
        lambda x: truncate_to_token_limit(x, max_chars)
    )

    # Report truncation stats
    original_lengths = sample_df['full_review_text'].str.len()
    truncated_lengths = sample_df['review_text_for_embedding'].str.len()
    num_truncated = (original_lengths > max_chars).sum()
    print(f"  ✂️  Token truncation (max {MAX_EMBEDDING_TOKENS} tokens ≈ {max_chars} chars):")
    print(f"    Truncated: {num_truncated:,}/{len(sample_df):,} reviews ({num_truncated/len(sample_df)*100:.1f}%)")
    print(f"    Original avg:  {original_lengths.mean():.0f} chars | Truncated avg: {truncated_lengths.mean():.0f} chars")

    # Fill missing values; Summary and Text are included so NaN is not stored as the string 'nan'
    sample_df = sample_df.fillna({'ProductId': '', 'UserId': '', 'ProfileName': '', 'Score': 0,
                                  'Summary': '', 'Text': ''})

    # Rename columns
    sample_df = sample_df.rename(columns={
        'Id': 'id', 'ProductId': 'product_id', 'UserId': 'user_id',
        'ProfileName': 'profile_name', 'Score': 'score', 'Time': 'review_time',
        'Summary': 'summary', 'Text': 'review_text'
    })
    prep_time = time.time() - prep_start
    print(f"  ✓ Prepared data in {prep_time:.1f}s")

    def sql_quote(value):
        """Escape backslashes and single quotes so the value is safe inside a SQL string literal."""
        return str(value).replace("\\", "\\\\").replace("'", "''")

    # Insert with progress bar
    insert_start = time.time()
    conn = s2.connect()

    try:
        with conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")

            batch_size = 1000
            num_batches = (len(sample_df) + batch_size - 1) // batch_size

            # Progress bar for insertion
            with tqdm(total=len(sample_df), desc="  Inserting rows", unit="rows") as pbar:
                for i in range(0, len(sample_df), batch_size):
                    batch = sample_df.iloc[i:i+batch_size]

                    # Build multi-row insert
                    values = []
                    for _, row in batch.iterrows():
                        product_id = sql_quote(row['product_id'])
                        user_id = sql_quote(row['user_id'])
                        profile_name = sql_quote(row['profile_name'])
                        summary = sql_quote(row['summary'])
                        review_text = sql_quote(row['review_text'])
                        full_text = sql_quote(row['full_review_text'])
                        text_for_embedding = sql_quote(row['review_text_for_embedding'])

                        values.append(
                            f"({row['id']}, '{product_id}', '{user_id}', "
                            f"'{profile_name}', {row['score']}, {row['review_time']}, "
                            f"'{summary}', '{review_text}', '{full_text}', '{text_for_embedding}')"
                        )

                    insert_query = f"""
                        INSERT INTO {TABLE_NAME}
                        (id, product_id, user_id, profile_name, score, review_time,
                         summary, review_text, full_review_text, review_text_for_embedding)
                        VALUES {','.join(values)}
                    """
                    cur.execute(insert_query)
                    pbar.update(len(batch))

        conn.commit()
    finally:
        conn.close()

    insert_time = time.time() - insert_start
    data_time = time.time() - data_start

    print(f"\n  ✓ Inserted {rows_needed:,} rows")
    print(f"    Download:  {download_time:.1f}s")
    print(f"    Prepare:   {prep_time:.1f}s")
    print(f"    Insert:    {insert_time:.1f}s ({rows_needed/insert_time:.0f} rows/sec)")
    print(f"    Total:     {data_time:.1f}s")
else:
    print(f"\n✓ Already have {total_rows:,} rows - skipping data ingestion")

Step 6: Generate Embeddings with EMBED_TEXT()

This is the core AI step. Instead of writing Python code to call an external embedding API, we use SingleStore's EMBED_TEXT() function to generate embeddings with a simple SQL call.

How EMBED_TEXT() works

EMBED_TEXT(text_column, model => 'titan-embed-text-v2-0', tokens_threshold => 2000000)

  • Integrated directly into SingleStore's SQL layer: you call it like any other function in a SELECT or INSERT statement

  • The embedding computation runs on SingleStore's Aura compute infrastructure, which manages the model hosting and scaling for you

  • The tokens_threshold parameter controls batching: the function accumulates rows until it reaches this token count, then sends them to the model in one batch (higher = more throughput, but more memory)

  • Returns a JSON array of floats, which we unpack with JSON_ARRAY_UNPACK_F64() and cast to VECTOR(1024)
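The batching behavior described for tokens_threshold can be pictured with a small simulation. This is a conceptual sketch only, not SingleStore's implementation: `estimate_tokens` and `group_by_token_budget` are hypothetical helpers, and the 4-characters-per-token estimate is the same rough heuristic used elsewhere in this notebook.

```python
def estimate_tokens(text: str, chars_per_token: int = 4) -> int:
    """Rough token estimate using ceiling division; never less than 1."""
    return max(1, -(-len(text) // chars_per_token))


def group_by_token_budget(texts, tokens_threshold):
    """Accumulate texts until the running token count reaches the threshold, then start a new batch."""
    batches, current, used = [], [], 0
    for text in texts:
        current.append(text)
        used += estimate_tokens(text)
        if used >= tokens_threshold:
            batches.append(current)
            current, used = [], 0
    if current:  # flush the final, partially filled batch
        batches.append(current)
    return batches


# Five 8-character texts (about 2 tokens each) with a 4-token budget.
batches = group_by_token_budget(["abcdefgh"] * 5, tokens_threshold=4)
print([len(batch) for batch in batches])
```

A larger threshold yields fewer, bigger batches, which matches the trade-off noted above: more throughput per model call at the cost of more memory per batch.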

Why this is simpler than external embedding APIs

With traditional approaches, you'd need to: extract text from the database → call an external API (OpenAI, Cohere, etc.) → manage rate limits and retries → write embeddings back. With EMBED_TEXT(), it's a single SQL statement.

Why parallel batch processing?

We use 50 worker threads, each processing 10 rows at a time, to keep many embedding requests in flight and saturate the available embedding throughput. The INSERT ... ON DUPLICATE KEY UPDATE pattern makes each write a safe upsert: if a row already has an embedding, it is overwritten, so a failed batch can simply be retried.
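The fan-out pattern can be tried in isolation by swapping the SQL call for a stub. In this sketch `make_ranges`, `run_batches`, and `fake_embed` are hypothetical names; `fake_embed` stands in for the database round trip and fails on one range on purpose to show how errors are collected rather than raised.

```python
import concurrent.futures


def make_ranges(total_rows: int, batch_size: int):
    """Split 1..total_rows into inclusive (start, end) row-number ranges."""
    return [(start, min(start + batch_size - 1, total_rows))
            for start in range(1, total_rows + 1, batch_size)]


def run_batches(ranges, worker, max_workers):
    """Run worker(start, end) for every range in parallel and return the failures."""
    failed = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, s, e): (s, e) for s, e in ranges}
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                failed.append((futures[future], str(exc)))
    return failed


def fake_embed(start, end):
    """Stand-in for the SQL call; fails on one range to simulate a transient error."""
    if start == 11:
        raise RuntimeError("simulated timeout")


ranges = make_ranges(25, 10)
failed = run_batches(ranges, fake_embed, max_workers=4)
print(ranges)
print(failed)
```

Collecting failures per range means a retry only needs to resubmit the ranges in `failed`, which pairs naturally with the upsert statement.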

In [6]:

if needs_embedding:
    embedding_total_start = time.time()
    rows_to_embed = SAMPLE_SIZE - embedded_rows
    print(f"\n🚀 Generating {rows_to_embed:,} embeddings in parallel...")
    print(f"   Using token-truncated text (max {MAX_EMBEDDING_TOKENS} tokens) for optimal embedding quality")

    # Get text content size statistics for token throughput calculation
    content_stats_conn = s2.connect()
    try:
        with content_stats_conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")
            cur.execute(f"""
                SELECT
                    SUM(LENGTH(review_text_for_embedding)) as total_chars,
                    AVG(LENGTH(review_text_for_embedding)) as avg_chars,
                    MIN(LENGTH(review_text_for_embedding)) as min_chars,
                    MAX(LENGTH(review_text_for_embedding)) as max_chars
                FROM {TABLE_NAME}
                WHERE review_embedding IS NULL
            """)
            stats = cur.fetchone()
            total_content_chars = float(stats[0] or 0)
            avg_content_chars = float(stats[1] or 0)
            min_content_chars = float(stats[2] or 0)
            max_content_chars = float(stats[3] or 0)
            # Rough estimate: 1 token ≈ 4 characters for English text
            estimated_total_tokens = total_content_chars / APPROX_CHARS_PER_TOKEN
    finally:
        content_stats_conn.close()

    print(f"  📏 Content size statistics (truncated text):")
    print(f"    Total content:    {total_content_chars:,} chars (~{estimated_total_tokens:,.0f} estimated tokens)")
    print(f"    Avg per row:      {avg_content_chars:,.0f} chars (~{avg_content_chars/APPROX_CHARS_PER_TOKEN:,.0f} tokens)")
    print(f"    Range:            {min_content_chars:,} - {max_content_chars:,} chars")

    # Row numbers are assigned over ALL rows (ordered by id) so that each rn range
    # refers to the same rows for the whole run. Numbering only the rows that still
    # lack an embedding would shift the ranges as other workers finish, and rows
    # would be skipped. Rows that already have an embedding are filtered out afterwards.
    INSERT_SQL = f"""
    INSERT INTO {TABLE_NAME} (id, review_embedding)
    SELECT
        id,
        JSON_ARRAY_UNPACK_F64(
            cluster.EMBED_TEXT(review_text_for_embedding, model => '{EMBEDDING_MODEL}', tokens_threshold => 2000000)
        ) :> VECTOR({VECTOR_DIMENSIONS})
    FROM (
        WITH numbered AS (
            SELECT ROW_NUMBER() OVER (ORDER BY id) AS rn, id, review_text_for_embedding,
                   (review_embedding IS NULL) AS needs_embedding
            FROM {TABLE_NAME}
        )
        SELECT id, review_text_for_embedding FROM numbered
        WHERE rn BETWEEN %s AND %s AND needs_embedding = 1
    )
    ON DUPLICATE KEY UPDATE review_embedding = VALUES(review_embedding);
    """

    from queue import Queue
    import threading

    # OPTIMIZED: Small batches + Many workers = True parallelism
    batch_size = 10       # Small batches for maximum parallelism
    max_workers = 50      # High worker count to saturate cluster

    # Create connection pool
    connection_pool = Queue(maxsize=max_workers)

    pool_start = time.time()
    for _ in range(max_workers):
        conn = s2.connect()
        with conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")
        connection_pool.put(conn)
    pool_time = time.time() - pool_start

    def run_insert_range(rn_start: int, rn_end: int):
        conn = connection_pool.get()
        try:
            with conn.cursor() as cursor:
                cursor.execute(INSERT_SQL, (rn_start, rn_end))
            conn.commit()
            return (rn_start, rn_end, None)
        except Exception as e:
            return (rn_start, rn_end, str(e)[:100])
        finally:
            connection_pool.put(conn)

    # Create batches over the stable row numbers 1..SAMPLE_SIZE
    ranges = [(i, min(i + batch_size - 1, SAMPLE_SIZE))
              for i in range(1, SAMPLE_SIZE + 1, batch_size)]

    embed_start = time.time()
    errors = []

    print(f"  Configuration: {batch_size} rows/batch × {max_workers} workers = {len(ranges)} batches")

    # Progress bar
    with tqdm(total=len(ranges), desc="  Processing", unit="batch") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(run_insert_range, start, end): (start, end)
                      for (start, end) in ranges}

            for future in concurrent.futures.as_completed(futures):
                start, end, err = future.result()
                if err:
                    errors.append((start, end, err))

                elapsed = time.time() - embed_start
                embeddings_done = (pbar.n + 1) * batch_size
                rate = embeddings_done / elapsed if elapsed > 0 else 0

                pbar.set_postfix({'rate': f'{rate:.0f} emb/s'})
                pbar.update(1)

    embed_time = time.time() - embed_start

    # Cleanup
    while not connection_pool.empty():
        conn = connection_pool.get()
        conn.close()

    # Summary
    print()
    if errors:
        print(f"  ⚠️  {len(errors)}/{len(ranges)} batches failed ({len(errors)/len(ranges)*100:.1f}%)")

    actual_throughput = rows_to_embed / embed_time if embed_time > 0 else 0
    chars_per_sec = total_content_chars / embed_time if embed_time > 0 else 0
    tokens_per_sec = estimated_total_tokens / embed_time if embed_time > 0 else 0
    print(f"  ✓ Generated {rows_to_embed:,} embeddings in {embed_time:.1f}s")
    print(f"    Throughput:  {actual_throughput:.0f} embeddings/sec")
    print(f"    Avg latency: {embed_time/rows_to_embed*1000:.2f} ms/embedding")
    print(f"    Content processed: {total_content_chars:,} chars (~{estimated_total_tokens:,.0f} tokens)")
    print(f"    Token throughput:  {tokens_per_sec:,.0f} tokens/sec ({chars_per_sec:,.0f} chars/sec)")

else:
    print(f"\n✓ Already have {embedded_rows:,} embeddings - skipping")

Step 7: Create Vector Index (HNSW)

A vector index is what makes similarity search fast. Without it, every query must compute DOT_PRODUCT() against all rows (a full table scan). With an HNSW index, SingleStore uses Approximate Nearest Neighbor (ANN) search that runs in sub-linear time.
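For reference, the full-scan baseline that the index avoids looks like this in plain Python. It is a sketch for intuition, not how SingleStore executes the query; `brute_force_top_k` is a hypothetical helper and the three toy vectors are made up for the example.

```python
import heapq


def dot(a, b):
    """Dot product of two equal-length sequences."""
    return sum(x * y for x, y in zip(a, b))


def brute_force_top_k(vectors, query, k):
    """Exact top-k by dot product: scores every stored vector, so cost grows linearly with row count."""
    scored = ((dot(vec, query), row_id) for row_id, vec in vectors.items())
    return heapq.nlargest(k, scored)


vectors = {1: [1.0, 0.0], 2: [0.0, 1.0], 3: [0.6, 0.8]}
for similarity, row_id in brute_force_top_k(vectors, query=[1.0, 0.0], k=2):
    print(row_id, similarity)
```

Every query touches every row here, which is acceptable for a few thousand vectors and increasingly costly as the table grows.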

What is HNSW?

Hierarchical Navigable Small World (HNSW) is a graph-based ANN algorithm. It builds a multi-layer graph where:

  • Each vector is a node connected to its nearest neighbors

  • Higher layers have fewer nodes and longer-range connections (for fast coarse search)

  • Lower layers have more nodes and shorter-range connections (for fine-grained refinement)

Search starts at the top layer and "navigates" down, like zooming in on a map.
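The "navigate down" idea can be sketched with a greedy walk repeated per layer. This is a heavily simplified illustration, not SingleStore's index implementation: real HNSW keeps a list of several candidates per layer and returns k results, whereas this version tracks a single best node. The graph, vectors, and the names `greedy_search` and `layered_search` are all made up for the example.

```python
def dot(a, b):
    """Dot product of two equal-length sequences."""
    return sum(x * y for x, y in zip(a, b))


def greedy_search(graph, vectors, query, entry):
    """From entry, keep moving to the neighbor most similar to the query until none improves."""
    current = entry
    best = dot(vectors[current], query)
    while True:
        candidate = max(graph[current], key=lambda n: dot(vectors[n], query), default=None)
        if candidate is None or dot(vectors[candidate], query) <= best:
            return current
        current, best = candidate, dot(vectors[candidate], query)


def layered_search(layers, vectors, query, entry):
    """Search each layer from sparsest to densest, using each result as the next entry point."""
    for graph in layers:
        entry = greedy_search(graph, vectors, query, entry)
    return entry


vectors = {0: (1.0, 0.0), 1: (0.8, 0.6), 2: (0.6, 0.8), 3: (0.0, 1.0), 4: (-0.6, 0.8)}
top_layer = {0: [3], 3: [0]}                                       # few nodes, long-range link
bottom_layer = {0: [1], 1: [0, 2], 2: [1, 3], 3: [2, 4], 4: [3]}   # all nodes, short links

nearest = layered_search([top_layer, bottom_layer], vectors, query=(-0.6, 0.8), entry=0)
print(nearest)
```

The top layer jumps from node 0 to node 3 in one hop, and the bottom layer only has to refine from there, which is where the sub-linear behavior comes from.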

HNSW_FLAT vs other types

HNSW_FLAT stores the original full-precision vectors (no compression). The dot-product scores it returns are therefore exact for the candidates the graph search visits, while the search itself keeps the speed benefits of HNSW graph navigation. Use HNSW_PQ (product quantization) if you need to save memory at the cost of some accuracy.

Important: Query pattern for index usage

For SingleStore to use the vector index, your query must follow this pattern:

SELECT ..., DOT_PRODUCT(vector_col, query_vec) AS score
FROM table
ORDER BY score DESC
LIMIT k

Adding WHERE filters on non-vector columns (e.g., score >= 3) may cause the optimizer to bypass the vector index and fall back to a full scan. See the search endpoint below for how to handle filtered vector search correctly.
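One common way to combine a filter with an index-friendly query is to over-fetch candidates by similarity and apply the filter afterwards. The sketch below shows that idea in plain Python under stated assumptions: it is not necessarily what the search endpoint does, and `filtered_top_k`, `fetch_top`, and the `overfetch` factor are hypothetical. In practice `fetch_top(n)` would run the ORDER BY ... LIMIT n query shown above.

```python
def filtered_top_k(fetch_top, predicate, k, overfetch=4):
    """Fetch k * overfetch candidates ranked by similarity, keep those passing the filter, return up to k."""
    candidates = fetch_top(k * overfetch)
    return [row for row in candidates if predicate(row)][:k]


# In-memory stand-in for rows already ordered by similarity (highest first).
rows = [
    {"id": 1, "score": 5, "similarity": 0.9},
    {"id": 2, "score": 1, "similarity": 0.8},
    {"id": 3, "score": 4, "similarity": 0.7},
    {"id": 4, "score": 2, "similarity": 0.6},
    {"id": 5, "score": 3, "similarity": 0.5},
]

top = filtered_top_k(lambda n: rows[:n], lambda r: r["score"] >= 3, k=2, overfetch=2)
print([r["id"] for r in top])
```

The trade-off is that a very selective filter can leave fewer than k results, in which case the over-fetch factor has to be raised.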

In [7]:

if needs_index:
    index_start = time.time()
    print("\n🔍 Creating vector index...")

    conn = s2.connect()
    try:
        with conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")
            cur.execute(f"""
                ALTER TABLE {TABLE_NAME}
                ADD VECTOR INDEX vindex_review_embedding(review_embedding)
                INDEX_OPTIONS '{{"index_type":"HNSW_FLAT"}}'
            """)
        conn.commit()
    finally:
        conn.close()

    index_time = time.time() - index_start
    print(f"✓ Vector index created in {index_time:.2f}s")
else:
    print("\n✓ Vector index already exists - skipping")

Step 8: Verify Vector Index Usage

Use EXPLAIN to confirm that the query optimizer is actually using the HNSW vector index. Look for VECTOR_KNN_SCAN or ANN in the query plan — this proves the index is being used for approximate nearest-neighbor search rather than a full table scan.

If you see a regular TABLE SCAN instead, the query pattern may not match what the optimizer expects (see the note in Step 7 above).

In [8]:

# Verify the vector index is being used by the query optimizer
print("🔍 Verifying vector index usage with EXPLAIN...\n")

conn = s2.connect()
try:
    with conn.cursor() as cur:
        cur.execute(f"USE {DATABASE_NAME}")

        # Generate a sample query embedding for the EXPLAIN
        cur.execute(f"""
            SELECT JSON_ARRAY_UNPACK_F64(
                cluster.EMBED_TEXT('sample query text', model => '{EMBEDDING_MODEL}')
            ) :> VECTOR({VECTOR_DIMENSIONS}) as query_embedding
        """)
        sample_embedding = cur.fetchone()[0]

        # Run EXPLAIN on the vector search query
        cur.execute(f"""
            EXPLAIN SELECT
                id, score,
                LEFT(full_review_text, 80) as preview,
                DOT_PRODUCT(review_embedding, %s) as similarity
            FROM {TABLE_NAME}
            ORDER BY similarity DESC
            LIMIT 5
        """, (sample_embedding,))

        explain_results = cur.fetchall()

        print("EXPLAIN output:")
        print("-" * 80)
        for row in explain_results:
            print(row)
        print("-" * 80)

        # Check if vector index is being used
        explain_text = str(explain_results).lower()
        if 'vector' in explain_text or 'ann' in explain_text or 'hnsw' in explain_text:
            print("\n✅ Vector index IS being used (ANN search)")
        else:
            print("\n⚠️  Vector index may NOT be used - check the EXPLAIN output above")
            print("    Ensure the query follows: ORDER BY DOT_PRODUCT(...) DESC LIMIT k")
finally:
    conn.close()

Step 9: Test Vector Search

Run a simple semantic search to verify everything works end-to-end. The flow is:

  1. Embed the query: call EMBED_TEXT() on the search text to get a query vector

  2. Compute similarity: calculate DOT_PRODUCT(review_embedding, query_vec) for each row

  3. Rank and return: ORDER BY similarity DESC LIMIT k returns the top matches

Because we created an HNSW index, steps 2 and 3 use approximate nearest-neighbor search instead of scanning every row. For 10,000 rows the difference is small, but for millions of rows it can be the difference between milliseconds and seconds.
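A note on the similarity measure: ranking by dot product matches ranking by cosine similarity only when the vectors are unit length. The sketch below demonstrates that identity on toy vectors. Whether the embeddings returned by the configured model are already normalized is an assumption worth checking on your own data; the helper names here are illustrative.

```python
import math


def dot(a, b):
    """Dot product of two equal-length sequences."""
    return sum(x * y for x, y in zip(a, b))


def normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v]


def cosine(a, b):
    """Cosine similarity: dot product divided by the product of the vector lengths."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a, b = [3.0, 4.0], [4.0, 3.0]
print(dot(a, b))                        # raw dot product depends on vector length
print(cosine(a, b))                     # length-independent similarity
print(dot(normalize(a), normalize(b)))  # equals the cosine once both are unit length
```

If stored vectors are not unit length, longer vectors score higher under DOT_PRODUCT() regardless of direction, so normalizing before storage keeps the ranking purely semantic.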

Note: Set TEST_VECTOR_SEARCH = True in the configuration cell to enable this step.

In [9]:

if TEST_VECTOR_SEARCH:
    test_start = time.time()
    print("\n" + "=" * 70)
    print("TESTING VECTOR SEARCH")
    print("=" * 70)

    test_query = "delicious and healthy food"

    conn = s2.connect()
    try:
        with conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")

            # Embed query using the same model used for stored embeddings
            cur.execute(f"""
                SELECT JSON_ARRAY_UNPACK_F64(
                    cluster.EMBED_TEXT(%s, model => '{EMBEDDING_MODEL}')
                ) :> VECTOR({VECTOR_DIMENSIONS}) as query_embedding
            """, (test_query,))
            query_embedding = cur.fetchone()[0]

            # Vector similarity search. This pattern ensures HNSW index usage:
            # ORDER BY DOT_PRODUCT(...) DESC LIMIT k  (no WHERE filters on non-vector columns)
            search_start = time.time()
            cur.execute(f"""
                SELECT
                    id, score,
                    LEFT(full_review_text, 80) as preview,
                    DOT_PRODUCT(review_embedding, %s) as similarity
                FROM {TABLE_NAME}
                ORDER BY similarity DESC
                LIMIT 5
            """, (query_embedding,))

            results = cur.fetchall()
            search_time = time.time() - search_start
    finally:
        conn.close()

    print(f"\nQuery: '{test_query}'")
    print(f"Search time: {search_time*1000:.2f} ms\n")

    for i, row in enumerate(results, 1):
        print(f"{i}. [Score: {row[3]:.4f}] [{row[1]}★] {row[2]}...")

    test_time = time.time() - test_start
    print("\n" + "=" * 70)
    print(f"✅ Vector search working! (total test time: {test_time:.2f}s)")
else:
    print("✅ Skipped Vector search check")

Step 10: Pipeline Summary

Review the full execution timing breakdown. This helps you understand where time is spent and optimize for your use case:

  • Cold start (install + import) is one-time overhead

  • Embedding generation is typically the bottleneck; scale workers and batch size to match your cluster capacity

  • Index creation is fast and only happens once

In [10]:

# NOTEBOOK EXECUTION SUMMARY

total_time = time.time() - start_time

print("\n" + "=" * 80)
print("📊 NOTEBOOK EXECUTION SUMMARY")
print("=" * 80)

# Configuration summary
print(f"\n📋 Configuration:")
print(f"  Dataset size:        {SAMPLE_SIZE:,} rows")
print(f"  Embedding model:     {EMBEDDING_MODEL}")
print(f"  Vector dimensions:   {VECTOR_DIMENSIONS}")
print(f"  Max embedding tokens:{MAX_EMBEDDING_TOKENS} (~{MAX_EMBEDDING_TOKENS * APPROX_CHARS_PER_TOKEN} chars)")
print(f"  Database:            {DATABASE_NAME}")
print(f"  Table:               {TABLE_NAME}")

# Timing breakdown
print(f"\n⏱️  Execution Timing:")
print(f"  Setup:")
print(f"    - Dependency install: {install_time:.1f}s")
print(f"    - Library imports:    {import_time:.2f}s")

# State check time (if available)
if 'state_check_time' in globals():
    print(f"  State check:            {state_check_time*1000:.2f} ms")

# Database/Table setup
if 'setup_time' in globals():
    print(f"  Database/Table setup:   {setup_time:.2f}s")
else:
    print(f"  Database/Table setup:   skipped (already exists)")

# Data ingestion
if 'data_time' in globals():
    print(f"  Data ingestion:         {data_time:.2f}s")
    if 'download_time' in globals():
        print(f"    - Download:           {download_time:.2f}s")
    if 'prep_time' in globals():
        print(f"    - Prepare:            {prep_time:.2f}s")
    if 'insert_time' in globals():
        rows_rate = rows_needed/insert_time if 'rows_needed' in globals() and insert_time > 0 else 0
        print(f"    - Insert:             {insert_time:.2f}s ({rows_rate:.0f} rows/s)")
    if 'num_truncated' in globals():
        print(f"    - Reviews truncated:  {num_truncated:,}/{SAMPLE_SIZE:,} ({num_truncated/SAMPLE_SIZE*100:.1f}%)")
else:
    print(f"  Data ingestion:         skipped ({total_rows:,} rows exist)")

# Embedding generation
if 'embed_time' in globals():
    emb_rate = rows_to_embed/embed_time if 'rows_to_embed' in globals() and embed_time > 0 else 0
    print(f"  Embedding generation:   {embed_time:.2f}s ({emb_rate:.0f} emb/s)")
    if 'total_content_chars' in globals() and total_content_chars > 0:
        print(f"    - Content size:       {total_content_chars:,} chars (~{estimated_total_tokens:,.0f} tokens)")
        print(f"    - Avg text/row:       {avg_content_chars:,.0f} chars (~{avg_content_chars/APPROX_CHARS_PER_TOKEN:,.0f} tokens)")
        print(f"    - Text range:         {min_content_chars:,} - {max_content_chars:,} chars/row")
        print(f"    - Token throughput:   {estimated_total_tokens/embed_time:,.0f} tokens/sec")
else:
    print(f"  Embedding generation:   skipped ({embedded_rows:,} embeddings exist)")

# Index creation
if 'index_time' in globals():
    print(f"  Index creation:         {index_time:.2f}s")
else:
    print(f"  Index creation:         skipped (index exists)")

# Vector search test
if 'test_time' in globals():
    print(f"  Vector search test:     {test_time:.2f}s")
else:
    print(f"  Vector search test:     skipped")

# Total time summary
print(f"\n{'─' * 80}")
print(f"🎯 Total Pipeline Time:      {total_time:.2f}s ({total_time/60:.2f} minutes)")
cold_start_overhead = install_time + import_time
print(f"   Cold Start Overhead:      {cold_start_overhead:.2f}s ({cold_start_overhead/total_time*100:.1f}%)")
print(f"{'─' * 80}")

# Performance metrics
print(f"\n📈 Performance Metrics (if applicable):")

if 'data_time' in globals() and 'embed_time' in globals():
    core_time = data_time + embed_time
    print(f"  End-to-end throughput:  {SAMPLE_SIZE/core_time:.0f} rows/sec (excluding setup)")
elif 'data_time' in globals():
    print(f"  Data processing rate:   {rows_needed/data_time:.0f} rows/sec")
elif 'embed_time' in globals():
    print(f"  Embedding rate:         {rows_to_embed/embed_time:.0f} embeddings/sec")

if 'insert_time' in globals() and 'rows_needed' in globals():
    print(f"  Data ingest rate:       {rows_needed/insert_time:.0f} rows/sec")

if 'embed_time' in globals() and 'rows_to_embed' in globals():
    print(f"  Embedding throughput:   {rows_to_embed/embed_time:.0f} embeddings/sec")
    if 'total_content_chars' in globals() and total_content_chars > 0:
        print(f"  Token throughput:       {estimated_total_tokens/embed_time:,.0f} tokens/sec")
        print(f"  Chars throughput:       {total_content_chars/embed_time:,.0f} chars/sec")
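
The summary cell divides counters by elapsed times in several places, and some of those timings only exist when the corresponding step actually ran. One possible way to keep a skipped step or a zero timing from raising is a small guard helper. This is a sketch only: `safe_rate` and `optional_metric` are hypothetical names, not part of the notebook or of any SingleStore library.

```python
from typing import Optional


def optional_metric(name: str, namespace: dict) -> Optional[float]:
    """Look up a timing or counter that may be absent because its cell was skipped."""
    return namespace.get(name)


def safe_rate(count: Optional[float], seconds: Optional[float]) -> float:
    """Return count / seconds, or 0.0 when a value is missing or the time is not positive."""
    if count is None or seconds is None or seconds <= 0:
        return 0.0
    return count / seconds


# Illustrative values only; in the notebook the namespace would be globals().
metrics = {"rows_to_embed": 1000, "embed_time": 4.0}
rate = safe_rate(
    optional_metric("rows_to_embed", metrics),
    optional_metric("embed_time", metrics),
)
print(f"Embedding throughput: {rate:.0f} embeddings/sec")
```

Passing `globals()` as the namespace would mirror the `'name' in globals()` checks used in the cell above, while keeping the division guarded in one place.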

Step 11: Define Cloud Function API

Now we wrap the vector search in a FastAPI application that can be deployed as a SingleStore Cloud Function. This gives you a production-ready REST API endpoint.

Endpoints

| Method | Path | Description |
|--------|------|-------------|
| GET | / | API info and available endpoints |
| GET | /health | Health check with database statistics |
| POST | /search | Semantic vector search |

This API demonstrates EMBED_TEXT() + DOT_PRODUCT() vector search as a building block. You can extend it with application-specific filters, pagination, or business logic for your use case.
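
To make the ranking step concrete, the sketch below reproduces what a DOT_PRODUCT() ordering computes, using plain Python and toy 3-dimensional vectors instead of real embeddings. It is an exact full scan, not the approximate HNSW search the notebook uses, and the helper names (`normalize`, `dot`, `top_k`) are hypothetical. The vectors are normalized here so that the dot product equals cosine similarity; whether EMBED_TEXT() returns unit-length vectors is an assumption you should verify for your model.

```python
import math
from typing import Dict, List, Tuple


def normalize(vec: List[float]) -> List[float]:
    """Scale a vector to unit length so dot product equals cosine similarity."""
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in vec]


def dot(a: List[float], b: List[float]) -> float:
    """Plain dot product of two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    return sum(x * y for x, y in zip(a, b))


def top_k(query: List[float], rows: Dict[int, List[float]], k: int) -> List[Tuple[int, float]]:
    """Score every row against the query and return the k highest (id, score) pairs."""
    q = normalize(query)
    scored = [(row_id, dot(q, normalize(vec))) for row_id, vec in rows.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy stand-ins for stored review embeddings, keyed by row id.
toy_rows = {
    1: [1.0, 0.0, 0.0],
    2: [0.6, 0.8, 0.0],
    3: [0.0, 0.0, 1.0],
}
ranked = top_k([1.0, 0.1, 0.0], toy_rows, k=2)
for row_id, score in ranked:
    print(row_id, round(score, 3))
```

The query points mostly along the first axis, so row 1 ranks first and row 2 second, while row 3 (orthogonal to the query) is cut off by `k=2`. In the API, the same ordering is expressed in SQL with `ORDER BY similarity DESC LIMIT %s`.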

In [11]:

# CLOUD FUNCTION API DEFINITION
print("\n" + "=" * 80)
print("🚀 DEFINING CLOUD FUNCTION API")
print("=" * 80)

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, List
import singlestoredb as s2
import singlestoredb.apps as apps

# Request/Response Models
class SearchRequest(BaseModel):
    query: str = Field(..., description="Search query text", min_length=1, max_length=500)
    limit: Optional[int] = Field(10, description="Number of results to return", ge=1, le=100)

class SearchResult(BaseModel):
    id: int
    product_id: str
    review_text: str
    star_rating: int
    similarity_score: float

class SearchResponse(BaseModel):
    status: str
    query: str
    results_count: int
    results: List[SearchResult]
    latency_ms: float

class HealthResponse(BaseModel):
    status: str
    database: str
    total_reviews: int
    embedded_reviews: int
    index_status: str

# Create FastAPI Application
app = FastAPI(
    title="Product Review Vector Search API",
    version="1.0.0",
    description="Semantic search for product reviews using SingleStore vector embeddings"
)

@app.get("/")
async def root():
    """API information endpoint"""
    return {
        "service": "Product Review Vector Search API",
        "version": "1.0.0",
        "model": EMBEDDING_MODEL,
        "dimensions": VECTOR_DIMENSIONS,
        "database": DATABASE_NAME,
        "endpoints": {
            "GET /": "API information",
            "GET /health": "Health check and database stats",
            "POST /search": "Semantic vector search"
        }
    }

@app.get("/health", response_model=HealthResponse)
async def health():
    """Health check endpoint with database statistics"""
    try:
        with s2.connect() as conn:
            with conn.cursor() as cur:
                cur.execute(f"USE {DATABASE_NAME}")

                cur.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                total = cur.fetchone()[0]

                cur.execute(f"SELECT COUNT(*) FROM {TABLE_NAME} WHERE review_embedding IS NOT NULL")
                embedded = cur.fetchone()[0]

                cur.execute(f"""
                    SELECT COUNT(*) FROM information_schema.statistics
                    WHERE table_schema = '{DATABASE_NAME}'
                    AND table_name = '{TABLE_NAME}'
                    AND index_name = 'vindex_review_embedding'
                """)
                index_exists = cur.fetchone()[0] > 0

        return HealthResponse(
            status="healthy",
            database=DATABASE_NAME,
            total_reviews=total,
            embedded_reviews=embedded,
            index_status="active" if index_exists else "missing"
        )
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Service unavailable: {str(e)}")

@app.post("/search", response_model=SearchResponse)
async def search(request: SearchRequest):
    """
    Semantic search endpoint for product reviews.

    Embeds the query text using EMBED_TEXT() with the same model used for
    stored embeddings, then performs DOT_PRODUCT vector similarity search
    using the HNSW index to return the top-k most relevant reviews.
    """
    search_start = time.time()

    try:
        with s2.connect() as conn:
            with conn.cursor() as cur:
                cur.execute(f"USE {DATABASE_NAME}")

                # Embed the query using the same model as stored embeddings
                cur.execute(f"""
                    SELECT JSON_ARRAY_UNPACK_F64(
                        cluster.EMBED_TEXT(%s, model => '{EMBEDDING_MODEL}')
                    ) :> VECTOR({VECTOR_DIMENSIONS})
                """, (request.query,))
                query_embedding = cur.fetchone()[0]

                # Vector similarity search using HNSW index
                cur.execute(f"""
                    SELECT
                        id,
                        product_id,
                        full_review_text,
                        score,
                        DOT_PRODUCT(review_embedding, %s) as similarity
                    FROM {TABLE_NAME}
                    ORDER BY similarity DESC
                    LIMIT %s
                """, (query_embedding, request.limit))

                rows = cur.fetchall()

                results = [
                    SearchResult(
                        id=row[0],
                        product_id=row[1],
                        review_text=row[2],
                        star_rating=row[3],
                        similarity_score=float(row[4])
                    ) for row in rows
                ]

        latency = (time.time() - search_start) * 1000

        return SearchResponse(
            status="success",
            query=request.query,
            results_count=len(results),
            results=results,
            latency_ms=round(latency, 2)
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")

print("\n✓ FastAPI application defined")
print("\nEndpoints configured:")
print("  GET  /          - API information")
print("  GET  /health    - Health check with database stats")
print("  POST /search    - Semantic vector search")

Step 12: Deploy as Cloud Function

singlestoredb.apps.run_function_app() deploys the FastAPI app as a SingleStore Cloud Function. This:

  • Packages the FastAPI app into a containerized microservice

  • Hosts it on SingleStore's infrastructure with automatic HTTPS

  • Provides a URL you can call from any HTTP client

How to Call the Deployed Endpoint

After deployment, you'll need two things from the SingleStore Portal UI:

  1. Endpoint URL: Find it in the Cloud Functions section of the portal after deployment

  2. API Key: Generate one in the portal under Cloud Functions > API Keys. This key is required for all requests.

All requests must include a Bearer authorization header with your API key:

# Health check
curl https://<your-endpoint-url>/health \
  -H "Authorization: Bearer <your-api-key>"

# Semantic search
curl -X POST https://<your-endpoint-url>/search \
  -H "Authorization: Bearer <your-api-key>" \
  -H "Content-Type: application/json" \
  -d '{"query": "delicious healthy snack", "limit": 5}'

Note: Replace <your-endpoint-url> and <your-api-key> with the actual values from the SingleStore Portal.
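
The same calls can also be made from Python. The sketch below uses only the standard library and mirrors the curl example for POST /search: a JSON body with `query` and `limit`, plus a Bearer authorization header. The function names `build_search_request` and `run_search` are hypothetical, and the endpoint URL and API key are placeholders that you must replace with the values from the portal.

```python
import json
import urllib.request


def build_search_request(endpoint_url: str, api_key: str, query: str, limit: int = 5) -> urllib.request.Request:
    """Build the POST /search request without sending it."""
    body = json.dumps({"query": query, "limit": limit}).encode("utf-8")
    return urllib.request.Request(
        url=f"{endpoint_url.rstrip('/')}/search",
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def run_search(endpoint_url: str, api_key: str, query: str, limit: int = 5, timeout: float = 30.0) -> dict:
    """Send the request and decode the JSON response body."""
    request = build_search_request(endpoint_url, api_key, query, limit)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Placeholder values; nothing is sent until run_search() is called.
example = build_search_request(
    "https://example.invalid", "PLACEHOLDER_KEY", "delicious healthy snack", limit=5
)
print(example.get_method(), example.full_url)
```

Calling `run_search(...)` with your real endpoint URL and API key should return a dictionary shaped like the `SearchResponse` model defined in Step 11, with `results` holding the matched reviews. A non-2xx response raises `urllib.error.HTTPError`, so wrap the call in a try/except if you need to handle failures.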

In [12]:

# ========================================
# DEPLOY CLOUD FUNCTION
# ========================================

print("\n" + "=" * 80)
print("🚀 DEPLOYING CLOUD FUNCTION")
print("=" * 80)

# Deploy the function app
connection_info = await apps.run_function_app(app)


About this Template

Learn how to use the EMBED_TEXT AI Function for vector search and deploy it as a SingleStore Cloud Function.

This Notebook can be run in Standard and Enterprise deployments.

Tags

notebooks, python

License

This Notebook has been released under the Apache 2.0 open source license.