Building an Agentic AI Chatbot in SingleStore Aura

Part 1: Setting Up the Tools

11 min read

May 27, 2025

In a recent hackathon, our team wanted to prototype an intelligent, domain-specific AI chatbot that could leverage both web search capabilities and knowledge from SingleStore's documentation.

Unlike many off-the-shelf solutions that rely heavily on third-party frameworks, we decided to build our agent from the ground up — giving us complete control over its architecture and behavior while showcasing the power of SingleStore's platform features.

The resulting solution is a simple chatbot that can retrieve information from the web, search through SingleStore's documentation using vector and full-text search, and maintain contextual conversations with users — all while being powered by a maintainable data pipeline that keeps its knowledge base up-to-date.

This blog covers the backend implementation. The corresponding code can be found on GitHub. This is by no means a production-ready agent, but it provides a solid foundation to build one. I refer to several features that were in active development/private preview at the time of writing — we expect to ship these features for public preview in the coming weeks.

The challenge

Creating an effective, domain-specific agent chatbot presents several significant challenges:

  1. Knowledge acquisition. How do we efficiently scrape, process and store documentation content?
  2. Information retrieval. How do we enable the agent to search through that content effectively?
  3. Reasoning and response generation. How do we enable the agent to formulate coherent, helpful responses?
  4. Maintainability. How do we ensure the system stays up-to-date with the latest documentation?
  5. Performance and scalability. How do we make the system responsive and capable of handling multiple users?

Tools needed

One of the goals was to build things from the ground up to really understand how all individual components fit together. Rather than using off-the-shelf agent frameworks that might restrict our flexibility, we designed our solution from first principles, leveraging SingleStore's capabilities wherever possible — but leaving the implementation modular enough that any single module can be replaced by another provider.

The architecture consists of three primary components:

1. Data pipeline

Our data pipeline manages the flow of information from source documentation to searchable knowledge base:

  • Scraping and parsing. Automated extraction of content from the SingleStore documentation sitemap
  • Cleaning and processing. Transformation of HTML content to structured markdown
  • Chunking and embedding. Breaking content into semantically meaningful chunks and generating vector embeddings
  • Storage. Organizing the processed data in SingleStore tables with appropriate indexes

2. Search infrastructure

We implemented a hybrid search approach combining:

  • Vector search. Using embeddings for semantic similarity matching
  • Full-text search. Leveraging SingleStore's built-in text search capabilities
  • Cloud functions. Exposing search functionality as cloud functions for the agent to call

3. Agent workflow

The third component is the agent workflow itself, which we'll cover in detail in part two of this blog.

Scheduled job for scraping and parsing documentation

The challenge of fresh documentation

One of the most critical aspects of any documentation-based chatbot is ensuring it has access to the most up-to-date information. Documentation evolves constantly — new features are added, bugs are fixed and explanations are clarified. A chatbot providing outdated information frustrates users and potentially leads them down incorrect paths.

For our SingleStore chatbot, we needed a reliable, automated system to:

  1. Discover all documentation pages
  2. Identify which pages were new or had been updated
  3. Scrape the content efficiently
  4. Transform HTML content into a more processable format
  5. Store everything in a structured database

This section details how we implemented this system as a scheduled job in SingleStore, ensuring our chatbot always has the latest information at its disposal.

Designing the scraper architecture

When approaching this problem, we had several design considerations:

  • Efficiency. The scraper should minimize unnecessary downloads by only fetching new or updated content
  • Robustness. The system should handle network failures, timeouts and parsing errors gracefully
  • Scalability. The solution should efficiently process hundreds or thousands of pages
  • Maintainability. The code should be structured to be easily understood and modified

Our solution leverages Python's asynchronous capabilities to efficiently scrape multiple pages concurrently, while a SingleStore database stores both the raw content and processed markdown for each page.

Implementation details

Setting up the database schema

The first step was creating an appropriate table structure in SingleStore to store the documentation content:

CREATE TABLE IF NOT EXISTS s2docs (
    id INT AUTO_INCREMENT PRIMARY KEY,
    url TEXT,
    content LONGTEXT,
    md_content LONGTEXT NULL,
    scraped_at DATETIME,
    lastmod DATETIME
);

This schema includes:

  • A unique identifier for each page
  • The URL of the page for reference
  • The original HTML content
  • The processed markdown content (for easier parsing and searching)
  • A timestamp for when the page was scraped
  • The last modification date from the sitemap (for determining updates)

Sitemap parsing logic

Rather than manually maintaining a list of URLs to scrape, we leverage the sitemap.xml file that most documentation sites expose. This follows a standard format and includes valuable metadata, like when pages were last modified:

def get_sitemap_urls(sitemap_url):
    """
    Retrieve and parse the sitemap XML to extract all URLs listed with their
    lastmod dates.
    Returns a list of tuples (url, lastmod) where lastmod is a datetime object
    or None.
    """
    # Implementation details omitted for brevity
    # The function handles different sitemap formats and parses lastmod dates
    # Returns a list of (url, lastmod) tuples

By tracking the lastmod values from the sitemap, we can determine which pages have been updated since our last scrape, avoiding unnecessary processing.
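
The implementation above is omitted, but a minimal sketch of one way to parse a sitemap looks roughly like the following, assuming the standard sitemap namespace, the requests library and Python's built-in XML parser (the helper name and structure are illustrative, not the exact code from the repo):

import requests
import xml.etree.ElementTree as ET
from datetime import datetime

SITEMAP_NS = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}

def get_sitemap_urls_sketch(sitemap_url):
    """Illustrative sketch: fetch a sitemap and return (url, lastmod) tuples."""
    response = requests.get(sitemap_url, timeout=30)
    response.raise_for_status()
    root = ET.fromstring(response.content)

    results = []
    for url_el in root.findall("sm:url", SITEMAP_NS):
        loc = url_el.findtext("sm:loc", default=None, namespaces=SITEMAP_NS)
        lastmod_text = url_el.findtext("sm:lastmod", default=None, namespaces=SITEMAP_NS)
        lastmod = None
        if lastmod_text:
            try:
                # Sitemap lastmod values are ISO 8601 dates or timestamps
                lastmod = datetime.fromisoformat(lastmod_text.replace("Z", "+00:00"))
            except ValueError:
                pass
        if loc:
            results.append((loc, lastmod))
    return results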

Efficient concurrent scraping

To scrape hundreds of pages in a reasonable timeframe, we implemented an asynchronous scraping system using aiohttp and asyncio:

async def async_run_scraper(concurrency=10,
                            SPIDER_SITEMAP_URL="https://docs.singlestore.com/sitemap-0.xml"):
    """
    Main async function that:
    1. Retrieves the sitemap URLs and batch-checks already scraped URLs.
    2. Scrapes pending pages concurrently using aiohttp, limited by a
       semaphore for concurrency.
    3. Only scrapes pages that are new or have been updated since last
       scraped.
    4. Prints progress updates every 1% of pending pages processed.
    """
    # Implementation details omitted for brevity

The concurrency parameter allows us to control how many pages we scrape simultaneously, balancing speed against system resources and politeness to the web server.
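
To make the concurrency control concrete, here's a minimal sketch of a semaphore-bounded fetch loop with aiohttp; the function and variable names are illustrative assumptions rather than the repo's exact code:

import asyncio
import aiohttp

async def fetch_page(session, semaphore, url):
    """Fetch a single page while respecting the concurrency limit."""
    async with semaphore:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
            resp.raise_for_status()
            return url, await resp.text()

async def scrape_pending(urls, concurrency=10):
    """Scrape all pending URLs with at most `concurrency` requests in flight."""
    semaphore = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, semaphore, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)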

HTML to markdown conversion

For easier processing in later stages, we convert the HTML content to markdown format:

import html2text

def convert_html_to_markdown(html_content):
    """
    Convert HTML content to markdown format.
    """
    try:
        # Configure html2text
        h = html2text.HTML2Text()
        h.ignore_links = False
        h.ignore_images = False
        h.ignore_tables = False
        h.body_width = 0            # No wrapping
        h.unicode_snob = True       # Use Unicode instead of ASCII
        h.single_line_break = True  # Use single line breaks

        # Convert HTML to markdown
        markdown_content = h.handle(html_content)
        return markdown_content
    except Exception as e:
        # Error handling omitted for brevity
        return None

This conversion simplifies the subsequent chunking and embedding processes by removing HTML complexity while preserving the document's structure and information.

Executing as a scheduled job

One of SingleStore's powerful features is the ability to run scheduled jobs, allowing us to periodically update our documentation database without manual intervention. We packaged our scraper into a notebook that can be run as a scheduled job:

# Prepare target tables if they don't exist
create_table()

# Run scraper (e.g. via cron), using asynchronous scraping.
run_scraper()
print("Scraping complete.")

By scheduling this job to run daily (or at any appropriate interval), we ensure our chatbot always has access to the most up-to-date documentation.

The scraping and parsing component forms the foundation of our chatbot's knowledge base. By implementing it as a scheduled job in SingleStore, we've created a reliable system that ensures our chatbot always has the freshest information available.

Scheduled jobs for chunking and embedding documentation

The need for semantic understanding

Once we've collected and parsed the documentation content, the next challenge is preparing it for efficient semantic search. Raw markdown documents — while more structured than HTML — are still not optimized for retrieval by an AI agent. To enable our chatbot to find the most relevant information, we need to:

  1. Clean and normalize the text content
  2. Break down documents into semantically coherent chunks
  3. Generate vector embeddings that capture the meaning of each chunk
  4. Store these chunks and embeddings in a way that enables fast retrieval

This section explores how we implemented this process as scheduled jobs in SingleStore, leveraging GPU acceleration for embedding generation.

Data cleaning

Before chunking, we need to ensure our text data is properly cleaned and normalized. The main issue we observed was excessive whitespace in the markdown content after HTML parsing.

Cleaning implementation

We implemented a simple but effective cleaning function using regular expressions:

import re

# Define regex pattern for cleaning text: collapse runs of spaces, tabs, and newlines
whitespace_pattern = re.compile(r'[ \t\r\n]+')

def clean_text(text):
    """Collapse all runs of whitespace (spaces, tabs, newlines) and trim."""
    return whitespace_pattern.sub(' ', text).strip()

This function collapses all consecutive whitespace characters (spaces, tabs, newlines) into a single space and trims the result, which greatly improves the text quality for downstream processing.
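
As a quick sanity check, a string full of tabs and newlines collapses as expected:

# Runs of spaces, tabs and newlines become single spaces; edges are trimmed
print(clean_text("  Vector\t search \n\n in   SingleStore  "))
# -> "Vector search in SingleStore"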

Batch processing for efficiency

To handle large volumes of documentation efficiently, we process rows in batches:

def process_rows():
    # code omitted for brevity

This batch approach allows us to process large datasets efficiently while maintaining progress visibility and enabling resumption if interrupted.
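
process_rows is elided here, but a minimal sketch of the batched cleaning loop might look like the following, assuming a connection helper such as the connect_to_db shown later in this post and the md_content_cleaned column referenced in the incremental-processing query further down (batch size and query shape are illustrative):

def process_rows_sketch(batch_size=500):
    """Illustrative sketch: clean markdown content in batches."""
    conn = connect_to_db()
    cursor = conn.cursor()
    while True:
        # Fetch the next batch of rows that still need cleaning
        cursor.execute(
            "SELECT id, md_content FROM s2docs "
            "WHERE md_content IS NOT NULL AND md_content_cleaned IS NULL "
            f"LIMIT {batch_size}"
        )
        rows = cursor.fetchall()
        if not rows:
            break
        # Clean each row and write the result back
        for doc_id, md_content in rows:
            cursor.execute(
                "UPDATE s2docs SET md_content_cleaned = %s WHERE id = %s",
                (clean_text(md_content), doc_id),
            )
        conn.commit()
        print(f"Cleaned {len(rows)} rows")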

Chunking and embedding generation

After cleaning, we need to divide the documents into semantically meaningful chunks and generate vector embeddings for them. This process is handled by our GPU-accelerated embedding chunker.

Chunking strategy

I initially spent hours trying to implement a chunker myself, but found that LangChain's RecursiveCharacterTextSplitter handled almost every edge case I could think of, so we went with that. This splitter intelligently splits text while trying to keep semantically related content together.

from langchain.text_splitter import RecursiveCharacterTextSplitter

# Create a text splitter with appropriate parameters
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,    # Target chunk size in characters
    chunk_overlap=50   # Overlap between chunks to maintain context
)

# Split a document into chunks
chunks = text_splitter.split_text(cleaned_text)

GPU-accelerated embedding generation

For embedding generation, we can leverage GPU acceleration when available (by running this code in a GPU-enabled environment like Aura GPU compute for notebooks/scheduled jobs) to process chunks in batches efficiently:

import numpy as np
import torch

def batch_encode_text(model, texts, batch_size=32,
                      device='cuda' if torch.cuda.is_available() else 'cpu'):
    """Encode a list of texts in batches, using GPU if available."""
    if not texts:
        return np.array([])

    model = model.to(device)
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        embeddings = model.encode(batch, convert_to_tensor=True)
        if torch.is_tensor(embeddings):
            embeddings = embeddings.cpu().numpy()
        all_embeddings.append(embeddings)
    if all_embeddings:
        return np.vstack(all_embeddings)
    return np.array([])

This function:

  1. Automatically uses GPU if available, significantly speeding up embedding generation
  2. Processes chunks in batches to optimize throughput and manage memory efficiently
  3. Converts tensor outputs to numpy arrays for compatibility with storage requirements
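
A typical call, using the all-MiniLM-L6-v2 model assumed throughout this post (it produces 384-dimensional vectors, matching the VECTOR(384) column in the schema below), looks like this:

from sentence_transformers import SentenceTransformer

# Load the embedding model once and reuse it across batches
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

# chunks is the list of text chunks produced by the splitter above
embeddings = batch_encode_text(embed_model, chunks, batch_size=32)
print(embeddings.shape)  # (number_of_chunks, 384) for this model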

Database schema for vector storage

We store the chunks and their embeddings in a table optimized for vector search:

CREATE TABLE IF NOT EXISTS s2docs_chunks_{model_name} (
    doc_id BIGINT NOT NULL,
    source_url TEXT,
    chunk_index INT NOT NULL,
    chunk_text LONGTEXT,
    embedding JSON DEFAULT NULL,
    vector_embedding VECTOR(384) DEFAULT NULL,
    SORT KEY (doc_id, chunk_index)
)

This schema includes:

  • A reference to the original document (doc_id)
  • The source URL for traceability
  • Position information (chunk_index) for maintaining document order
  • The text content of the chunk
  • The vector embedding in both JSON and VECTOR formats
  • Appropriate keys for efficient querying
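
The insert_chunks helper used later isn't shown, but one possible shape for the insert is sketched below. It assumes chunks_data is a list of (doc_id, source_url, chunk_index, chunk_text, embedding) tuples and that SingleStore's :> cast can turn a JSON array string into a VECTOR(384) value; treat this as an illustration rather than the repo's exact code:

import json

def insert_chunks_sketch(cursor, table_name, chunks_data):
    """Illustrative sketch: insert chunk rows with JSON and VECTOR embeddings."""
    insert_sql = f"""
        INSERT INTO {table_name}
            (doc_id, source_url, chunk_index, chunk_text, embedding, vector_embedding)
        VALUES (%s, %s, %s, %s, %s, %s :> VECTOR(384))
    """
    rows = []
    for doc_id, source_url, chunk_index, chunk_text, embedding in chunks_data:
        embedding_json = json.dumps([float(x) for x in embedding])
        # The same JSON array string feeds both the JSON column and the VECTOR cast
        rows.append((doc_id, source_url, chunk_index, chunk_text,
                     embedding_json, embedding_json))
    cursor.executemany(insert_sql, rows)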

Creating vector and full-text indexes

To enable efficient search, we create both vector and full-text indexes:

def create_indexes(cursor, table_name):
    """Create vector and fulltext indexes on a columnstore table in SingleStore."""
    # Create the vector index using ANN
    cursor.execute(f"CREATE INDEX idx_{table_name}_vector ON {table_name}(vector_embedding) USING ANN;")

    # Create the fulltext index
    cursor.execute(f"ALTER TABLE {table_name} ADD FULLTEXT INDEX idx_{table_name}_text (chunk_text);")

These indexes enable:

  • Fast approximate nearest neighbor (ANN) searches for similar embeddings
  • Efficient full-text search capabilities
  • The foundation for hybrid search combining both approaches

Optimizing for efficiency and resilience

This implementation includes several optimizations to ensure the process is efficient and resilient.

Incremental processing

We track which documents have been processed to avoid redundant work if the current execution is interrupted and the code is rerun:

def get_unprocessed_docs_count(cursor, table_name, last_processed_id):
    """Return the count of documents that need processing."""
    count_query = f"""
        SELECT COUNT(*) FROM s2docs
        WHERE md_content_cleaned IS NOT NULL
          AND md_content_cleaned != ''
          AND id > {last_processed_id}
          AND id NOT IN (SELECT doc_id FROM {table_name})
    """
    cursor.execute(count_query)
    return cursor.fetchone()[0]

This ensures we only process documents that are new or have been updated since the last run.

Checkpoint system

We implement a checkpoint system to enable resuming processing after interruptions:

def update_checkpoint(checkpoint_table, last_processed_id):
    """Update the checkpoint table with the latest processed doc id."""
    # ...existing code...

This checkpoint system allows the process to recover after interruptions, picking up where it left off rather than starting over.
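
The body of update_checkpoint is elided; a minimal sketch, assuming a single-row checkpoint table and an explicit cursor argument (both illustrative), could look like this:

def update_checkpoint_sketch(cursor, checkpoint_table, last_processed_id):
    """Illustrative sketch: upsert the highest doc id processed so far."""
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {checkpoint_table} (
            id INT PRIMARY KEY,
            last_processed_id BIGINT
        )
    """)
    # Keep a single row (id = 1) holding the latest checkpoint
    cursor.execute(f"""
        INSERT INTO {checkpoint_table} (id, last_processed_id)
        VALUES (1, %s)
        ON DUPLICATE KEY UPDATE last_processed_id = VALUES(last_processed_id)
    """, (last_processed_id,))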

Parallel processing

To maximize throughput, especially on systems with multiple cores and GPU acceleration, we implement parallel processing:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(process_document_batch, batch, text_splitter, embed_model)
               for batch in doc_batches]
    for future in concurrent.futures.as_completed(futures):
        chunks_data = future.result()
        insert_chunks(table_name, chunks_data)

This parallel approach significantly reduces processing time, especially for large document collections.

Executing as scheduled jobs

Both the cleaning and chunking/embedding processes are implemented as notebooks that can be executed as scheduled jobs in SingleStore:

# For data cleaning
process_rows()

# For chunking and embedding
process_documents(batch_size=200, max_workers=16)

By scheduling these jobs to run after the scraping job completes, we ensure our vector database stays current with minimal manual intervention.

While vector embeddings provide excellent semantic understanding, they're not always sufficient on their own. For documentation search, we wanted to combine the strengths of multiple search approaches:

  1. Vector search. Finding semantically similar content, even when keywords don't match
  2. Full-text search. Pinpointing exact keywords and phrases
  3. Combined approach. Leveraging both methods with configurable weights

This section explores how we implemented a hybrid search approach as cloud functions in SingleStore, making them accessible to our agent through simple API calls.

Each search approach has distinct advantages:

  • Vector search excels at understanding concepts and finding related information, even when terminology differs
  • Full-text search excels at finding exact matches and respecting keyword importance
  • Combined approaches provide the best of both worlds, leveraging semantic understanding while respecting keyword relevance

Our hybrid search implementation balances these approaches using configurable weights, allowing us to tune the search behavior based on the query characteristics.

Implementation with FastAPI cloud functions

We implemented our search functionality as serverless cloud functions in SingleStore using FastAPI, which:

  1. Receive search queries from the agent
  2. Process the query and generate embeddings
  3. Execute hybrid search against our document chunks
  4. Format and return the most relevant results

This architecture provides low latency, high scalability and easy integration with our agent.

Setting up the environment

Our implementation begins with setting up the necessary dependencies:

import json
import torch
import singlestoredb as s2
import singlestoredb.apps as apps
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
from fastapi import FastAPI, HTTPException
from fastapi.concurrency import run_in_threadpool

def connect_to_db(database_name='knowlagent'):
    """Return a new SingleStore DB connection."""
    return s2.connect(database=database_name)

These imports set up our environment for:

  • Database connectivity through SingleStore
  • Embedding generation with SentenceTransformer
  • API endpoints with FastAPI
  • Concurrent processing with threadpool

Core hybrid search function

The heart of our implementation is the hybrid_search function, which combines vector similarity and full-text search:

def hybrid_search(query_text, model_name='all-MiniLM-L6-v2', top_k=5,
                  vector_weight=0.7, text_weight=0.3):
    """
    Perform a hybrid search using both vector similarity and fulltext matches.
    If the fulltext index is not found, fall back to a vector-only search.
    This version computes the individual scores in a subquery, then combines
    them.
    """
    # code omitted for brevity

This function:
  1. Connects to the database where our document chunks are stored
  2. Generates an embedding for the query using the SentenceTransformer model
  3. Checks for the presence of a FULLTEXT index to determine search capabilities
  4. Executes a hybrid query that:
    1. Computes vector similarity using DOT_PRODUCT
    2. Performs full-text search using MATCH AGAINST
    3. Combines scores with configurable weights
    4. Orders results by the combined score
  5. Falls back to vector-only search if the FULLTEXT index isn't available
  6. Returns the top results ordered by relevance
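
The SQL itself is omitted above, but a query along these lines captures the idea: compute DOT_PRODUCT over the VECTOR column and a MATCH ... AGAINST full-text score in a subquery, then blend them with the configured weights. The sketch below assumes the chunk table columns defined earlier and the :> cast from a JSON array string to VECTOR(384); the exact query in the repo may differ:

import json

def hybrid_query_sketch(cursor, table_name, query_text, query_embedding,
                        vector_weight=0.7, text_weight=0.3, top_k=5):
    """Illustrative sketch of the combined vector + full-text scoring query."""
    embedding_json = json.dumps([float(x) for x in query_embedding])
    sql = f"""
        SELECT chunk_text, source_url,
               ({vector_weight} * vector_score + {text_weight} * text_score) AS combined_score
        FROM (
            SELECT chunk_text, source_url,
                   DOT_PRODUCT(vector_embedding, %s :> VECTOR(384)) AS vector_score,
                   MATCH(chunk_text) AGAINST (%s) AS text_score
            FROM {table_name}
        ) AS scored
        ORDER BY combined_score DESC
        LIMIT {top_k}
    """
    cursor.execute(sql, (embedding_json, query_text))
    return cursor.fetchall()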

FastAPI application definition

With our core search function in place, we expose it through FastAPI endpoints:

def query_hybrid(query_text, model_name, top_k, vector_weight, text_weight):
    """Wrapper to call hybrid_search with provided parameters."""
    return hybrid_search(query_text, model_name=model_name, top_k=top_k,
                         vector_weight=vector_weight, text_weight=text_weight)

# Define a Pydantic model for the search request.
class SearchRequest(BaseModel):
    query_text: str
    top_k: int = 5
    vector_weight: float = 0.7
    text_weight: float = 0.3
    model_name: str = "all-MiniLM-L6-v2"

app = FastAPI()

# Endpoint that accepts search parameters via the request body.
@app.post("/v2/search")
async def get_chunks(request: SearchRequest):
    # code omitted for brevity

# Search for chunks by query.
@app.get("/v1/search/{query_text}")
async def get_chunks(query_text: str):
    # code omitted for brevity

This setup provides:

  1. Two API endpoints:
    1. A flexible /v2/search POST endpoint that accepts all search parameters
    2. A simple /v1/search/{query_text} GET endpoint with default parameters
  2. Request validation through Pydantic models
  3. Non-blocking operation by running the database queries in a thread pool
  4. Proper error handling with descriptive HTTP exceptions
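
The endpoint bodies are elided above; a minimal sketch of the POST handler, reusing the SearchRequest, query_hybrid and app defined in the previous snippet and pushing the blocking work into a thread pool, might look like this:

@app.post("/v2/search")
async def get_chunks(request: SearchRequest):
    """Illustrative sketch of the POST handler body."""
    try:
        # Run the blocking embedding + database work in a worker thread
        results = await run_in_threadpool(
            query_hybrid,
            request.query_text,
            request.model_name,
            request.top_k,
            request.vector_weight,
            request.text_weight,
        )
        return {"results": results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))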

Deploying as a cloud function

Finally, we deploy our FastAPI app as a SingleStore cloud function:

connection_info = await apps.run_function_app(app)

This single line:

  1. Packages our FastAPI application as a cloud function
  2. Deploys it to the SingleStore environment
  3. Makes it available for API calls
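
Once deployed, the agent (or any HTTP client) can hit the endpoint directly. The example below uses placeholder values for the base URL and token, since the real ones come from connection_info and your SingleStore workspace configuration:

import requests

# Placeholders: substitute the endpoint URL and credentials for your deployment
BASE_URL = "https://<your-cloud-function-endpoint>"
TOKEN = "<your-api-token>"

response = requests.post(
    f"{BASE_URL}/v2/search",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"query_text": "How do I create a vector index?", "top_k": 5},
)
print(response.json())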

Advantages of our hybrid search approach

The hybrid search approach offers several key advantages:

1. Weighted combination

By combining vector and full-text search with configurable weights, we can:

  • Prioritize semantic similarity or exact matches based on the use case
  • Fine-tune the search behavior for different types of queries
  • Achieve better overall results than either approach alone

2. Fallback mechanism

The implementation includes an automatic fallback to vector-only search if the FULLTEXT index isn't available, ensuring:

  • Robustness in different deployment scenarios
  • Graceful degradation rather than complete failure

3. Efficient SQL

We use a single SQL query with a subquery to:

  • Compute both similarity metrics in one database operation
  • Combine scores efficiently
  • Order and limit results in a single pass

The hybrid search cloud functions form the bridge between our knowledge base and our agent. By combining vector and full-text search approaches, we've created a flexible, powerful system that enables our chatbot to quickly retrieve relevant documentation.

In part two, we’ll walk through how to connect and use all this work by having our agent call this search endpoint for RAG.

