Speed Up LLMs Using a Semantic Cache Layer with SingleStoreDB

David Lee

Cloud Solutions Engineer

Large Language Models (LLMs) like OpenAI’s ChatGPT are large, complex models. They are at the center of numerous applications, ranging from chatbots to robust generative tasks.

While they have an impressive ability to understand and output human-like text, they also present challenges when deployed in production — particularly in terms of latency and computational cost. A semantic cache layer addresses many of these challenges in LLM production workloads.

What is a semantic cache layer?

A semantic cache not only stores previous results like a traditional cache, but it also understands the semantic meaning of the query. This means that queries or questions that are not exact matches can still provide previous answers if the intent of the request is the same.

This is crucial for LLM production workloads for a number of reasons:

  • Repetitive queries. Users asking very similar questions do not need to repeatedly invoke the LLM.
  • Reduced latency. A single call to the model can be resource intensive and time consuming. A semantic cache can answer questions almost instantaneously for a better user experience.
  • Scalability. Handling more simultaneous requests can strain the system, and a semantic cache significantly offloads demand for computational resources.
  • Cost. Lower operational costs by reducing the number of calls to the model.
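
To make the difference from a traditional cache concrete, here is a minimal sketch that contrasts an exact-match lookup with a similarity-based one. The three-dimensional vectors are hand-made stand-ins for real embeddings, and the names `toy_embeddings` and `semantic_lookup` are hypothetical, not part of any library.

```python
import numpy as np

# Hand-made 3-d vectors standing in for real embeddings (hypothetical values).
toy_embeddings = {
    "describe the database": np.array([0.90, 0.10, 0.00]),
    "describe database": np.array([0.88, 0.12, 0.01]),
    "which ticker had the highest volume?": np.array([0.00, 0.20, 0.90]),
}


def cosine(a, b):
    """Cosine similarity between two vectors."""
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))


# A traditional cache is keyed on the exact question text.
exact_cache = {"describe the database": "cached answer"}

# A semantic cache stores (embedding, answer) pairs instead.
semantic_cache = [(toy_embeddings["describe the database"], "cached answer")]


def semantic_lookup(question, threshold=0.97):
    """Return the answer of the closest cached question, or None on a miss."""
    query = toy_embeddings[question]
    vec, answer = max(semantic_cache, key=lambda entry: cosine(query, entry[0]))
    return answer if cosine(query, vec) > threshold else None


print(exact_cache.get("describe database"))     # different text: exact cache misses
print(semantic_lookup("describe database"))     # same intent: semantic cache hits
print(semantic_lookup("which ticker had the highest volume?"))  # unrelated: miss
```

In a real deployment the vectors come from an embedding model and the lookup runs inside the database, as the rest of this post shows.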

Why use SingleStoreDB as the semantic cache layer?

SingleStoreDB is a real-time, distributed database designed for blazing fast queries with an architecture that supports a hybrid model for transactional and analytical workloads.  This pairs nicely with generative AI use cases as it allows for reading or writing data for both training and real-time tasks — without adding complexity and data movement from multiple products for the same task. SingleStoreDB also has a built-in plancache to speed up subsequent queries with the same plan.

Let's build this!

Tables

To illustrate this, we have a stock ticker data table:

CREATE TABLE stock_table (
    ticker varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    created_at datetime DEFAULT NULL,
    `open` float DEFAULT NULL,
    `high` float DEFAULT NULL,
    `low` float DEFAULT NULL,
    `close` float DEFAULT NULL,
    volume int(11) DEFAULT NULL,
    SORT KEY (ticker, created_at desc),
    SHARD KEY (ticker)
);

The table is populated with about 6 million records.

There is also a table to store the vector embeddings:

CREATE TABLE embeddings (
    id bigint(11) NOT NULL AUTO_INCREMENT,
    category varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
    question longtext CHARACTER SET utf8 COLLATE utf8_general_ci,
    question_embedding longblob,
    answer longtext CHARACTER SET utf8 COLLATE utf8_general_ci,
    answer_embedding longblob,
    created_at datetime DEFAULT NULL,
    UNIQUE KEY `PRIMARY` (id) USING HASH,
    SHARD KEY __SHARDKEY (id),
    SORT KEY __UNORDERED ()
);
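
The embedding columns are plain `longblob` fields, so each vector has to be serialized before it is written. The sketch below shows one way to do that with NumPy, using the same little-endian float32 dtype (`'<f4'`) that the Python code in this post uses. The three-element vector is made up for illustration, and the statement that the database reads the blob as packed 32-bit floats is my understanding of `dot_product` rather than something this post spells out.

```python
import numpy as np

embedding = [0.25, -0.5, 0.125]   # made-up 3-dimensional vector

# Pack as little-endian float32: 4 bytes per dimension.
blob = np.array(embedding, dtype='<f4').tobytes()
print(len(blob))

# Unpack a blob read back from the table into a NumPy vector.
restored = np.frombuffer(blob, dtype='<f4')
print(restored.tolist())
```

At 4 bytes per dimension, a 1536-dimensional text-embedding-ada-002 vector takes 6,144 bytes per row and per embedding column.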

Let’s create the LLM in SingleStoreDB notebooks using LangChain

Imports and variables

import os
import time
import getpass
import numpy as np
import openai
import singlestoredb as s2
from sqlalchemy import create_engine
from langchain.agents import create_sql_agent
from langchain.agents.agent_toolkits import SQLDatabaseToolkit
from langchain.sql_database import SQLDatabase
from langchain.llms.openai import OpenAI
from langchain.agents import AgentExecutor
from openai.embeddings_utils import get_embeddings

# Prompt for the OpenAI API key instead of hard-coding it
apikey = getpass.getpass("Enter openai apikey here")
os.environ["OPENAI_API_KEY"] = apikey

model = 'text-embedding-ada-002'   # embedding model
table_name = 'embeddings'          # table that acts as the semantic cache

# connection_url is not defined in this snippet; it is assumed to be
# provided by the notebook environment
s2_conn = s2.connect(connection_url)

Agent executor

# Create the agent executor
db = SQLDatabase.from_uri(
    connection_url,
    include_tables=['embeddings', 'stock_table'],
    sample_rows_in_table_info=1,
)
llm = OpenAI(openai_api_key=os.environ["OPENAI_API_KEY"], temperature=0,
             verbose=True)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

agent_executor = create_sql_agent(
    llm=llm,  # reuse the LLM created above instead of building a second one
    toolkit=toolkit,
    verbose=True,
    top_k=3,
    max_iterations=5
)

Function to process user questions

Now that the LLM is set up, we can write a function that embeds a question and compares it with the most semantically similar question in our database. If the similarity score is above a high threshold (0.97 here), we can assume the user is asking the same kind of question that has been asked before, and return the previous answer. If no sufficiently similar question has been asked before, the function calls our model and stores the new question and answer for next time.
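
The lookup ranks cached questions by dot product rather than by cosine similarity. The two agree only when the vectors have unit length, which is how OpenAI describes its embeddings. The sketch below illustrates that point with made-up two-dimensional vectors; it is a side calculation, not part of the notebook.

```python
import numpy as np


def cosine(a, b):
    """Cosine similarity between two vectors."""
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))


a = np.array([3.0, 4.0])   # made-up vectors, each of length 5
b = np.array([4.0, 3.0])

# Raw vectors: the dot product grows with vector length, so comparing it
# to a fixed threshold such as 0.97 would be meaningless.
raw_dot = float(a @ b)

# Unit-length vectors: the dot product is exactly the cosine similarity.
a_unit = a / np.linalg.norm(a)
b_unit = b / np.linalg.norm(b)
unit_dot = float(a_unit @ b_unit)

print(raw_dot, unit_dot, cosine(a, b))
```

If you swap in an embedding model that does not return unit-length vectors, normalize them before storing them, or the 0.97 threshold will no longer behave like a similarity cutoff.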

def process_user_question(question):
    print(f'\nQuestion asked: {question}')
    category = 'chatbot'

    # Embed the question and time the call
    start_time = time.time()
    question_embedding = [np.array(x, '<f4') for x in
                          get_embeddings([question], api_key=apikey, engine=model)]
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Execution time for getting the question embedding: {elapsed_time:.2f} milliseconds")

    # Find the most similar cached question
    params = {'question_embedding': question_embedding}
    stmt = ('select question, answer, '
            'dot_product(%(question_embedding)s, question_embedding) :> float as score '
            'from embeddings where category="chatbot" '
            'order by score desc limit 1;')

    with s2_conn.cursor() as cur:
        start_time = time.time()
        cur.execute(stmt, params)
        row = cur.fetchone()
        elapsed_time = (time.time() - start_time) * 1000
        print(f"Execution time for checking existing questions: {elapsed_time:.2f} milliseconds")

    if row is None:
        # Empty cache: nothing to compare against
        print('No existing rows.  Running agent_executor')
    else:
        question2, answer, score = row
        print(f"\nClosest Matching row:\nQuestion: {question2}\nAnswer: {answer}\nSimilarity Score: {score}")

        if score > .97:
            # Cache hit: reuse the stored answer
            print('Action to take: Using existing answer')
            return answer

        print('Action to take: Running agent_executor')

    # Cache miss: ask the model
    start_time = time.time()
    answer2 = agent_executor.run(question)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"agent_executor execution time: {elapsed_time:.2f} milliseconds")

    # Local timestamp, formatted with the already-imported time module
    created_at = time.strftime("%Y-%m-%d %H:%M:%S")

    # Embed the answer and time the call
    start_time = time.time()
    answer_embedding = [np.array(x, '<f4') for x in
                        get_embeddings([answer2], api_key=apikey, engine=model)]
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Answer embeddings execution time: {elapsed_time:.2f} milliseconds")

    params = {'category': category, 'question': question,
              'question_embedding': question_embedding,
              'answer': answer2, 'answer_embedding': answer_embedding,
              'created_at': created_at}

    # Store the new question and answer in SingleStoreDB
    stmt = (f"INSERT INTO {table_name} "
            "(category, question, question_embedding, answer, answer_embedding, created_at) "
            "VALUES (%(category)s, %(question)s, %(question_embedding)s, "
            "%(answer)s, %(answer_embedding)s, %(created_at)s)")

    start_time = time.time()
    with s2_conn.cursor() as cur:
        cur.execute(stmt, params)
    elapsed_time = (time.time() - start_time) * 1000
    print(f"Insert to SingleStore execution time: {elapsed_time:.2f} milliseconds")

    return answer2

Putting it to the test

Here are two questions that have nearly the same meaning:

question_1 = "describe the database"
question_2 = "describe database"

Processing the first question makes a call to the model, since it hasn’t been asked before. This took 4.44 seconds to complete.

The next question asks virtually the same thing, so the semantic cache is leveraged. This takes 286 milliseconds to complete, an improvement of over 15.5x!
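
How much this helps overall depends on how often questions repeat. As a rough back-of-the-envelope sketch, the two timings above can be combined into an expected average latency for a given cache hit rate. The hit rates below are hypothetical, and the model is simplified: it treats every miss as costing the full 4.44 seconds and every hit as costing 286 milliseconds.

```python
MISS_MS = 4440.0   # model call measured above (4.44 s)
HIT_MS = 286.0     # cache hit measured above


def expected_latency_ms(hit_rate):
    """Average latency if a fraction `hit_rate` of questions is served from cache."""
    return hit_rate * HIT_MS + (1.0 - hit_rate) * MISS_MS


# Hypothetical hit rates, for illustration only
for hit_rate in (0.0, 0.25, 0.5, 0.75):
    print(f"{hit_rate:.0%} hits -> {expected_latency_ms(hit_rate):.0f} ms on average")

print(f"speedup on a hit: {MISS_MS / HIT_MS:.1f}x")
```

Even a moderate hit rate moves the average noticeably, because each hit is so much cheaper than a model call.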

Conclusion

In the rapidly evolving landscape of AI and LLMs, it’s important to ensure that systems are fast, efficient and scalable. A semantic cache layer solves many challenges in production workloads. Combining it with SingleStoreDB, which is built on the same principles, promotes a better developer and user experience while improving operational efficiency and reducing the costs associated with computational resources.

Try SingleStoreDB for free today.

