In a recent hackathon, our team wanted to prototype an intelligent, domain-specific AI chatbot that could leverage both web search capabilities and knowledge from SingleStore's documentation.

Unlike many off-the-shelf solutions that rely heavily on third-party frameworks, we decided to build our agent from the ground up — giving us complete control over its architecture and behavior while showcasing the power of SingleStore's platform features.
The resulting solution is a simple chatbot that can retrieve information from the web, search through SingleStore's documentation using vector and full-text search, and maintain contextual conversations with users — all while being powered by a maintainable data pipeline that keeps its knowledge base up-to-date.
This blog covers the backend implementation; the corresponding code can be found on GitHub. This is by no means a production-ready agent, but it provides a solid foundation to build one. I refer to several features that were in active development/private preview at the time of writing — we expect to ship these features for public preview in the coming weeks.
The challenge
Creating an effective, domain-specific agent chatbot presents several significant challenges:
- Knowledge acquisition. How do we efficiently scrape, process and store documentation content?
- Information retrieval. How do we enable the agent to search through that content effectively?
- Reasoning and response generation. How do we enable the agent to formulate coherent, helpful responses?
- Maintainability. How do we ensure the system stays up-to-date with the latest documentation?
- Performance and scalability. How do we make the system responsive and capable of handling multiple users?
Tools needed
One of the goals was to build things from the ground up to really understand how all individual components fit together. Rather than using off-the-shelf agent frameworks that might restrict our flexibility, we designed our solution from first principles, leveraging SingleStore's capabilities wherever possible — but leaving the implementation modular enough that any single module can be replaced by another provider.
The architecture consists of three primary components:
1. Data pipeline
Our data pipeline manages the flow of information from source documentation to searchable knowledge base:
- Scraping and parsing. Automated extraction of content from SingleStore documentation sitemap
- Cleaning and processing. Transformation of HTML content to structured markdown
- Chunking and embedding. Breaking content into semantically meaningful chunks and generating vector embeddings
- Storage. Organizing the processed data in SingleStore tables with appropriate indexes
2. Search infrastructure
We implemented a hybrid search approach combining:
- Vector search. Using embeddings for semantic similarity matching
- Full-text search. Leveraging SingleStore's built-in text search capabilities
- Cloud functions. Exposing search functionality as cloud functions for the agent to call
3. Agent workflow
We'll cover the agent workflow in more detail in part II of this blog.
Scheduled job for scraping and parsing documentation
The challenge of fresh documentation
One of the most critical aspects of any documentation-based chatbot is ensuring it has access to the most up-to-date information. Documentation evolves constantly — new features are added, bugs are fixed and explanations are clarified. A chatbot providing outdated information frustrates users and potentially leads them down incorrect paths.
For our SingleStore chatbot, we needed a reliable, automated system to:
- Discover all documentation pages
- Identify which pages were new or had been updated
- Scrape the content efficiently
- Transform HTML content into a more processable format
- Store everything in a structured database
This section details how we implemented this system as a scheduled job in SingleStore, ensuring our chatbot always has the latest information at its disposal.
Designing the scraper architecture
When approaching this problem, we had several design considerations:
- Efficiency. The scraper should minimize unnecessary downloads by only fetching new or updated content
- Robustness. The system should handle network failures, timeouts and parsing errors gracefully
- Scalability. The solution should efficiently process hundreds or thousands of pages
- Maintainability. The code should be structured to be easily understood and modified
Our solution leverages Python's asynchronous capabilities to efficiently scrape multiple pages concurrently, while a SingleStore database stores both the raw content and processed markdown for each page.
Implementation details
Setting up the database schema
The first step was creating an appropriate table structure in SingleStore to store the documentation content:
```sql
CREATE TABLE IF NOT EXISTS s2docs (
    id INT AUTO_INCREMENT PRIMARY KEY,
    url TEXT,
    content LONGTEXT,
    md_content LONGTEXT NULL,
    scraped_at DATETIME,
    lastmod DATETIME
);
```
This schema includes:
- A unique identifier for each page
- The URL of the page for reference
- The original HTML content
- The processed markdown content (for easier parsing and searching)
- A timestamp for when the page was scraped
- The last modification date from the sitemap (for determining updates)
Sitemap parsing logic
Rather than manually maintaining a list of URLs to scrape, we leverage the sitemap.xml file that most documentation sites expose. This follows a standard format and includes valuable metadata, like when pages were last modified:
```python
def get_sitemap_urls(sitemap_url):
    """
    Retrieve and parse the sitemap XML to extract all URLs listed with their
    lastmod dates.
    Returns a list of tuples (url, lastmod) where lastmod is a datetime object
    or None.
    """
    # Implementation details omitted for brevity
    # The function handles different sitemap formats and parses lastmod dates
    # Returns a list of (url, lastmod) tuples
```
By tracking the `lastmod` values from the sitemap, we can determine which pages have been updated since our last scrape, avoiding unnecessary processing.
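Since the full implementation is omitted above, here is a minimal sketch of what such a parser might look like, assuming the standard sitemap namespace and ISO 8601 `lastmod` dates (the real implementation also handles sitemap index files and other edge cases):

```python
import requests
import xml.etree.ElementTree as ET
from datetime import datetime

SITEMAP_NS = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}

def get_sitemap_urls_sketch(sitemap_url):
    """Minimal sketch: fetch a sitemap and return (url, lastmod) tuples."""
    resp = requests.get(sitemap_url, timeout=30)
    resp.raise_for_status()
    root = ET.fromstring(resp.content)

    results = []
    for url_node in root.findall("sm:url", SITEMAP_NS):
        loc = url_node.findtext("sm:loc", namespaces=SITEMAP_NS)
        lastmod_text = url_node.findtext("sm:lastmod", namespaces=SITEMAP_NS)
        lastmod = None
        if lastmod_text:
            try:
                # Sitemaps commonly use ISO 8601 dates or timestamps
                lastmod = datetime.fromisoformat(lastmod_text.replace("Z", "+00:00"))
            except ValueError:
                lastmod = None
        if loc:
            results.append((loc, lastmod))
    return results
```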
Efficient concurrent scraping
To scrape hundreds of pages in a reasonable timeframe, we implemented an asynchronous scraping system using `aiohttp` and `asyncio`:
```python
async def async_run_scraper(concurrency=10,
                            SPIDER_SITEMAP_URL="https://docs.singlestore.com/sitemap-0.xml"):
    """
    Main async function that:
    1. Retrieves the sitemap URLs and batch-checks already scraped URLs.
    2. Scrapes pending pages concurrently using aiohttp, limited by a
       semaphore for concurrency.
    3. Only scrapes pages that are new or have been updated since last
       scraped.
    4. Prints progress updates every 1% of pending pages processed.
    """
    # Implementation details omitted for brevity
```
The concurrency parameter allows us to control how many pages we scrape simultaneously, balancing speed against system resources and politeness to the web server.
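As an illustrative sketch (the function names here are hypothetical, not the exact code in the repo), the semaphore-limited fetching could look like this:

```python
import asyncio
import aiohttp

async def fetch_page(session, semaphore, url):
    """Fetch a single page, limited by the shared semaphore."""
    async with semaphore:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                resp.raise_for_status()
                return url, await resp.text()
        except Exception:
            # A real implementation would log the failure and optionally retry
            return url, None

async def scrape_pending(urls, concurrency=10):
    """Scrape a list of pending URLs concurrently and return (url, html) pairs."""
    semaphore = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, semaphore, url) for url in urls]
        return await asyncio.gather(*tasks)
```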
HTML to markdown conversion
For easier processing in later stages, we convert the HTML content to markdown format:
```python
import html2text

def convert_html_to_markdown(html_content):
    """
    Convert HTML content to markdown format.
    """
    try:
        # Configure html2text
        h = html2text.HTML2Text()
        h.ignore_links = False
        h.ignore_images = False
        h.ignore_tables = False
        h.body_width = 0            # No wrapping
        h.unicode_snob = True       # Use Unicode instead of ASCII
        h.single_line_break = True  # Use single line breaks

        # Convert HTML to markdown
        markdown_content = h.handle(html_content)
        return markdown_content
    except Exception as e:
        # Error handling omitted for brevity
        return None
```
This conversion simplifies the subsequent chunking and embedding processes by removing HTML complexity while preserving the document's structure and information.
Executing as a scheduled job
One of SingleStore's powerful features is the ability to run scheduled jobs, allowing us to periodically update our documentation database without manual intervention. We packaged our scraper into a notebook that can be run as a scheduled job:
```python
# Prepare target tables if they don't exist
create_table()

# Run scraper (e.g. via cron), using asynchronous scraping
run_scraper()
print("Scraping complete.")
```
By scheduling this job to run daily (or at any appropriate interval), we ensure our chatbot always has access to the most up-to-date documentation.
The scraping and parsing component forms the foundation of our chatbot's knowledge base. By implementing it as a scheduled job in SingleStore, we've created a reliable system that ensures our chatbot always has the freshest information available.
Scheduled jobs for chunking and embedding documentation
The need for semantic understanding
Once we've collected and parsed the documentation content, the next challenge is preparing it for efficient semantic search. Raw markdown documents — while more structured than HTML — are still not optimized for retrieval by an AI agent. To enable our chatbot to find the most relevant information, we need to:
- Clean and normalize the text content
- Break down documents into semantically coherent chunks
- Generate vector embeddings that capture the meaning of each chunk
- Store these chunks and embeddings in a way that enables fast retrieval
This section explores how we implemented this process as scheduled jobs in SingleStore, leveraging GPU acceleration for embedding generation.
Data cleaning
Before chunking, we need to ensure our text data is properly cleaned and normalized. The main issue we observed was excessive whitespace in the markdown content after HTML parsing.
Cleaning implementation
We implemented a simple but effective cleaning function using regular expressions:
```python
import re

# Regex pattern for cleaning text: collapse runs of spaces, tabs and newlines
whitespace_pattern = re.compile(r'[ \t\r\n]+')

def clean_text(text):
    """Collapse all runs of whitespace (spaces, tabs, newlines) and trim."""
    return whitespace_pattern.sub(' ', text).strip()
```
This function collapses all consecutive whitespace characters (spaces, tabs, newlines) into a single space and trims the result, which greatly improves the text quality for downstream processing.
Batch processing for efficiency
To handle large volumes of documentation efficiently, we process rows in batches:
```python
def process_rows():
    # code omitted for brevity
```
This batch approach allows us to process large datasets efficiently while maintaining progress visibility and enabling resumption if interrupted.
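For illustration, a simplified process_rows might look like the sketch below, which reads uncleaned rows in batches and writes the result to an md_content_cleaned column that the chunking job reads later (the batch size, database name and exact column handling here are assumptions):

```python
import singlestoredb as s2

def process_rows(batch_size=500, database_name="knowlagent"):
    """Sketch: clean md_content in batches and store it in md_content_cleaned."""
    conn = s2.connect(database=database_name)
    cursor = conn.cursor()
    while True:
        # Fetch a batch of rows that still need cleaning
        cursor.execute(
            """
            SELECT id, md_content FROM s2docs
            WHERE md_content IS NOT NULL AND md_content_cleaned IS NULL
            LIMIT %s
            """,
            (batch_size,),
        )
        rows = cursor.fetchall()
        if not rows:
            break
        for row_id, md_content in rows:
            cursor.execute(
                "UPDATE s2docs SET md_content_cleaned = %s WHERE id = %s",
                (clean_text(md_content), row_id),
            )
        conn.commit()
        print(f"Cleaned {len(rows)} rows")
    conn.close()
```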
Chunking and embedding generation
After cleaning, we need to divide the documents into semantically meaningful chunks and generate vector embeddings for them. This process is handled by our GPU-accelerated embedding chunker.
Chunking strategy
I initially spent hours trying to implement a chunker myself, but found that LangChain's `RecursiveCharacterTextSplitter` handles almost every edge case I could think of, so we went with that. This splitter intelligently divides text while trying to keep semantically related content together.
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Create a text splitter with appropriate parameters
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,   # Target chunk size in characters
    chunk_overlap=50  # Overlap between chunks to maintain context
)

# Split a document into chunks
chunks = text_splitter.split_text(cleaned_text)
```
GPU-accelerated embedding generation
For embedding generation, we can leverage GPU acceleration when available (by running this code in a GPU-enabled environment, like Aura GPU compute for notebooks/scheduled jobs) to process chunks in batches efficiently:
```python
import numpy as np
import torch

def batch_encode_text(model, texts, batch_size=32,
                      device='cuda' if torch.cuda.is_available() else 'cpu'):
    """Encode a list of texts in batches, using GPU if available."""
    if not texts:
        return np.array([])

    model = model.to(device)
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        embeddings = model.encode(batch, convert_to_tensor=True)
        if torch.is_tensor(embeddings):
            embeddings = embeddings.cpu().numpy()
        all_embeddings.append(embeddings)
    if all_embeddings:
        return np.vstack(all_embeddings)
    return np.array([])
```
This function:
- Automatically uses GPU if available, significantly speeding up embedding generation
- Processes chunks in batches to optimize throughput and manage memory efficiently
- Converts tensor outputs to numpy arrays for compatibility with storage requirements
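For example, pairing this with a SentenceTransformer model (the same all-MiniLM-L6-v2 model the search layer uses later) looks roughly like this:

```python
from sentence_transformers import SentenceTransformer

# Load the embedding model once and reuse it across all batches
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

sample_chunks = [
    "SingleStore supports both vector and full-text indexes.",
    "Scheduled jobs can run notebooks on a fixed interval.",
]

# Produces 384-dimensional embeddings, matching the VECTOR(384) column below
embeddings = batch_encode_text(embed_model, sample_chunks, batch_size=32)
print(embeddings.shape)  # (2, 384)
```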
Database schema for vector storage
We store the chunks and their embeddings in a table optimized for vector search:
```sql
CREATE TABLE IF NOT EXISTS s2docs_chunks_{model_name} (
    doc_id BIGINT NOT NULL,
    source_url TEXT,
    chunk_index INT NOT NULL,
    chunk_text LONGTEXT,
    embedding JSON DEFAULT NULL,
    vector_embedding VECTOR(384) DEFAULT NULL,
    SORT KEY (doc_id, chunk_index)
)
```
This schema includes:
- A reference to the original document (doc_id)
- The source URL for traceability
- Position information (chunk_index) for maintaining document order
- The text content of the chunk
- The vector embedding in both JSON and VECTOR formats
- Appropriate keys for efficient querying
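The insert path isn't shown here, but a hypothetical insert_chunks helper (matching the call in the parallel-processing snippet further down; the column handling and database name are assumptions) could look like:

```python
import json
import singlestoredb as s2

def insert_chunks(table_name, chunks_data, database_name="knowlagent"):
    """Sketch: insert (doc_id, source_url, chunk_index, chunk_text, embedding) rows."""
    insert_sql = f"""
        INSERT INTO {table_name}
            (doc_id, source_url, chunk_index, chunk_text, embedding, vector_embedding)
        VALUES (%s, %s, %s, %s, %s, %s)
    """
    conn = s2.connect(database=database_name)
    cursor = conn.cursor()
    rows = []
    for doc_id, source_url, chunk_index, chunk_text, embedding in chunks_data:
        # A JSON array string can populate both the JSON and VECTOR columns
        emb_json = json.dumps([float(x) for x in embedding])
        rows.append((doc_id, source_url, chunk_index, chunk_text, emb_json, emb_json))
    cursor.executemany(insert_sql, rows)
    conn.commit()
    conn.close()
```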
Creating vector and full-text indexes
To enable efficient search, we create both vector and full-text indexes:
```python
def create_indexes(cursor, table_name):
    """Create vector and fulltext indexes on a columnstore table in SingleStore."""
    # Create the vector index using ANN
    cursor.execute(f"CREATE INDEX idx_{table_name}_vector ON {table_name}(vector_embedding) USING ANN;")

    # Create the fulltext index
    cursor.execute(f"ALTER TABLE {table_name} ADD FULLTEXT INDEX idx_{table_name}_text (chunk_text);")
```
These indexes enable:
- Fast approximate nearest neighbor (ANN) searches for similar embeddings
- Efficient full-text search capabilities
- The foundation for hybrid search combining both approaches
Optimizing for efficiency and resilience
This implementation includes several optimizations to ensure the process is efficient and resilient.
Incremental processing
We track which documents have already been processed, so that no work is repeated if the current execution is interrupted and the code is rerun:
```python
def get_unprocessed_docs_count(cursor, table_name, last_processed_id):
    """Return the count of documents that need processing."""
    count_query = f"""
        SELECT COUNT(*) FROM s2docs
        WHERE md_content_cleaned IS NOT NULL
          AND md_content_cleaned != ''
          AND id > {last_processed_id}
          AND id NOT IN (SELECT doc_id FROM {table_name})
    """
    cursor.execute(count_query)
    return cursor.fetchone()[0]
```
This ensures we only process documents that are new or have been updated since the last run.
Checkpoint system
We implement a checkpoint system to enable resuming processing after interruptions:
```python
def update_checkpoint(checkpoint_table, last_processed_id):
    """Update the checkpoint table with the latest processed doc id."""
    # Implementation details omitted for brevity
```
This checkpoint system allows the process to recover after interruptions, picking up where it left off rather than starting over.
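A minimal sketch of this checkpoint logic might look like the following, assuming a single-row checkpoint table (the table layout, upsert approach and database name are assumptions, not the exact repo code):

```python
import singlestoredb as s2

def update_checkpoint(checkpoint_table, last_processed_id, database_name="knowlagent"):
    """Sketch: upsert the id of the last successfully processed document."""
    conn = s2.connect(database=database_name)
    cursor = conn.cursor()
    cursor.execute(
        f"""CREATE TABLE IF NOT EXISTS {checkpoint_table} (
                id INT PRIMARY KEY,
                last_processed_id BIGINT
            )"""
    )
    cursor.execute(
        f"""INSERT INTO {checkpoint_table} (id, last_processed_id)
            VALUES (1, %s)
            ON DUPLICATE KEY UPDATE last_processed_id = VALUES(last_processed_id)""",
        (last_processed_id,),
    )
    conn.commit()
    conn.close()

def get_last_checkpoint(checkpoint_table, database_name="knowlagent"):
    """Sketch: return the last processed doc id, or 0 if no checkpoint exists yet."""
    # Assumes update_checkpoint has created the table at least once
    conn = s2.connect(database=database_name)
    cursor = conn.cursor()
    cursor.execute(f"SELECT last_processed_id FROM {checkpoint_table} WHERE id = 1")
    row = cursor.fetchone()
    conn.close()
    return row[0] if row else 0
```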
Parallel processing
To maximize throughput, especially on systems with multiple cores and GPU acceleration, we implement parallel processing:
```python
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(process_document_batch, batch, text_splitter, embed_model)
               for batch in doc_batches]
    for future in concurrent.futures.as_completed(futures):
        chunks_data = future.result()
        insert_chunks(table_name, chunks_data)
```
This parallel approach significantly reduces processing time, especially for large document collections.
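The process_document_batch helper referenced above isn't shown in full; conceptually it ties the splitter and the encoder together, roughly like this sketch (the row layout matches the hypothetical insert_chunks shown earlier):

```python
def process_document_batch(batch, text_splitter, embed_model):
    """Sketch: turn (doc_id, source_url, cleaned_text) rows into chunk records."""
    chunks_data = []
    for doc_id, source_url, cleaned_text in batch:
        chunks = text_splitter.split_text(cleaned_text)
        if not chunks:
            continue
        embeddings = batch_encode_text(embed_model, chunks)
        for chunk_index, (chunk_text, embedding) in enumerate(zip(chunks, embeddings)):
            chunks_data.append((doc_id, source_url, chunk_index, chunk_text, embedding))
    return chunks_data
```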
Executing as scheduled jobs
Both the cleaning and chunking/embedding processes are implemented as notebooks that can be executed as scheduled jobs in SingleStore:
```python
# For data cleaning
process_rows()

# For chunking and embedding
process_documents(batch_size=200, max_workers=16)
```
By scheduling these jobs to run after the scraping job completes, we ensure our vector database stays current with minimal manual intervention.
Cloud functions for hybrid search
Beyond basic vector search
While vector embeddings provide excellent semantic understanding, they're not always sufficient on their own. For documentation search, we wanted to combine the strengths of multiple search approaches:
- Vector search. Finding semantically similar content, even when keywords don't match
- Full-text search. Pinpointing exact keywords and phrases
- Combined approach. Leveraging both methods with configurable weights
This section explores how we implemented a hybrid search approach as cloud functions in SingleStore, making them accessible to our agent through simple API calls.
The power of hybrid search
Each search approach has distinct advantages:
- Vector search excels at understanding concepts and finding related information, even when terminology differs
- Full-text search excels at finding exact matches and respecting keyword importance
- Combined approaches provide the best of both worlds, leveraging semantic understanding while respecting keyword relevance
Our hybrid search implementation balances these approaches using configurable weights, allowing us to tune the search behavior based on the query characteristics.
Implementation with FastAPI cloud functions
We implemented our search functionality as serverless cloud functions in SingleStore using FastAPI, which:
- Receive search queries from the agent
- Process the query and generate embeddings
- Execute hybrid search against our document chunks
- Format and return the most relevant results
This architecture provides low latency, high scalability and easy integration with our agent.
Setting up the environment
Our implementation begins with setting up the necessary dependencies:
```python
import json
import torch
import singlestoredb as s2
import singlestoredb.apps as apps
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool

def connect_to_db(database_name='knowlagent'):
    """Return a new SingleStore DB connection."""
    return s2.connect(database=database_name)
```
These imports set up our environment for:
- Database connectivity through SingleStore
- Embedding generation with SentenceTransformer
- API endpoints with FastAPI
- Concurrent processing with threadpool
Core hybrid search function
The heart of our implementation is the `hybrid_search` function, which combines vector similarity and full-text search:
```python
def hybrid_search(query_text, model_name='all-MiniLM-L6-v2', top_k=5,
                  vector_weight=0.7, text_weight=0.3):
    """
    Perform a hybrid search using both vector similarity and fulltext matches.
    If the fulltext index is not found, fall back to a vector-only search.
    This version computes the individual scores in a subquery, then combines
    them.
    """
    # code omitted for brevity
```
This function:
- Connects to the database where our document chunks are stored
- Generates an embedding for the query using the SentenceTransformer model
- Checks for the presence of a FULLTEXT index to determine search capabilities
- Executes a hybrid query that:
- Computes vector similarity using DOT_PRODUCT
- Performs full-text search using MATCH AGAINST
- Combines scores with configurable weights
- Orders results by the combined score
- Falls back to vector-only search if the FULLTEXT index isn't available
- Returns the top results ordered by relevance
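The combined SQL itself is omitted above; conceptually, the query built inside hybrid_search looks something like this simplified sketch, with the query embedding passed in as a JSON array string (this is not the exact production query):

```python
# Inside hybrid_search(): query_embedding_json is json.dumps() of the query's
# embedding, and table_name points at the s2docs_chunks_{model_name} table.
hybrid_query = f"""
    SELECT
        source_url,
        chunk_text,
        ({vector_weight} * vector_score + {text_weight} * text_score) AS combined_score
    FROM (
        SELECT
            source_url,
            chunk_text,
            DOT_PRODUCT(vector_embedding, %s :> VECTOR(384)) AS vector_score,
            MATCH(chunk_text) AGAINST (%s) AS text_score
        FROM {table_name}
    ) AS scored
    ORDER BY combined_score DESC
    LIMIT {top_k}
"""
cursor.execute(hybrid_query, (query_embedding_json, query_text))
results = cursor.fetchall()
```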
FastAPI application definition
With our core search function in place, we expose it through FastAPI endpoints:
```python
def query_hybrid(query_text, model_name, top_k, vector_weight, text_weight):
    """Wrapper to call hybrid_search with provided parameters."""
    return hybrid_search(query_text, model_name=model_name, top_k=top_k,
                         vector_weight=vector_weight, text_weight=text_weight)

# Define a Pydantic model for the search request.
class SearchRequest(BaseModel):
    query_text: str
    top_k: int = 5
    vector_weight: float = 0.7
    text_weight: float = 0.3
    model_name: str = "all-MiniLM-L6-v2"

app = FastAPI()

# Endpoint that accepts search parameters via the request body.
@app.post("/v2/search")
async def get_chunks(request: SearchRequest):
    # code omitted for brevity

# Get search for chunks by query
@app.get("/v1/search/{query_text}")
async def get_chunks(query_text: str):
    # code omitted for brevity
```
This setup provides:
- Two API endpoints:
  - A flexible `/v2/search` POST endpoint that accepts all search parameters
  - A simple `/v1/search/{query_text}` GET endpoint with default parameters
- Request validation through Pydantic models
- Non-blocking operation by running the database queries in a thread pool
- Proper error handling with descriptive HTTP exceptions
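The handler bodies are omitted above; as a sketch, the /v2/search POST handler might simply dispatch the blocking search to a thread pool and translate failures into HTTP errors:

```python
@app.post("/v2/search")
async def get_chunks(request: SearchRequest):
    """Sketch of the POST handler: run the blocking search off the event loop."""
    try:
        results = await run_in_threadpool(
            query_hybrid,
            request.query_text,
            request.model_name,
            request.top_k,
            request.vector_weight,
            request.text_weight,
        )
        return {"results": results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```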
Deploying as a cloud function
Finally, we deploy our FastAPI app as a SingleStore cloud function:
```python
connection_info = await apps.run_function_app(app)
```
This single line:
- Packages our FastAPI application as a cloud function
- Deploys it to the SingleStore environment
- Makes it available for API calls
Advantages of our hybrid search approach
Our hybrid search approach offers several key advantages:
1. Weighted combination
By combining vector and full-text search with configurable weights, we can:
- Prioritize semantic similarity or exact matches based on the use case
- Fine-tune the search behavior for different types of queries
- Achieve better overall results than either approach alone
2. Fallback mechanism
The implementation includes an automatic fallback to vector-only search if the FULLTEXT index isn't available, ensuring:
- Robustness in different deployment scenarios
- Graceful degradation rather than complete failure
3. Efficient SQL
We use a single SQL query with a subquery to:
- Compute both similarity metrics in one database operation
- Combine scores efficiently
- Order and limit results in a single pass
The hybrid search cloud functions form the bridge between our knowledge base and our agent. By combining vector and full-text search approaches, we've created a flexible, powerful system that enables our chatbot to quickly retrieve relevant documentation.
In part two, we’ll walk through how to connect and use all this work by having our agent call this search endpoint for RAG.