How to Build a Multi-Agent AI App with AutoGen

Note

This notebook can be run on a Free Starter Workspace. To create one, navigate to Start in the left navigation. You can also use an existing Standard or Premium workspace with this notebook.

Python Notebook Introduction

This Jupyter notebook demonstrates how to use several Python libraries for document loading, text splitting, and vector embeddings. It also shows how to use the OpenAI API to generate embeddings, SingleStoreDB to store and retrieve documents, and AutoGen to run a group chat among several agents.

The notebook is divided into several sections:

  1. Installation of Required Libraries: This section covers the installation of necessary libraries such as langchain_community, pyautogen, langchain_openai, langchain_text_splitters, and unstructured.

  2. Data Loading and Preparation: This section involves loading a markdown document from a URL and preparing it for further processing.

  3. Document Splitting and Embedding Generation: This section demonstrates how to split the loaded document into smaller parts and generate embeddings for each part using the OpenAI API.

  4. SingleStoreDB Setup: This section covers the setup of SingleStoreDB for storing and retrieving documents.

  5. Agent Setup and Group Chat Simulation: This section demonstrates the setup of various agents (like a boss, coder, product manager, and code reviewer) and simulates a group chat among them to solve a given problem.

  6. Chat Simulation: This section runs the chat simulation first without and then with retrieval-augmented generation (RAG).

Please ensure that you have the necessary API keys and environment variables set up before running this notebook.
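
As a minimal sketch of that check, the snippet below reports which variables are still unset. The helper `missing_env_vars` and the tuple `REQUIRED_VARS` are illustrative names and not part of the notebook; the two variable names are the ones the later cells set.

```python
import os
from typing import Iterable, List, Mapping

# Variable names taken from the cells below; adjust if your setup differs.
REQUIRED_VARS = ("OPENAI_API_KEY", "SINGLESTOREDB_URL")


def missing_env_vars(names: Iterable[str], env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or empty in `env`."""
    return [name for name in names if not env.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Set these before running the cells that need them:", ", ".join(missing))
```

Passing a plain dictionary as `env` makes the helper easy to try without touching the real environment.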

In [1]:

# Check if the database is running on a shared tier
shared_tier_check = %sql show variables like 'is_shared_tier'
# If not on a shared tier, or if the shared tier is turned off, drop the existing database and create a new one
if not shared_tier_check or shared_tier_check[0][1] == 'OFF':
    %sql DROP DATABASE IF EXISTS autogen
    %sql CREATE DATABASE autogen

In [2]:

!pip install --quiet langchain_community pyautogen langchain_openai langchain_text_splitters unstructured

In [3]:

!pip install --quiet markdown

In [4]:

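import requests
# Download the FLAML Spark integration guide that serves as the retrieval corpus
r = requests.get("https://raw.githubusercontent.com/microsoft/FLAML/main/website/docs/Examples/Integrate%20-%20Spark.md")
r.raise_for_status()  # stop here if the download failed
with open('example.md', 'wb') as f:
    f.write(r.content)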

In [5]:

import os
from typing import List, Dict, Union
from langchain_community.vectorstores import SingleStoreDB
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_text_splitters import CharacterTextSplitter
# Placeholders: replace with your own OpenAI API key and SingleStoreDB connection URL
os.environ["OPENAI_API_KEY"] = "api-key"
os.environ["SINGLESTOREDB_URL"] = "admin:pass@host:3306/db"
# Load the downloaded markdown file and split it into chunks
loader = UnstructuredMarkdownLoader("./example.md")
data = loader.load()
text_splitter = CharacterTextSplitter()
docs = text_splitter.split_documents(data)
# Embedding model used when the chunks are written to SingleStoreDB
embeddings = OpenAIEmbeddings()
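
To make the splitting step more concrete, here is a simplified illustration of separator-based chunking in plain Python. It is not LangChain's implementation: the function `split_into_chunks`, the separator, and the `chunk_size` value are assumptions chosen for the example, and overlap between chunks is left out.

```python
from typing import List


def split_into_chunks(text: str, separator: str = "\n\n", chunk_size: int = 45) -> List[str]:
    """Greedily pack separator-delimited pieces into chunks of at most chunk_size characters.

    A single piece longer than chunk_size is kept whole rather than cut.
    """
    chunks: List[str] = []
    current = ""
    for piece in text.split(separator):
        piece = piece.strip()
        if not piece:
            continue
        candidate = piece if not current else current + separator + piece
        if current and len(candidate) > chunk_size:
            # Adding this piece would overflow the chunk, so close it and start a new one.
            chunks.append(current)
            current = piece
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


sample = "Intro paragraph.\n\nSecond paragraph here.\n\nThird one."
for chunk in split_into_chunks(sample, chunk_size=45):
    print(repr(chunk))
```

Smaller chunks give more precise retrieval hits but less context per hit, so the size is worth tuning for your documents.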

In [6]:

singlestore_db = SingleStoreDB.from_documents(
    docs,
    embeddings,
    table_name="notebook2",  # use table with a custom name
)
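
Once the chunks and their embeddings are stored, retrieval amounts to ranking stored vectors against a query vector. The sketch below shows that idea with toy two-dimensional vectors and a dot-product score. The function names and vectors are made up for illustration, and the metric the store actually uses depends on how it is configured.

```python
from typing import List, Sequence, Tuple


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def top_k(query_vec: Sequence[float], doc_vecs: Sequence[Sequence[float]], k: int) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the k highest-scoring document vectors."""
    scored = [(i, dot(query_vec, vec)) for i, vec in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy unit-length "embeddings"; real ones have hundreds or thousands of dimensions.
toy_vectors = [[1.0, 0.0], [0.0, 1.0], [0.6, 0.8]]
print(top_k([1.0, 0.0], toy_vectors, k=2))
```

For unit-length vectors the dot product equals cosine similarity, which is why the two are often used interchangeably.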

In [7]:

!pip install --quiet "pyautogen[retrievechat]"

In [8]:

# RetrieveUserProxyAgent must be imported before it is subclassed
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
class SingleStoreRetrieveUserProxyAgent(RetrieveUserProxyAgent):
    """Retrieval user proxy agent backed by a SingleStoreDB vector store."""
    def __init__(self, singlestore_db: SingleStoreDB, **kwargs):
        super().__init__(**kwargs)
        self.singlestore_db = singlestore_db
    def query_vector_db(
        self,
        query_texts: List[str],
        n_results: int = 10,
        search_string: str = "",
        **kwargs,
    ) -> Dict[str, List[List[str]]]:
        documents = []
        ids = []
        for query_index, query_text in enumerate(query_texts):
            searched_docs = self.singlestore_db.similarity_search(
                query=query_text,
                k=n_results,
            )
            # Assuming searched_docs is a list of documents with only 'page_content' property
            batch_documents = [doc.page_content for doc in searched_docs]
            documents.append(batch_documents)
            # Generate a unique ID for each document based on enumeration
            batch_ids = [f"{query_index}-{i}" for i in range(len(batch_documents))]
            ids.append(batch_ids)
        return {
            "ids": ids,
            "documents": documents,
        }
    def retrieve_docs(self, problem: str, n_results: int = 20, search_string: str = "", **kwargs):
        # Store the results where the parent class expects to find them
        results = self.query_vector_db(
            query_texts=[problem],
            n_results=n_results,
            search_string=search_string,
            **kwargs,
        )
        self._results = results
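
To see the shape of the dictionary that `query_vector_db` builds without a database connection, the sketch below repeats the same loop against a stand-in store. `FakeDoc`, `FakeStore`, and `build_results` are hypothetical names for illustration only. The fake store ignores the query and returns its first `k` canned texts.

```python
from dataclasses import dataclass
from typing import Dict, List, Sequence


@dataclass
class FakeDoc:
    """Hypothetical document object exposing only page_content."""
    page_content: str


class FakeStore:
    """Hypothetical stand-in for the vector store; returns the first k canned texts."""

    def __init__(self, texts: Sequence[str]):
        self._texts = list(texts)

    def similarity_search(self, query: str, k: int = 4) -> List[FakeDoc]:
        return [FakeDoc(text) for text in self._texts[:k]]


def build_results(store: FakeStore, query_texts: Sequence[str], n_results: int) -> Dict[str, List[List[str]]]:
    """Mirror the loop in query_vector_db: one list of ids and documents per query."""
    documents: List[List[str]] = []
    ids: List[List[str]] = []
    for query_index, query_text in enumerate(query_texts):
        found = store.similarity_search(query=query_text, k=n_results)
        batch = [doc.page_content for doc in found]
        documents.append(batch)
        ids.append([f"{query_index}-{i}" for i in range(len(batch))])
    return {"ids": ids, "documents": documents}


results = build_results(FakeStore(["chunk A", "chunk B", "chunk C"]), ["q1", "q2"], n_results=2)
print(results)
```

Each query gets its own inner list, so the outer lists always have one entry per query text.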

In [9]:

import os
os.environ["OPENAI_API_KEY"] = "api-key"  # placeholder: replace with your OpenAI API key
os.environ["AUTOGEN_USE_DOCKER"] = "False"

In [10]:

from typing import Annotated  # used by retrieve_content in the next cell
import autogen
from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
from autogen import config_list_from_json
from autogen import AssistantAgent

In [11]:

llm_config = {
    "config_list": [{"model": "gpt-3.5-turbo", "api_key": os.environ["OPENAI_API_KEY"]}],
}
def termination_msg(x):
    return isinstance(x, dict) and "TERMINATE" == str(x.get("content", ""))[-9:].upper()
boss = autogen.UserProxyAgent(
    name="Boss",
    is_termination_msg=termination_msg,
    human_input_mode="NEVER",
    code_execution_config=False,  # we don't want to execute code in this case.
    default_auto_reply="Reply `TERMINATE` if the task is done.",
    description="The boss who asks questions and gives tasks.",
)
boss_aid = SingleStoreRetrieveUserProxyAgent(
    name="Boss_Assistant",
    is_termination_msg=termination_msg,
    human_input_mode="NEVER",
    max_consecutive_auto_reply=3,
    retrieve_config={
        "task": "code",
    },
    code_execution_config=False,  # we don't want to execute code in this case.
    description="Assistant who has extra content retrieval power for solving difficult problems.",
    singlestore_db=singlestore_db
)
coder = autogen.AssistantAgent(
    name="Senior_Python_Engineer",
    is_termination_msg=termination_msg,
    system_message="You are a senior python engineer, you provide python code to answer questions. Reply `TERMINATE` in the end when everything is done.",
    llm_config=llm_config,
    description="Senior Python Engineer who can write code to solve problems and answer questions.",
)
pm = autogen.AssistantAgent(
    name="Product_Manager",
    is_termination_msg=termination_msg,
    system_message="You are a product manager. Reply `TERMINATE` in the end when everything is done.",
    llm_config=llm_config,
    description="Product Manager who can design and plan the project.",
)
reviewer = autogen.AssistantAgent(
    name="Code_Reviewer",
    is_termination_msg=termination_msg,
    system_message="You are a code reviewer. Reply `TERMINATE` in the end when everything is done.",
    llm_config=llm_config,
    description="Code Reviewer who can review the code.",
)
PROBLEM = "How to use spark for parallel training in FLAML? Give me sample code."
def _reset_agents():
    boss.reset()
    boss_aid.reset()
    coder.reset()
    pm.reset()
    reviewer.reset()
def rag_chat():
    _reset_agents()
    groupchat = autogen.GroupChat(
        agents=[boss_aid, pm, coder, reviewer], messages=[], max_round=12, speaker_selection_method="round_robin"
    )
    manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)
    # Start chatting with boss_aid as this is the user proxy agent.
    boss_aid.initiate_chat(
        manager,
        problem=PROBLEM,
        n_results=3,
    )
def norag_chat():
    _reset_agents()
    groupchat = autogen.GroupChat(
        agents=[boss, pm, coder, reviewer],
        messages=[],
        max_round=12,
        speaker_selection_method="auto",
        allow_repeat_speaker=False,
    )
    manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)
    # Start chatting with the boss as this is the user proxy agent.
    boss.initiate_chat(
        manager,
        message=PROBLEM,
    )
def call_rag_chat():
    _reset_agents()
    # In this case, we will have multiple user proxy agents and we don't initiate the chat
    # with RAG user proxy agent.
    # In order to use RAG user proxy agent, we need to wrap RAG agents in a function and call
    # it from other agents.
    def retrieve_content(
        message: Annotated[
            str,
            "Refined message which keeps the original meaning and can be used to retrieve content for code generation and question answering.",
        ],
        n_results: Annotated[int, "number of results"] = 3,
    ) -> str:
        boss_aid.n_results = n_results  # Set the number of results to be retrieved.
        # Check if we need to update the context.
        update_context_case1, update_context_case2 = boss_aid._check_update_context(message)
        if (update_context_case1 or update_context_case2) and boss_aid.update_context:
            boss_aid.problem = message if not hasattr(boss_aid, "problem") else boss_aid.problem
            _, ret_msg = boss_aid._generate_retrieve_user_reply(message)
        else:
            ret_msg = boss_aid.generate_init_message(message, n_results=n_results)
        return ret_msg if ret_msg else message
    boss_aid.human_input_mode = "NEVER"  # Disable human input for boss_aid since it only retrieves content.
    for caller in [pm, coder, reviewer]:
        d_retrieve_content = caller.register_for_llm(
            description="retrieve content for code generation and question answering.", api_style="function"
        )(retrieve_content)
    for executor in [boss, pm]:
        executor.register_for_execution()(d_retrieve_content)
    groupchat = autogen.GroupChat(
        agents=[boss, pm, coder, reviewer],
        messages=[],
        max_round=12,
        speaker_selection_method="round_robin",
        allow_repeat_speaker=False,
    )
    manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)
    # Start chatting with the boss as this is the user proxy agent.
    boss.initiate_chat(
        manager,
        message=PROBLEM,
    )
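
The `termination_msg` check defined in this cell compares only the last nine characters of a message, ignoring case. The standalone sketch below copies that function and runs it on a few made-up messages to show which ones end the chat.

```python
def termination_msg(x):
    # Same check as in the cell above: the message must be a dict whose content ends with "TERMINATE".
    return isinstance(x, dict) and "TERMINATE" == str(x.get("content", ""))[-9:].upper()


# Example messages invented for illustration.
examples = [
    {"content": "All done. TERMINATE"},   # ends with the keyword
    {"content": "all done. terminate"},   # case is ignored
    {"content": "TERMINATE."},            # trailing punctuation defeats the check
    {"content": None},                    # missing content
    "TERMINATE",                          # not a dict
]
for msg in examples:
    print(repr(msg), "->", termination_msg(msg))
```

Only the first two examples return `True`. An agent that appends punctuation or whitespace after the keyword will not stop the chat, which is why the system messages ask for `TERMINATE` at the very end of the reply.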

In [12]:

norag_chat()

In [13]:

rag_chat()

In [14]:

shared_tier_check = %sql show variables like 'is_shared_tier'
if not shared_tier_check or shared_tier_check[0][1] == 'OFF':
    %sql DROP DATABASE IF EXISTS autogen

Details

Tags

#starter #autogen #rag #multiagent #groupchat

License

This Notebook has been released under the Apache 2.0 open source license.