Vector search with AI Function EMBED_TEXT
Notebook
Note
You can use your existing Standard or Premium workspace with this Notebook.
The AI functions are currently in Private Preview. Please reach out to support@singlestore.com to confirm whether this feature can be enabled for your org.
What This Notebook Does
This Jupyter notebook demonstrates an end-to-end vector search pipeline using SingleStore:
1. **Data Ingestion** — Downloads an example Amazon food reviews dataset and loads it into a SingleStore table
2. **Embedding Generation** — Uses the SingleStore AI function `EMBED_TEXT()` to generate vector embeddings via a simple SQL call
3. **Vector Indexing** — Creates an HNSW vector index on the embedding column for fast approximate nearest-neighbor (ANN) search
4. **Semantic Search** — Performs vector similarity search using `DOT_PRODUCT()` to find reviews that are semantically similar to a natural-language query
5. **API Deployment** — Deploys a FastAPI endpoint as a SingleStore Cloud Function for production-ready semantic search
Why SingleStore for Vector Search?
- **Simple SQL-based embeddings**: `EMBED_TEXT()` is tightly integrated into the SQL layer — you call it like any other SQL function. No need to set up external embedding services, manage API keys for third-party providers, or write client-side embedding code. The embedding logic runs on SingleStore's Aura compute infrastructure, so model management is handled for you.
- **HNSW indexing**: Approximate nearest-neighbor search reduces query time from O(n) full-scan to sub-linear, critical for large datasets
- **Unified platform**: One database for transactional, analytical, and vector workloads — no need to sync data to a separate vector database
Prerequisites: Ensure AI Functions are installed on your deployment (AI > AI & ML Functions).
Step 1: Configuration
Define key parameters for the pipeline. These control dataset size, embedding model, and database targets.
| Parameter | Purpose |
|-----------|---------|
| SAMPLE_SIZE | Number of reviews to process. Start small (1,000) for testing, scale up for production. |
| EMBEDDING_MODEL | The model used by EMBED_TEXT(). titan-embed-text-v2-0 produces 1024-dimensional vectors optimized for semantic similarity. |
| MAX_EMBEDDING_TOKENS | Token limit per text. Embedding models have fixed context windows — truncating to 512 tokens ensures quality and avoids silent truncation by the model. |
| RECREATE_DB | Set to True to drop and rebuild everything from scratch. |
In [1]:
```python
SAMPLE_SIZE = 10000
EMBEDDING_MODEL = 'titan-embed-text-v2-0'
VECTOR_DIMENSIONS = 1024
DATABASE_NAME = 'vector_search_demo'
TABLE_NAME = 'product_reviews'
RECREATE_DB = False

MAX_EMBEDDING_TOKENS = 512   # Optimal token limit for embedding model quality
APPROX_CHARS_PER_TOKEN = 4   # ~4 chars per token for English text

TEST_VECTOR_SEARCH = False   # change this value to run example vector search

print(f"✓ Configuration: {SAMPLE_SIZE:,} rows | Model: {EMBEDDING_MODEL} | Max tokens: {MAX_EMBEDDING_TOKENS}")
```
Step 2: Install Dependencies & Import Libraries
We need:
- `kagglehub` — to download the Amazon Fine Food Reviews dataset directly from Kaggle
- `singlestoredb` — the official SingleStore Python SDK (provides connections, cursor API, and cloud function deployment via `singlestoredb.apps`)
- `fastapi` / `pydantic` — to define the search API that will be deployed as a SingleStore Cloud Function
- `tqdm` — for progress bars during batch operations
In [2]:
```python
import time

# Track total workflow time from the very start
start_time = time.time()

# Track library installation time
install_start = time.time()
print("📦 Installing dependencies...")

!pip install -q kagglehub singlestoredb fastapi pydantic tqdm

install_time = time.time() - install_start
print(f"✓ Dependencies installed in {install_time:.1f}s")

# Import libraries
import_start = time.time()

import pandas as pd
import sqlalchemy as sa
import singlestoredb as s2
import kagglehub
import concurrent.futures
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, List
from tqdm.auto import tqdm

import_time = time.time() - import_start

print(f"✓ Libraries imported in {import_time:.2f}s")
print(f"✓ Total setup time: {(install_time + import_time):.1f}s")
```
Step 3: Check Current State (Idempotent Pipeline)
Before doing any work, we check what already exists in the database. This makes the notebook idempotent — you can re-run it safely without duplicating data or re-generating embeddings.
The state check inspects:
Whether the database and table exist
How many rows are loaded vs. needed
How many rows already have embeddings
Whether the vector index has been created
Only the steps that are actually needed will execute in subsequent cells.
In [3]:
```python
# Check what's already done using direct connection
state_check_start = time.time()

conn = s2.connect()

try:
    with conn.cursor() as cur:

        # Drop DB
        if RECREATE_DB:
            print(f" ⚠️ Dropping existing database...")
            cur.execute(f"DROP DATABASE IF EXISTS {DATABASE_NAME}")

        # Check database exists
        cur.execute(f"""
            SELECT COUNT(*) FROM information_schema.schemata
            WHERE schema_name = '{DATABASE_NAME}'
        """)
        db_exists = cur.fetchone()[0] > 0

        # Initialize defaults
        table_exists = False
        total_rows = 0
        embedded_rows = 0
        index_exists = False

        if db_exists:
            try:
                # Try to use the database
                cur.execute(f"USE {DATABASE_NAME}")

                # Check table exists
                cur.execute(f"""
                    SELECT COUNT(*) FROM information_schema.tables
                    WHERE table_schema = '{DATABASE_NAME}' AND table_name = '{TABLE_NAME}'
                """)
                table_exists = cur.fetchone()[0] > 0

                if table_exists:
                    # Check data
                    cur.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                    total_rows = cur.fetchone()[0]

                    # Check embeddings
                    cur.execute(f"""
                        SELECT COUNT(*) FROM {TABLE_NAME} WHERE review_embedding IS NOT NULL
                    """)
                    embedded_rows = cur.fetchone()[0]

                    # Check vector index
                    cur.execute(f"""
                        SELECT COUNT(*) FROM information_schema.statistics
                        WHERE table_schema = '{DATABASE_NAME}'
                          AND table_name = '{TABLE_NAME}'
                          AND index_name = 'vindex_review_embedding'
                    """)
                    index_exists = cur.fetchone()[0] > 0

            except Exception as e:
                # If we can't USE the database, treat it as non-existent
                print(f"⚠️ Database exists but cannot be accessed: {e}")
                print(f"   Will recreate database: {DATABASE_NAME}")
                db_exists = False
                table_exists = False
                total_rows = 0
                embedded_rows = 0
                index_exists = False
finally:
    conn.close()

# Determine what needs to be done
needs_data = total_rows < SAMPLE_SIZE
needs_embedding = embedded_rows < SAMPLE_SIZE
needs_index = not index_exists

state_check_time = time.time() - state_check_start

print("=" * 70)
print("CURRENT STATE")
print("=" * 70)
print(f"Database exists: {db_exists}")
print(f"Table exists: {table_exists}")
print(f"Rows in table: {total_rows:,}")
print(f"Embedded rows: {embedded_rows:,}")
print(f"Vector index: {'Yes' if index_exists else 'No'}")
print(f"State check time: {state_check_time*1000:.2f} ms")
print("=" * 70)
print(f"\nACTIONS NEEDED:")
print(f" {'✓' if not needs_data else '→'} Data ingestion {'' if not needs_data else f'(need {SAMPLE_SIZE - total_rows:,} more rows)'}")
print(f" {'✓' if not needs_embedding else '→'} Embedding generation {'' if not needs_embedding else f'(need {SAMPLE_SIZE - embedded_rows:,} embeddings)'}")
print(f" {'✓' if not needs_index else '→'} Vector index creation")
print("=" * 70)
```
Step 4: Create Database & Table
Creates the database and table if they don't already exist. Key design choices:
- `VECTOR(1024)` column type stores fixed-dimension float vectors natively in SingleStore — no serialization/deserialization overhead
- `review_text_for_embedding` stores the token-truncated text that was actually embedded (important for reproducibility)
- `SORT KEY (id)` optimizes sequential scans during batch processing
- Secondary keys on `product_id` and `score` enable fast filtered queries alongside vector search
In [4]:
```python
if not db_exists or not table_exists:
    setup_start = time.time()
    print("\n📦 Setting up database and table...")

    conn = s2.connect()
    try:
        with conn.cursor() as cur:
            # Drop and recreate if there were access issues
            if db_exists and not table_exists:
                print(f" ⚠️ Dropping existing inaccessible database...")
                cur.execute(f"DROP DATABASE IF EXISTS {DATABASE_NAME}")

            # Create database
            cur.execute(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
            cur.execute(f"USE {DATABASE_NAME}")

            # Create table
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
                    id BIGINT PRIMARY KEY,
                    product_id VARCHAR(50),
                    user_id VARCHAR(50),
                    profile_name VARCHAR(255),
                    score INT,
                    review_time BIGINT,
                    summary TEXT,
                    review_text TEXT,
                    full_review_text TEXT,
                    review_text_for_embedding TEXT,
                    review_embedding VECTOR({VECTOR_DIMENSIONS}) NULL,
                    KEY (product_id),
                    KEY (score),
                    SORT KEY (id)
                )
            """)
            conn.commit()
    finally:
        conn.close()

    setup_time = time.time() - setup_start
    print(f"✓ Database and table created in {setup_time:.2f}s")
else:
    print("\n✓ Database and table already exist - skipping creation")
```
Step 5: Download & Ingest Data
Downloads the Amazon Fine Food Reviews dataset from Kaggle and inserts it into SingleStore.
Token-aware truncation: Embedding models have a maximum input length (measured in tokens, not characters). Text beyond the limit is silently truncated by the model, which can degrade embedding quality. We pre-truncate at word boundaries to approximately MAX_EMBEDDING_TOKENS tokens (~4 characters per token for English) so we control exactly what gets embedded.
Data is inserted in batches of 1,000 rows for efficient network utilization without hitting memory limits.
In [5]:
```python
if needs_data:
    data_start = time.time()
    rows_needed = SAMPLE_SIZE - total_rows
    print(f"\n📥 Downloading and inserting {rows_needed:,} rows...")

    # Download dataset
    download_start = time.time()
    path = kagglehub.dataset_download("snap/amazon-fine-food-reviews")
    df = pd.read_csv(f"{path}/Reviews.csv")
    download_time = time.time() - download_start
    print(f" ✓ Downloaded in {download_time:.1f}s")

    # Prepare data
    prep_start = time.time()
    sample_df = df.iloc[total_rows:SAMPLE_SIZE].copy()
    sample_df['full_review_text'] = sample_df['Summary'].fillna('') + '. ' + sample_df['Text'].fillna('')

    # --- Token-aware truncation for embedding quality ---
    max_chars = MAX_EMBEDDING_TOKENS * APPROX_CHARS_PER_TOKEN  # e.g. 512 * 4 = 2048 chars

    def truncate_to_token_limit(text, max_chars):
        """Truncate text to approximate token limit, breaking at word boundaries."""
        text = str(text).strip()
        if len(text) <= max_chars:
            return text
        # Truncate at last word boundary before the limit
        truncated = text[:max_chars]
        last_space = truncated.rfind(' ')
        if last_space > max_chars * 0.8:  # Only break at word boundary if reasonable
            truncated = truncated[:last_space]
        return truncated.rstrip()

    sample_df['review_text_for_embedding'] = sample_df['full_review_text'].apply(
        lambda x: truncate_to_token_limit(x, max_chars)
    )

    # Report truncation stats
    original_lengths = sample_df['full_review_text'].str.len()
    truncated_lengths = sample_df['review_text_for_embedding'].str.len()
    num_truncated = (original_lengths > max_chars).sum()
    print(f" ✂️ Token truncation (max {MAX_EMBEDDING_TOKENS} tokens ≈ {max_chars} chars):")
    print(f"    Truncated: {num_truncated:,}/{len(sample_df):,} reviews ({num_truncated/len(sample_df)*100:.1f}%)")
    print(f"    Original avg: {original_lengths.mean():.0f} chars | Truncated avg: {truncated_lengths.mean():.0f} chars")

    sample_df = sample_df.fillna({'ProductId': '', 'UserId': '', 'ProfileName': '', 'Score': 0})

    # Rename columns
    sample_df = sample_df.rename(columns={
        'Id': 'id', 'ProductId': 'product_id', 'UserId': 'user_id',
        'ProfileName': 'profile_name', 'Score': 'score', 'Time': 'review_time',
        'Summary': 'summary', 'Text': 'review_text'
    })
    prep_time = time.time() - prep_start
    print(f" ✓ Prepared data in {prep_time:.1f}s")

    # Insert with progress bar
    insert_start = time.time()
    conn = s2.connect()

    try:
        with conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")

            batch_size = 1000
            num_batches = (len(sample_df) + batch_size - 1) // batch_size

            # Progress bar for insertion
            with tqdm(total=len(sample_df), desc=" Inserting rows", unit="rows") as pbar:
                for i in range(0, len(sample_df), batch_size):
                    batch = sample_df.iloc[i:i+batch_size]

                    # Build multi-row insert
                    values = []
                    for _, row in batch.iterrows():
                        # Escape single quotes
                        product_id = str(row['product_id']).replace("'", "''")
                        user_id = str(row['user_id']).replace("'", "''")
                        profile_name = str(row['profile_name']).replace("'", "''")
                        summary = str(row['summary']).replace("'", "''")
                        review_text = str(row['review_text']).replace("'", "''")
                        full_text = str(row['full_review_text']).replace("'", "''")
                        text_for_embedding = str(row['review_text_for_embedding']).replace("'", "''")

                        values.append(
                            f"({row['id']}, '{product_id}', '{user_id}', "
                            f"'{profile_name}', {row['score']}, {row['review_time']}, "
                            f"'{summary}', '{review_text}', '{full_text}', '{text_for_embedding}')"
                        )

                    insert_query = f"""
                        INSERT INTO {TABLE_NAME}
                        (id, product_id, user_id, profile_name, score, review_time,
                         summary, review_text, full_review_text, review_text_for_embedding)
                        VALUES {','.join(values)}
                    """
                    cur.execute(insert_query)
                    pbar.update(len(batch))

            conn.commit()
    finally:
        conn.close()

    insert_time = time.time() - insert_start
    data_time = time.time() - data_start

    print(f"\n ✓ Inserted {rows_needed:,} rows")
    print(f"    Download: {download_time:.1f}s")
    print(f"    Prepare:  {prep_time:.1f}s")
    print(f"    Insert:   {insert_time:.1f}s ({rows_needed/insert_time:.0f} rows/sec)")
    print(f"    Total:    {data_time:.1f}s")
else:
    print(f"\n✓ Already have {total_rows:,} rows - skipping data ingestion")
```
Step 6: Generate Embeddings with EMBED_TEXT()
This is the core AI step. Instead of writing Python code to call an external embedding API, we use SingleStore's EMBED_TEXT() function to generate embeddings with a simple SQL call.
How EMBED_TEXT() works
```sql
EMBED_TEXT(text_column, model => 'titan-embed-text-v2-0', tokens_threshold => 2000000)
```
- Integrated directly into SingleStore's SQL layer: you call it like any other function in a `SELECT` or `INSERT` statement
- The embedding computation runs on SingleStore's Aura compute infrastructure, which manages the model hosting and scaling for you
- The `tokens_threshold` parameter controls batching: the function accumulates rows until it reaches this token count, then sends them to the model in one batch (higher = more throughput, but more memory)
- Returns a JSON array of floats, which we unpack with `JSON_ARRAY_UNPACK_F64()` and cast to `VECTOR(1024)`
Why this is simpler than external embedding APIs
With traditional approaches, you'd need to: extract text from the database → call an external API (OpenAI, Cohere, etc.) → manage rate limits and retries → write embeddings back. With EMBED_TEXT(), it's a single SQL statement.
Why parallel batch processing?
We use 50 worker threads, each processing 10 rows at a time, to saturate the embedding throughput capacity. The INSERT ... ON DUPLICATE KEY UPDATE pattern lets us safely upsert: if a row already has an embedding, it gets overwritten (enabling retries on failure).
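The work-partitioning arithmetic is easy to check in isolation. A minimal sketch of how N pending rows split into 1-indexed row-number ranges of `batch_size` (mirroring the `ranges` computation in the cell below):

```python
# Split n_rows into inclusive, 1-indexed (start, end) ranges of batch_size.
# Workers then claim these ranges in parallel; the last range may be shorter.
def make_ranges(n_rows, batch_size):
    return [(i, min(i + batch_size - 1, n_rows))
            for i in range(1, n_rows + 1, batch_size)]

print(make_ranges(25, 10))  # → [(1, 10), (11, 20), (21, 25)]
```

With 10,000 rows and `batch_size = 10` this yields 1,000 ranges, so 50 workers each process roughly 20 batches.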
In [6]:
```python
if needs_embedding:
    embedding_total_start = time.time()
    rows_to_embed = SAMPLE_SIZE - embedded_rows
    print(f"\n🚀 Generating {rows_to_embed:,} embeddings in parallel...")
    print(f" Using token-truncated text (max {MAX_EMBEDDING_TOKENS} tokens) for optimal embedding quality")

    # Get text content size statistics for token throughput calculation
    content_stats_conn = s2.connect()
    try:
        with content_stats_conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")
            cur.execute(f"""
                SELECT
                    SUM(LENGTH(review_text_for_embedding)) as total_chars,
                    AVG(LENGTH(review_text_for_embedding)) as avg_chars,
                    MIN(LENGTH(review_text_for_embedding)) as min_chars,
                    MAX(LENGTH(review_text_for_embedding)) as max_chars
                FROM {TABLE_NAME}
                WHERE review_embedding IS NULL
            """)
            stats = cur.fetchone()
            total_content_chars = float(stats[0] or 0)
            avg_content_chars = float(stats[1] or 0)
            min_content_chars = float(stats[2] or 0)
            max_content_chars = float(stats[3] or 0)
            # Rough estimate: 1 token ≈ 4 characters for English text
            estimated_total_tokens = total_content_chars / APPROX_CHARS_PER_TOKEN
    finally:
        content_stats_conn.close()

    print(f" 📏 Content size statistics (truncated text):")
    print(f"    Total content: {total_content_chars:,} chars (~{estimated_total_tokens:,.0f} estimated tokens)")
    print(f"    Avg per row: {avg_content_chars:,.0f} chars (~{avg_content_chars/APPROX_CHARS_PER_TOKEN:,.0f} tokens)")
    print(f"    Range: {min_content_chars:,} - {max_content_chars:,} chars")

    INSERT_SQL = f"""
        INSERT INTO {TABLE_NAME} (id, review_embedding)
        SELECT
            id,
            JSON_ARRAY_UNPACK_F64(
                cluster.EMBED_TEXT(review_text_for_embedding, model => '{EMBEDDING_MODEL}', tokens_threshold => 2000000)
            ) :> VECTOR({VECTOR_DIMENSIONS})
        FROM (
            WITH numbered AS (
                SELECT ROW_NUMBER() OVER (ORDER BY id) AS rn, id, review_text_for_embedding
                FROM {TABLE_NAME}
                WHERE review_embedding IS NULL
            )
            SELECT * FROM numbered WHERE rn BETWEEN %s AND %s
        )
        ON DUPLICATE KEY UPDATE review_embedding = VALUES(review_embedding);
    """

    from queue import Queue
    import threading

    # OPTIMIZED: Small batches + Many workers = True parallelism
    batch_size = 10   # Small batches for maximum parallelism
    max_workers = 50  # High worker count to saturate cluster

    # Create connection pool
    connection_pool = Queue(maxsize=max_workers)

    pool_start = time.time()
    for _ in range(max_workers):
        conn = s2.connect()
        with conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")
        connection_pool.put(conn)
    pool_time = time.time() - pool_start

    def run_insert_range(rn_start: int, rn_end: int):
        conn = connection_pool.get()
        try:
            with conn.cursor() as cursor:
                cursor.execute(INSERT_SQL, (rn_start, rn_end))
                conn.commit()
            return (rn_start, rn_end, None)
        except Exception as e:
            return (rn_start, rn_end, str(e)[:100])
        finally:
            connection_pool.put(conn)

    # Create batches
    ranges = [(i, min(i + batch_size - 1, rows_to_embed))
              for i in range(1, rows_to_embed + 1, batch_size)]

    embed_start = time.time()
    errors = []

    print(f" Configuration: {batch_size} rows/batch × {max_workers} workers = {len(ranges)} batches")

    # Progress bar
    with tqdm(total=len(ranges), desc=" Processing", unit="batch") as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(run_insert_range, start, end): (start, end)
                       for (start, end) in ranges}

            for future in concurrent.futures.as_completed(futures):
                start, end, err = future.result()
                if err:
                    errors.append((start, end, err))

                elapsed = time.time() - embed_start
                embeddings_done = (pbar.n + 1) * batch_size
                rate = embeddings_done / elapsed if elapsed > 0 else 0

                pbar.set_postfix({'rate': f'{rate:.0f} emb/s'})
                pbar.update(1)

    embed_time = time.time() - embed_start

    # Cleanup
    while not connection_pool.empty():
        conn = connection_pool.get()
        conn.close()

    # Summary
    print()
    if errors:
        print(f" ⚠️ {len(errors)}/{len(ranges)} batches failed ({len(errors)/len(ranges)*100:.1f}%)")

    actual_throughput = rows_to_embed / embed_time
    chars_per_sec = total_content_chars / embed_time if embed_time > 0 else 0
    tokens_per_sec = estimated_total_tokens / embed_time if embed_time > 0 else 0
    print(f" ✓ Generated {rows_to_embed:,} embeddings in {embed_time:.1f}s")
    print(f"    Throughput: {actual_throughput:.0f} embeddings/sec")
    print(f"    Avg latency: {embed_time/rows_to_embed*1000:.2f} ms/embedding")
    print(f"    Content processed: {total_content_chars:,} chars (~{estimated_total_tokens:,.0f} tokens)")
    print(f"    Token throughput: {tokens_per_sec:,.0f} tokens/sec ({chars_per_sec:,.0f} chars/sec)")

else:
    print(f"\n✓ Already have {embedded_rows:,} embeddings - skipping")
```
Step 7: Create Vector Index (HNSW)
A vector index is what makes similarity search fast. Without it, every query must compute DOT_PRODUCT() against all rows (a full table scan). With an HNSW index, SingleStore uses Approximate Nearest Neighbor (ANN) search that runs in sub-linear time.
What is HNSW?
Hierarchical Navigable Small World (HNSW) is a graph-based ANN algorithm. It builds a multi-layer graph where:
Each vector is a node connected to its nearest neighbors
Higher layers have fewer nodes and longer-range connections (for fast coarse search)
Lower layers have more nodes and shorter-range connections (for fine-grained refinement)
Search starts at the top layer and "navigates" down, like zooming in on a map.
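The "navigate" step can be illustrated with a toy sketch. This is greedy graph descent on a single layer, not the real multi-layer HNSW algorithm with candidate lists: from an entry point, keep hopping to whichever neighbor is closest to the query until no neighbor improves the distance.

```python
# Toy illustration of greedy graph navigation (the core move inside HNSW).
# Real HNSW repeats this across layers and keeps a candidate beam; this
# single-layer, single-path version just shows the "walk toward the query" idea.
import math

def greedy_search(graph, points, entry, query):
    """graph: node -> neighbor list; points: node -> coordinates."""
    current = entry
    while True:
        # Neighbor closest to the query
        best = min(graph[current], key=lambda n: math.dist(points[n], query))
        if math.dist(points[best], query) < math.dist(points[current], query):
            current = best   # move closer to the query
        else:
            return current   # local minimum: approximate nearest neighbor

points = {0: (0, 0), 1: (1, 0), 2: (2, 0), 3: (3, 0)}
graph = {0: [1], 1: [0, 2], 2: [1, 3], 3: [2]}
print(greedy_search(graph, points, entry=0, query=(2.9, 0)))  # → 3
```

The upper layers of HNSW exist to pick a good entry point cheaply, so this walk starts near the query instead of crossing the whole graph.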
HNSW_FLAT vs other types
HNSW_FLAT stores the original full-precision vectors (no compression). This gives exact dot-product scores while still getting the speed benefits of the HNSW graph navigation. Use HNSW_PQ (product quantization) if you need to save memory at the cost of some accuracy.
Important: Query pattern for index usage
For SingleStore to use the vector index, your query must follow this pattern:
```sql
SELECT ..., DOT_PRODUCT(vector_col, query_vec) AS score
FROM table
ORDER BY score DESC
LIMIT k
```
Adding WHERE filters on non-vector columns (e.g., score >= 3) may cause the optimizer to bypass the vector index and fall back to a full scan. If you need filters, verify index usage with EXPLAIN (see Step 8) before relying on the query in production.
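One common workaround (a sketch only — the exact plan the optimizer chooses depends on your SingleStore version, so check it with EXPLAIN) is to over-fetch top candidates with the index-friendly pattern in a subquery and apply the metadata filter outside it:

```sql
-- Sketch: ANN over-fetch in the inner query, metadata filter applied outside.
-- @query_vec stands in for the embedded query vector (bound as a parameter in practice).
SELECT id, score, similarity
FROM (
    SELECT id, score,
           DOT_PRODUCT(review_embedding, @query_vec) AS similarity
    FROM product_reviews
    ORDER BY similarity DESC
    LIMIT 100            -- over-fetch so enough rows survive the outer filter
) AS candidates
WHERE score >= 3
ORDER BY similarity DESC
LIMIT 10;
```

The trade-off: if the filter is very selective, 100 candidates may not be enough, so size the inner LIMIT to your filter's selectivity.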
In [7]:
```python
if needs_index:
    index_start = time.time()
    print("\n🔍 Creating vector index...")

    conn = s2.connect()
    try:
        with conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")
            cur.execute(f"""
                ALTER TABLE {TABLE_NAME}
                ADD VECTOR INDEX vindex_review_embedding(review_embedding)
                INDEX_OPTIONS '{{"index_type":"HNSW_FLAT"}}'
            """)
            conn.commit()
    finally:
        conn.close()

    index_time = time.time() - index_start
    print(f"✓ Vector index created in {index_time:.2f}s")
else:
    print("\n✓ Vector index already exists - skipping")
```
Step 8: Verify Vector Index Usage
Use `EXPLAIN` to confirm that the query optimizer is actually using the HNSW vector index. Look for `VECTOR_KNN_SCAN` or ANN in the query plan — this confirms the index is being used for approximate nearest-neighbor search rather than a full table scan.
If you see a regular TABLE SCAN instead, the query pattern may not match what the optimizer expects (see the note in Step 7 above).
In [8]:
```python
# Verify the vector index is being used by the query optimizer
print("🔍 Verifying vector index usage with EXPLAIN...\n")

conn = s2.connect()
try:
    with conn.cursor() as cur:
        cur.execute(f"USE {DATABASE_NAME}")

        # Generate a sample query embedding for the EXPLAIN
        cur.execute(f"""
            SELECT JSON_ARRAY_UNPACK_F64(
                cluster.EMBED_TEXT('sample query text', model => '{EMBEDDING_MODEL}')
            ) :> VECTOR({VECTOR_DIMENSIONS}) as query_embedding
        """)
        sample_embedding = cur.fetchone()[0]

        # Run EXPLAIN on the vector search query
        cur.execute(f"""
            EXPLAIN SELECT
                id, score,
                LEFT(full_review_text, 80) as preview,
                DOT_PRODUCT(review_embedding, %s) as similarity
            FROM {TABLE_NAME}
            ORDER BY similarity DESC
            LIMIT 5
        """, (sample_embedding,))

        explain_results = cur.fetchall()

        print("EXPLAIN output:")
        print("-" * 80)
        for row in explain_results:
            print(row)
        print("-" * 80)

        # Check if vector index is being used
        explain_text = str(explain_results).lower()
        if 'vector' in explain_text or 'ann' in explain_text or 'hnsw' in explain_text:
            print("\n✅ Vector index IS being used (ANN search)")
        else:
            print("\n⚠️ Vector index may NOT be used — check the EXPLAIN output above")
            print("   Ensure the query follows: ORDER BY DOT_PRODUCT(...) DESC LIMIT k")
finally:
    conn.close()
```
Step 9: Test Vector Search
Run a simple semantic search to verify everything works end-to-end. The flow is:
1. **Embed the query**: call `EMBED_TEXT()` on the search text to get a query vector
2. **Compute similarity**: calculate `DOT_PRODUCT(review_embedding, query_vec)` for each row
3. **Rank and return**: `ORDER BY similarity DESC LIMIT k` returns the top matches
Because we created an HNSW index, steps 2 and 3 use approximate nearest-neighbor search instead of scanning every row. For 10,000 rows the difference is small, but for millions of rows this is the difference between milliseconds and seconds.
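For intuition, here is what the exact (non-indexed) version of steps 2 and 3 computes, sketched with toy data in plain Python: score every row vector against the query by dot product and take the top k. The HNSW index approximates exactly this ranking without touching every row.

```python
# Brute-force version of the similarity ranking that the HNSW index approximates:
# dot-product every stored vector against the query, sort descending, keep top k.
def top_k(rows, query_vec, k):
    dot = lambda a, b: sum(x * y for x, y in zip(a, b))
    scored = [(row_id, dot(vec, query_vec)) for row_id, vec in rows]
    return sorted(scored, key=lambda t: t[1], reverse=True)[:k]

rows = [(1, [1.0, 0.0]), (2, [0.6, 0.8]), (3, [0.0, 1.0])]
print(top_k(rows, [0.0, 1.0], k=2))  # → [(3, 1.0), (2, 0.8)]
```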
Note: Set `TEST_VECTOR_SEARCH = True` in the configuration cell to enable this step.
In [9]:
```python
if TEST_VECTOR_SEARCH:
    test_start = time.time()
    print("\n" + "=" * 70)
    print("TESTING VECTOR SEARCH")
    print("=" * 70)

    test_query = "delicious and healthy food"

    conn = s2.connect()
    try:
        with conn.cursor() as cur:
            cur.execute(f"USE {DATABASE_NAME}")

            # Embed query using the same model used for stored embeddings
            cur.execute(f"""
                SELECT JSON_ARRAY_UNPACK_F64(
                    cluster.EMBED_TEXT(%s, model => '{EMBEDDING_MODEL}')
                ) :> VECTOR({VECTOR_DIMENSIONS}) as query_embedding
            """, (test_query,))
            query_embedding = cur.fetchone()[0]

            # Vector similarity search — this pattern ensures HNSW index usage:
            # ORDER BY DOT_PRODUCT(...) DESC LIMIT k (no WHERE filters on non-vector columns)
            search_start = time.time()
            cur.execute(f"""
                SELECT
                    id, score,
                    LEFT(full_review_text, 80) as preview,
                    DOT_PRODUCT(review_embedding, %s) as similarity
                FROM {TABLE_NAME}
                ORDER BY similarity DESC
                LIMIT 5
            """, (query_embedding,))

            results = cur.fetchall()
            search_time = time.time() - search_start
    finally:
        conn.close()

    print(f"\nQuery: '{test_query}'")
    print(f"Search time: {search_time*1000:.2f} ms\n")

    for i, row in enumerate(results, 1):
        print(f"{i}. [Score: {row[3]:.4f}] [{row[1]}★] {row[2]}...")

    test_time = time.time() - test_start
    print("\n" + "=" * 70)
    print(f"✅ Vector search working! (total test time: {test_time:.2f}s)")
else:
    print("✅ Skipped Vector search check")
```
Step 10: Pipeline Summary
Review the full execution timing breakdown. This helps you understand where time is spent and optimize for your use case:
Cold start (install + import) is one-time overhead
Embedding generation is typically the bottleneck; scale workers and batch size to match your cluster capacity
Index creation is fast and only happens once
In [10]:
```python
# NOTEBOOK EXECUTION SUMMARY

total_time = time.time() - start_time

print("\n" + "=" * 80)
print("📊 NOTEBOOK EXECUTION SUMMARY")
print("=" * 80)

# Configuration summary
print(f"\n📋 Configuration:")
print(f"   Dataset size:        {SAMPLE_SIZE:,} rows")
print(f"   Embedding model:     {EMBEDDING_MODEL}")
print(f"   Vector dimensions:   {VECTOR_DIMENSIONS}")
print(f"   Max embedding tokens: {MAX_EMBEDDING_TOKENS} (~{MAX_EMBEDDING_TOKENS * APPROX_CHARS_PER_TOKEN} chars)")
print(f"   Database:            {DATABASE_NAME}")
print(f"   Table:               {TABLE_NAME}")

# Timing breakdown
print(f"\n⏱️ Execution Timing:")
print(f"   Setup:")
print(f"     - Dependency install: {install_time:.1f}s")
print(f"     - Library imports:    {import_time:.2f}s")

# State check time (if available)
if 'state_check_time' in globals():
    print(f"   State check: {state_check_time*1000:.2f} ms")

# Database/Table setup
if 'setup_time' in globals():
    print(f"   Database/Table setup: {setup_time:.2f}s")
else:
    print(f"   Database/Table setup: skipped (already exists)")

# Data ingestion
if 'data_time' in globals():
    print(f"   Data ingestion: {data_time:.2f}s")
    if 'download_time' in globals():
        print(f"     - Download: {download_time:.2f}s")
    if 'prep_time' in globals():
        print(f"     - Prepare:  {prep_time:.2f}s")
    if 'insert_time' in globals():
        rows_rate = rows_needed/insert_time if 'rows_needed' in globals() and insert_time > 0 else 0
        print(f"     - Insert:   {insert_time:.2f}s ({rows_rate:.0f} rows/s)")
    if 'num_truncated' in globals():
        print(f"     - Reviews truncated: {num_truncated:,}/{SAMPLE_SIZE:,} ({num_truncated/SAMPLE_SIZE*100:.1f}%)")
else:
    print(f"   Data ingestion: skipped ({total_rows:,} rows exist)")

# Embedding generation
if 'embed_time' in globals():
    emb_rate = rows_to_embed/embed_time if 'rows_to_embed' in globals() and embed_time > 0 else 0
    print(f"   Embedding generation: {embed_time:.2f}s ({emb_rate:.0f} emb/s)")
    if 'total_content_chars' in globals() and total_content_chars > 0:
        print(f"     - Content size: {total_content_chars:,} chars (~{estimated_total_tokens:,.0f} tokens)")
        print(f"     - Avg text/row: {avg_content_chars:,.0f} chars (~{avg_content_chars/APPROX_CHARS_PER_TOKEN:,.0f} tokens)")
        print(f"     - Text range: {min_content_chars:,} - {max_content_chars:,} chars/row")
        print(f"     - Token throughput: {estimated_total_tokens/embed_time:,.0f} tokens/sec")
else:
    print(f"   Embedding generation: skipped ({embedded_rows:,} embeddings exist)")

# Index creation
if 'index_time' in globals():
    print(f"   Index creation: {index_time:.2f}s")
else:
    print(f"   Index creation: skipped (index exists)")

# Vector search test
if 'test_time' in globals():
    print(f"   Vector search test: {test_time:.2f}s")
else:
    print(f"   Vector search test: skipped")

# Total time summary
print(f"\n{'─' * 80}")
print(f"🎯 Total Pipeline Time: {total_time:.2f}s ({total_time/60:.2f} minutes)")
cold_start_overhead = install_time + import_time
print(f"   Cold Start Overhead: {cold_start_overhead:.2f}s ({cold_start_overhead/total_time*100:.1f}%)")
print(f"{'─' * 80}")

# Performance metrics
print(f"\n📈 Performance Metrics (if applicable):")

if 'data_time' in globals() and 'embed_time' in globals():
    core_time = data_time + embed_time
    print(f"   End-to-end throughput: {SAMPLE_SIZE/core_time:.0f} rows/sec (excluding setup)")
elif 'data_time' in globals():
    print(f"   Data processing rate: {rows_needed/data_time:.0f} rows/sec")
elif 'embed_time' in globals():
    print(f"   Embedding rate: {rows_to_embed/embed_time:.0f} embeddings/sec")

if 'insert_time' in globals() and 'rows_needed' in globals():
    print(f"   Data ingest rate: {rows_needed/insert_time:.0f} rows/sec")

if 'embed_time' in globals() and 'rows_to_embed' in globals():
    print(f"   Embedding throughput: {rows_to_embed/embed_time:.0f} embeddings/sec")
    if 'total_content_chars' in globals() and total_content_chars > 0:
        print(f"   Token throughput: {estimated_total_tokens/embed_time:,.0f} tokens/sec")
        print(f"   Chars throughput: {total_content_chars/embed_time:,.0f} chars/sec")
```
Step 11: Define Cloud Function API
Now we wrap the vector search in a FastAPI application that can be deployed as a SingleStore Cloud Function. This gives you a production-ready REST API endpoint.
Endpoints
| Method | Path | Description |
|--------|------|-------------|
| GET | / | API info and available endpoints |
| GET | /health | Health check with database statistics |
| POST | /search | Semantic vector search |
This API demonstrates EMBED_TEXT() + DOT_PRODUCT() vector search as a building block. You can extend it with application-specific filters, pagination, or business logic for your use case.
In [11]:
# CLOUD FUNCTION API DEFINITION
print("\n" + "=" * 80)
print("🚀 DEFINING CLOUD FUNCTION API")
print("=" * 80)

import time  # used below to measure search latency

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, List
import singlestoredb as s2
import singlestoredb.apps as apps

# Request/Response Models
class SearchRequest(BaseModel):
    query: str = Field(..., description="Search query text", min_length=1, max_length=500)
    limit: Optional[int] = Field(10, description="Number of results to return", ge=1, le=100)

class SearchResult(BaseModel):
    id: int
    product_id: str
    review_text: str
    star_rating: int
    similarity_score: float

class SearchResponse(BaseModel):
    status: str
    query: str
    results_count: int
    results: List[SearchResult]
    latency_ms: float

class HealthResponse(BaseModel):
    status: str
    database: str
    total_reviews: int
    embedded_reviews: int
    index_status: str

# Create FastAPI Application
app = FastAPI(
    title="Product Review Vector Search API",
    version="1.0.0",
    description="Semantic search for product reviews using SingleStore vector embeddings"
)

@app.get("/")
async def root():
    """API information endpoint"""
    return {
        "service": "Product Review Vector Search API",
        "version": "1.0.0",
        "model": EMBEDDING_MODEL,
        "dimensions": VECTOR_DIMENSIONS,
        "database": DATABASE_NAME,
        "endpoints": {
            "GET /": "API information",
            "GET /health": "Health check and database stats",
            "POST /search": "Semantic vector search"
        }
    }

@app.get("/health", response_model=HealthResponse)
async def health():
    """Health check endpoint with database statistics"""
    try:
        with s2.connect() as conn:
            with conn.cursor() as cur:
                cur.execute(f"USE {DATABASE_NAME}")

                cur.execute(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                total = cur.fetchone()[0]

                cur.execute(f"SELECT COUNT(*) FROM {TABLE_NAME} WHERE review_embedding IS NOT NULL")
                embedded = cur.fetchone()[0]

                cur.execute(f"""
                    SELECT COUNT(*) FROM information_schema.statistics
                    WHERE table_schema = '{DATABASE_NAME}'
                      AND table_name = '{TABLE_NAME}'
                      AND index_name = 'vindex_review_embedding'
                """)
                index_exists = cur.fetchone()[0] > 0

                return HealthResponse(
                    status="healthy",
                    database=DATABASE_NAME,
                    total_reviews=total,
                    embedded_reviews=embedded,
                    index_status="active" if index_exists else "missing"
                )
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Service unavailable: {str(e)}")

@app.post("/search", response_model=SearchResponse)
async def search(request: SearchRequest):
    """
    Semantic search endpoint for product reviews.

    Embeds the query text using EMBED_TEXT() with the same model used for
    stored embeddings, then performs DOT_PRODUCT vector similarity search
    using the HNSW index to return the top-k most relevant reviews.
    """
    search_start = time.time()

    try:
        with s2.connect() as conn:
            with conn.cursor() as cur:
                cur.execute(f"USE {DATABASE_NAME}")

                # Embed the query using the same model as stored embeddings
                cur.execute(f"""
                    SELECT JSON_ARRAY_UNPACK_F64(
                        cluster.EMBED_TEXT(%s, model => '{EMBEDDING_MODEL}')
                    ) :> VECTOR({VECTOR_DIMENSIONS})
                """, (request.query,))
                query_embedding = cur.fetchone()[0]

                # Vector similarity search using HNSW index
                cur.execute(f"""
                    SELECT
                        id,
                        product_id,
                        full_review_text,
                        score,
                        DOT_PRODUCT(review_embedding, %s) AS similarity
                    FROM {TABLE_NAME}
                    ORDER BY similarity DESC
                    LIMIT %s
                """, (query_embedding, request.limit))

                rows = cur.fetchall()

                results = [
                    SearchResult(
                        id=row[0],
                        product_id=row[1],
                        review_text=row[2],
                        star_rating=row[3],
                        similarity_score=float(row[4])
                    ) for row in rows
                ]

                latency = (time.time() - search_start) * 1000

                return SearchResponse(
                    status="success",
                    query=request.query,
                    results_count=len(results),
                    results=results,
                    latency_ms=round(latency, 2)
                )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")

print("\n✓ FastAPI application defined")
print("\nEndpoints configured:")
print("  GET  /        - API information")
print("  GET  /health  - Health check with database stats")
print("  POST /search  - Semantic vector search")
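For intuition, the DOT_PRODUCT ranking that the /search query delegates to the database can be mimicked client-side. This is an illustration only, using toy 3-dimensional vectors rather than the model's real dimensionality; SingleStore computes the same ranking in-database over the HNSW index.

```python
def dot_product(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def rank_by_similarity(query_vec, docs, limit=10):
    """Return (doc_id, score) pairs sorted by descending dot product,
    mirroring ORDER BY similarity DESC LIMIT k in the /search query."""
    scored = [(doc_id, dot_product(query_vec, vec)) for doc_id, vec in docs]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:limit]

# Toy embeddings; real ones have VECTOR_DIMENSIONS components.
docs = [
    (1, [0.9, 0.1, 0.0]),
    (2, [0.1, 0.9, 0.0]),
    (3, [0.7, 0.3, 0.0]),
]
query = [1.0, 0.0, 0.0]
print(rank_by_similarity(query, docs, limit=2))  # → [(1, 0.9), (3, 0.7)]
```

Note that dot product only matches cosine-style semantic similarity when the embeddings are normalized, which is why the query and stored reviews must be embedded with the same model.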
Step 12: Deploy as Cloud Function
singlestoredb.apps.run_function_app() deploys the FastAPI app as a SingleStore Cloud Function. This:
Packages the FastAPI app into a containerized microservice
Hosts it on SingleStore's infrastructure with automatic HTTPS
Provides a URL you can call from any HTTP client
How to Call the Deployed Endpoint
After deployment, you'll need two things from the SingleStore Portal UI:
Endpoint URL: find it in the Cloud Functions section of the portal after deployment
API Key: generate one in the portal under Cloud Functions > API Keys. This key is required for all requests.
All requests must include a Bearer authorization header with your API key:
# Health check
curl https://<your-endpoint-url>/health \
-H "Authorization: Bearer <your-api-key>"
# Semantic search
curl -X POST https://<your-endpoint-url>/search \
-H "Authorization: Bearer <your-api-key>" \
-H "Content-Type: application/json" \
-d '{"query": "delicious healthy snack", "limit": 5}'
Note: Replace <your-endpoint-url> and <your-api-key> with the actual values from the SingleStore Portal.
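The same calls can be made from Python. The sketch below builds the request pieces for POST /search to mirror the curl example; the endpoint URL and API key are placeholders you must replace with your own values, and the actual HTTP call (which assumes the third-party requests library) is left commented out.

```python
def build_search_request(endpoint_url, api_key, query, limit=5):
    """Assemble the URL, headers, and JSON body for POST /search,
    matching the curl example above (Bearer auth, JSON payload)."""
    url = f"{endpoint_url}/search"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    body = {"query": query, "limit": limit}
    return url, headers, body

url, headers, body = build_search_request(
    "https://<your-endpoint-url>",  # placeholder: copy from the portal
    "<your-api-key>",               # placeholder: generate in the portal
    "delicious healthy snack",
    limit=5,
)

# With a real endpoint and key (assumes `pip install requests`):
# import requests
# resp = requests.post(url, headers=headers, json=body, timeout=30)
# resp.raise_for_status()
# for hit in resp.json()["results"]:
#     print(hit["similarity_score"], hit["review_text"][:80])
```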
In [12]:
# ========================================
# DEPLOY CLOUD FUNCTION
# ========================================

print("\n" + "=" * 80)
print("🚀 DEPLOYING CLOUD FUNCTION")
print("=" * 80)

# Deploy the function app
connection_info = await apps.run_function_app(app)

About this Template
Learn how to use the EMBED_TEXT AI function for vector search and deploy it as a SingleStore Cloud Function.
This Notebook can be run in Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.