
Large Language Models (LLMs) like OpenAI's ChatGPT are enormous in size and complexity. They sit at the center of numerous applications, ranging from chatbots to rich generative tasks.
While they have an impressive ability to understand and produce human-like text, they also present challenges when deployed in production, particularly around latency and computational cost. A semantic cache layer addresses many of these challenges in LLM production workloads.
What is a semantic cache layer?
A semantic cache not only stores previous results like a traditional cache, it also understands the semantic meaning of the query. Queries that are not exact matches of an earlier question can still be served a previous answer, as long as the intent of the request is the same (a minimal sketch of the idea follows the list below).
This is crucial for LLM production workloads for a number of reasons:
- Repetitive queries. Users asking very similar questions do not need to repeatedly invoke the LLM.
- Reduced latency. A single call to the model can be resource-intensive and time-consuming; a semantic cache can answer repeat questions almost instantaneously for a better user experience.
- Scalability. Handling many simultaneous requests can strain the system, and a semantic cache significantly offloads the demand for computational resources.
- Cost. Fewer calls to the model mean lower operational costs.
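To make the difference from an exact-match cache concrete, here is a minimal in-memory sketch. It assumes questions have already been turned into embedding vectors; the `cache` list and `semantic_lookup` helper are illustration only, and the rest of this post builds the same idea on SingleStoreDB:

```python
import numpy as np

cache = []  # hypothetical in-memory store: (question_embedding, answer) pairs

def cosine_similarity(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def semantic_lookup(question_embedding, threshold=0.97):
    """Return a cached answer if the closest stored question is similar enough."""
    best = max(cache, key=lambda pair: cosine_similarity(pair[0], question_embedding),
               default=None)
    if best is not None and cosine_similarity(best[0], question_embedding) > threshold:
        return best[1]  # semantic hit: same intent, not necessarily the same text
    return None  # miss: the caller falls through to the LLM and then caches the result
```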
Why use SingleStoreDB as the semantic cache layer?
SingleStoreDB is a real-time, distributed database designed for blazing-fast queries, with an architecture that supports both transactional and analytical workloads in a single engine. This pairs nicely with generative AI use cases, since data can be read and written for both training and real-time tasks without the complexity and data movement of stitching together multiple products. SingleStoreDB also has a built-in plancache that speeds up subsequent queries sharing the same plan.
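In particular, the similarity search at the heart of this post can run entirely inside the database. The query below is an illustrative sketch using SingleStoreDB's built-in DOT_PRODUCT and JSON_ARRAY_PACK functions; the `embeddings` table is defined later in the post, and the vector literal is a placeholder rather than a real embedding:

```sql
-- Illustrative only: rank cached questions by similarity to a new
-- question vector. '[0.12, -0.03, ...]' stands in for a real embedding.
SELECT question, answer,
       DOT_PRODUCT(question_embedding, JSON_ARRAY_PACK('[0.12, -0.03, ...]')) :> FLOAT AS score
FROM embeddings
ORDER BY score DESC
LIMIT 1;
```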
Let's build this!
Tables
To illustrate this, we have a stock ticker data table:
```sql
CREATE TABLE stock_table (
  ticker varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  created_at datetime DEFAULT NULL,
  `open` float DEFAULT NULL,
  `high` float DEFAULT NULL,
  `low` float DEFAULT NULL,
  `close` float DEFAULT NULL,
  volume int(11) DEFAULT NULL,
  SORT KEY (ticker, created_at desc),
  SHARD KEY (ticker)
);
```
The table is populated with about 6 million records. Here are five random rows:

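If you want to pull a similar sample yourself, a query along these lines should work (RAND() is available through SingleStoreDB's MySQL compatibility):

```sql
SELECT * FROM stock_table ORDER BY RAND() LIMIT 5;
```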
There is also a table to store the vector embeddings:
```sql
CREATE TABLE embeddings (
  id bigint(11) NOT NULL AUTO_INCREMENT,
  category varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  question longtext CHARACTER SET utf8 COLLATE utf8_general_ci,
  question_embedding longblob,
  answer longtext CHARACTER SET utf8 COLLATE utf8_general_ci,
  answer_embedding longblob,
  created_at datetime DEFAULT NULL,
  UNIQUE KEY `PRIMARY` (id) USING HASH,
  SHARD KEY __SHARDKEY (id),
  SORT KEY __UNORDERED ()
);
```
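The embedding columns are `longblob` because SingleStoreDB's vector functions operate on packed 32-bit floats. As a quick sketch of the serialization (the `vector` values here are placeholders; text-embedding-ada-002 actually returns 1536 floats):

```python
import numpy as np

# Placeholder values, not a real embedding
vector = [0.12, -0.03, 0.57]

# Pack as little-endian float32 ('<f4'), the layout DOT_PRODUCT expects,
# before binding it to the question_embedding / answer_embedding columns
packed = np.array(vector, dtype='<f4').tobytes()
```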
Setup
Next, we import the required libraries, set the OpenAI API key, and open a connection to SingleStoreDB:

```python
import os
import time
import getpass
from datetime import datetime  # used when timestamping cached answers

import numpy as np
import openai
import singlestoredb as s2
from sqlalchemy import create_engine
from openai.embeddings_utils import get_embeddings

from langchain.agents import create_sql_agent, AgentExecutor
from langchain.agents.agent_toolkits import SQLDatabaseToolkit
from langchain.sql_database import SQLDatabase
from langchain.llms.openai import OpenAI

apikey = getpass.getpass("Enter openai apikey here")
os.environ["OPENAI_API_KEY"] = apikey

model = 'text-embedding-ada-002'
table_name = 'embeddings'

# connection_url should hold your SingleStoreDB connection string,
# e.g. "singlestoredb://user:password@host:3306/database"
s2_conn = s2.connect(connection_url)
```
Agent executor
```python
# Create the agent executor
db = SQLDatabase.from_uri(
    connection_url,
    include_tables=['embeddings', 'stock_table'],
    sample_rows_in_table_info=1
)
llm = OpenAI(
    openai_api_key=os.environ["OPENAI_API_KEY"],
    temperature=0,
    verbose=True
)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

agent_executor = create_sql_agent(
    llm=OpenAI(temperature=0),
    toolkit=toolkit,
    verbose=True,
    top_k=3,
    max_iterations=5
)
```
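Before adding the cache, it can be worth a quick smoke test to confirm the agent can answer questions over the two tables on its own (the question string here is just a made-up example):

```python
# Hypothetical sanity check; runs a full (uncached) agent pass over the tables
print(agent_executor.run("Which ticker had the highest close price?"))
```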
Function to process user questions
Now that the LLM is set up, we can write a function that embeds a question and checks it against the most semantically similar question in our database. If the similarity score is above a high threshold, we assume the user is asking the same kind of question that has been asked before, and return the previous answer. If no sufficiently similar question exists, the function calls our model and caches the new result.
```python
def process_user_question(question):
    print(f'\nQuestion asked: {question}')
    category = 'chatbot'

    # Embed the incoming question and time the call
    start_time = time.time()
    question_embedding = [np.array(x, '<f4') for x in
                          get_embeddings([question], api_key=apikey, engine=model)]
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Execution time for getting the question embedding: "
          f"{elapsed_time:.2f} milliseconds")

    params = {'question_embedding': question_embedding}

    # Check whether the embedding is similar to an existing question
    stmt = ("select question, answer, dot_product(%(question_embedding)s, "
            "question_embedding) :> float as score from embeddings "
            "where category = 'chatbot' order by score desc limit 1;")

    with s2_conn.cursor() as cur:
        start_time = time.time()
        cur.execute(stmt, params)
        row = cur.fetchone()
        elapsed_time = (time.time() - start_time) * 1000
        print(f"Execution time for checking existing questions: "
              f"{elapsed_time:.2f} milliseconds")

    if row is not None:
        question2, answer, score = row
        print(f"\nClosest matching row:\nQuestion: {question2}\n"
              f"Answer: {answer}\nSimilarity Score: {score}")
        if score > .97:
            print('Action to take: Using existing answer')
            return answer
        print('Action to take: Running agent_executor')
    else:
        print('No existing rows. Running agent_executor')

    # Cache miss: run the agent against the database and time it
    start_time = time.time()
    answer2 = agent_executor.run(question)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"agent_executor execution time: {elapsed_time:.2f} milliseconds")

    created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Embed the new answer so future questions can match against it too
    start_time = time.time()
    answer_embedding = [np.array(x, '<f4') for x in
                        get_embeddings([answer2], api_key=apikey, engine=model)]
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Answer embeddings execution time: {elapsed_time:.2f} milliseconds")

    params = {'category': category, 'question': question,
              'question_embedding': question_embedding,
              'answer': answer2, 'answer_embedding': answer_embedding,
              'created_at': created_at}

    # Send the new question/answer pair to SingleStoreDB
    stmt = (f"INSERT INTO {table_name} (category, question, question_embedding, "
            f"answer, answer_embedding, created_at) VALUES (%(category)s, "
            f"%(question)s, %(question_embedding)s, %(answer)s, "
            f"%(answer_embedding)s, %(created_at)s)")

    start_time = time.time()
    with s2_conn.cursor() as cur:
        cur.execute(stmt, params)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Insert to SingleStore execution time: {elapsed_time:.2f} milliseconds")

    return answer2
```
Putting it to the test
Here are two questions that have nearly the same meaning:
```python
question_1 = "describe the database"
question_2 = "describe database"
```
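Running them back to back exercises both paths of `process_user_question`:

```python
answer_1 = process_user_question(question_1)  # first ask: cache miss, runs the agent
answer_2 = process_user_question(question_2)  # near-duplicate: served from the cache
```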
Processing the first question will make a call to the model, since it hasn’t been asked before. This took 4.44 seconds to complete.

The next question is asking virtually the same thing, but the semantic cache is leveraged. This takes 286 milliseconds to complete, an improvement of over 15.5x!

Conclusion
In the rapidly evolving landscape of AI and LLMs, it's extremely important to ensure that systems are fast, efficient and scalable. A semantic cache layer solves many challenges in production workloads. Combining it with SingleStoreDB, which is built on the same principles, promotes a better developer and user experience, while improving operational efficiency and reducing the costs associated with computational resources.