New

Vector Search with Kai

Notebook

SingleStore Notebooks

Vector Search with Kai

Vector Search with Kai

In this notebook, we load a dataset into a collection, create a vector index and perform vector searches using Kai in a way that is compatible with MongoDB clients and applications

In [1]:

!pip install datasets --quiet

In [2]:

import os
import pprint
import time
import concurrent.futures
import datasets
from pymongo import MongoClient
from datasets import load_dataset
from bson import json_util

1. Initializing a pymongo client

In [3]:

current_database = %sql SELECT DATABASE() as CurrentDatabase
DB = current_database[0][0]
COLLECTION = 'wiki_embeddings'

In [4]:

# Using the environment variable that holds the kai endpoint
client = MongoClient(connection_url_kai)
collection = client[DB][COLLECTION]

2. Create a collection and load the dataset

It is recommended that you create a collection with the embedding field as a top level column for optimized utilization of storage. The name of the column should be the name of the field holding the embedding

In [5]:

client[DB].create_collection(COLLECTION,
columns=[{ 'id': "emb", 'type': "VECTOR(768) NOT NULL" }],
);

In [6]:

# Using the "wikipedia-22-12-simple-embeddings" dataset from Hugging Face
dataset = load_dataset("Cohere/wikipedia-22-12-simple-embeddings", split="train")

In [7]:

DB_SIZE = 50000 #Currently loading 50k documents to the collection, can go to a max of 485,859 for this dataset
insert_data = []
insert_count = 0
# Iterate through the dataset and prepare the documents for insertion
# The script below ingests 1000 records into the database at a time
for item in dataset:
if insert_count >= DB_SIZE:
break
# Convert the dataset item to MongoDB document format
doc_item = json_util.loads(json_util.dumps(item))
insert_data.append(doc_item)
# Insert in batches of 1000 documents
if len(insert_data) == 1000:
collection.insert_many(insert_data)
insert_count += 1000
print(f"{insert_count} of {DB_SIZE} records ingested")
insert_data = []
# Insert any remaining documents
if len(insert_data) > 0:
collection.insert_many(insert_data)
print("Data Ingested")

A sample document from the collection

In [8]:

sample_doc = collection.find_one()
pprint.pprint(sample_doc, compact=True)

3. Create a vector Index

In [9]:

client[DB].command({
'createIndexes': COLLECTION,
'indexes': [{
'key': {'emb': 'vector'},
'name': 'vector_index',
'kaiSearchOptions': {"index_type":"AUTO", "metric_type": "EUCLIDEAN_DISTANCE", "dimensions": 768}
}],
})

Selecting the query embedding from the sample_doc selected above

In [10]:

# input vector
query_vector = sample_doc['emb']

4. Perform a vector search

In [11]:

def execute_kai_search(query_vector):
pipeline = [
{
'$vectorSearch': {
"index": "vector_index",
"path": "emb",
"queryVector": query_vector,
"numCandidates": 20,
"limit": 3,
}
},
{
'$project': {
'_id':1,
'text': 1,
}
}
]
results = collection.aggregate(pipeline)
return list(results)

In [12]:

execute_kai_search(query_vector)

Running concurrent vector search queries

In [13]:

num_concurrent_queries = 250
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent_queries) as executor:
futures = [executor.submit(execute_kai_search, query_vector) for _ in range(num_concurrent_queries)]
concurrent.futures.wait(futures)
end_time = time.time()
print(f"Executed {num_concurrent_queries} concurrent queries.")
print(f"Total execution time: {end_time - start_time} seconds")
for f in futures:
if f.exception() is not None:
print(f.exception())
failed_count = sum(1 for f in futures if f.exception() is not None)
print(f"Failed queries: {failed_count}")

This shows the Kai can create vector indexes instantaneously and perform a large number of concurrent vector search queries surpassing MongoDB Atlas Vector Search capabilities

Details

About this Template

Run Vector Search using MongoDB clients and power GenAI usecases for your MongoDB applications

Tags

mongoembeddingsvectorgenaikaistarter

License

This Notebook has been released under the Apache 2.0 open source license.