Publish your first SingleStore Cloud function
Notebook
Note
This notebook can be run on a Free Starter Workspace. To create a Free Starter Workspace, navigate to Start using the left nav. You can also use your existing Standard or Premium workspace with this Notebook.
This Jupyter notebook helps you build your first Cloud Function, showing how to use SingleStore's fast queries to build a responsive API server with FastAPI.
Create some simple tables
This setup establishes a basic relational structure to store item information.
In [1]:
%%sql
DROP TABLE IF EXISTS items;

CREATE TABLE IF NOT EXISTS
items (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    price FLOAT
);
Create a Connection Pool
To run multiple queries simultaneously, we use SQLAlchemy to create a pool of SQL connections to the workspace you have selected. We also define helper functions that execute queries and transactions using a connection from this pool.
In [2]:
from sqlalchemy import create_engine, text
import requests

ca_cert_url = "https://portal.singlestore.com/static/ca/singlestore_bundle.pem"
ca_cert_path = "/tmp/singlestore_bundle.pem"

response = requests.get(ca_cert_url)
with open(ca_cert_path, "wb") as f:
    f.write(response.content)

sql_connection_string = connection_url.replace("singlestoredb", "mysql+pymysql")
engine = create_engine(
    f"{sql_connection_string}?ssl_ca={ca_cert_path}",
    pool_size=10,  # Maximum number of connections in the pool is 10
    max_overflow=5,  # Allow up to 5 additional connections (temporary overflow)
    pool_timeout=30  # Wait up to 30 seconds for a connection from the pool
)

def execute_query(query: str):
    with engine.connect() as connection:
        return connection.execute(text(query))

def execute_transaction(transactional_query: str):
    with engine.connect() as connection:
        transaction = connection.begin()
        try:
            result = connection.execute(text(transactional_query))
            transaction.commit()
            return result
        except Exception as e:
            transaction.rollback()
            raise e
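To make the pool settings above more concrete, here is a minimal sketch of what a bounded connection pool does, written with only the standard library. It is not how SQLAlchemy implements its pool: `TinyPool` is a hypothetical class, and the in-memory SQLite connections are stand-ins for workspace connections. It models only a fixed pool size and a checkout timeout, not `max_overflow`.

```python
import queue
import sqlite3
from contextlib import contextmanager


class TinyPool:
    """Hypothetical fixed-size pool for illustration; not SQLAlchemy's pool."""

    def __init__(self, pool_size: int, pool_timeout: float):
        self._timeout = pool_timeout
        self._idle = queue.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            # check_same_thread=False lets a connection be handed to other threads
            self._idle.put(sqlite3.connect(":memory:", check_same_thread=False))

    @contextmanager
    def connect(self):
        # Block until a connection is free; raises queue.Empty after the timeout
        conn = self._idle.get(timeout=self._timeout)
        try:
            yield conn
        finally:
            # Always return the connection so other callers can reuse it
            self._idle.put(conn)


pool = TinyPool(pool_size=2, pool_timeout=0.1)

# Normal use: check out a connection, run a query, return it automatically
with pool.connect() as conn:
    value = conn.execute("SELECT 1 + 1").fetchone()[0]

# With both connections checked out, a third request times out
exhausted = False
with pool.connect(), pool.connect():
    try:
        with pool.connect():
            pass
    except queue.Empty:
        exhausted = True
```

The timeout case corresponds to what `pool_timeout` guards against in the engine configuration: a caller waits a bounded time for a free connection instead of blocking indefinitely.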
Setup Environment
Let's set up the environment to run a FastAPI app by defining the data model and an executor that runs different requests in separate threads simultaneously.
In [3]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from singlestoredb import connect
from concurrent.futures import ThreadPoolExecutor
import asyncio

# Define the Type of the Data
class Item(BaseModel):
    id: int
    name: str
    price: float

# Create an executor that can execute queries on multiple threads simultaneously
executor = ThreadPoolExecutor()

def run_in_thread(fn, *args):
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(executor, fn, *args)
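The effect of the `run_in_thread` helper above can be seen in a small standalone sketch. Here `slow_query` is a hypothetical stand-in for a blocking database call, and the worker count of 4 is an arbitrary choice for the demo. Three blocking calls are awaited together, so they overlap on the thread pool instead of running back to back.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def run_in_thread(fn, *args):
    # Schedule a blocking call on the thread pool and return an awaitable.
    # get_running_loop is used because this sketch always calls the helper
    # from inside a coroutine.
    loop = asyncio.get_running_loop()
    return loop.run_in_executor(executor, fn, *args)


def slow_query(label: str) -> str:
    # Hypothetical stand-in for a blocking database call
    time.sleep(0.2)
    return f"{label} done"


async def main():
    start = time.perf_counter()
    # The three blocking calls run on separate threads at the same time
    results = await asyncio.gather(
        run_in_thread(slow_query, "a"),
        run_in_thread(slow_query, "b"),
        run_in_thread(slow_query, "c"),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
```

Because the event loop is never blocked by `time.sleep`, the total elapsed time is close to that of a single call rather than the sum of all three.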
Define FastAPI App
Next, we define a FastAPI app that can insert, query, and delete data in your table.
In [4]:
app = FastAPI()

# Get all items
@app.get("/items", response_model=list[Item])
async def get_items():
    def get_items_query():
        result = execute_query("SELECT * FROM items;")
        rows = result.fetchall()
        return [{"id": row[0], "name": row[1], "price": row[2]} for row in rows]

    try:
        return await run_in_thread(get_items_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching all items: {str(e)}")

# Insert an item
@app.post("/items", response_model=dict)
async def create_item(item: Item):
    def insert_item_query():
        result = execute_transaction(f"INSERT INTO items (id, name, price) VALUES ({item.id}, '{item.name}', {item.price})")
        return {"message": f"Item with id {item.id} inserted successfully"}

    try:
        return await run_in_thread(insert_item_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error while inserting item with id {item.id}: {str(e)}")

# Get item by id
@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
    def get_item_query():
        result = execute_query(f"SELECT * FROM items WHERE id={item_id}")
        row = result.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Item not found")
        return {"id": row[0], "name": row[1], "price": row[2]}

    try:
        return await run_in_thread(get_item_query)
    except HTTPException as e:
        raise e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching item with id {item_id}: {str(e)}")

# Delete item by id
@app.delete("/items/{item_id}", response_model=dict)
async def delete_item(item_id: int):
    def delete_item_query():
        result = execute_transaction(f"DELETE FROM items WHERE id={item_id}")
        return {"message": f"number of rows deleted: {result.rowcount}"}

    try:
        return await run_in_thread(delete_item_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting item with id {item_id}: {str(e)}")
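The insert endpoint above builds its SQL with an f-string, so a name containing a single quote (for example `O'Brien`) would break the statement, and untrusted input could alter it. A common alternative is to pass values as bound parameters. The sketch below illustrates the idea with the standard-library `sqlite3` module as a stand-in for the workspace; `insert_item` is a hypothetical helper, not part of this notebook. With SQLAlchemy's `text()`, the analogous form uses named `:param` placeholders and a parameter dictionary passed to `connection.execute`.

```python
import sqlite3

# In-memory SQLite database used only as a stand-in for the workspace
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INT PRIMARY KEY, name VARCHAR(255), price FLOAT)")


def insert_item(item_id: int, name: str, price: float) -> None:
    # Hypothetical helper: values travel separately from the SQL text,
    # so quotes in `name` cannot change the statement
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "INSERT INTO items (id, name, price) VALUES (:id, :name, :price)",
            {"id": item_id, "name": name, "price": price},
        )


insert_item(1, "O'Brien's widget", 9.5)
row = conn.execute(
    "SELECT id, name, price FROM items WHERE id = :id", {"id": 1}
).fetchone()
```

The name with embedded quotes is stored and read back unchanged, which the f-string version would not handle without manual escaping.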
Start the FastAPI server
Running this cell displays the link at which the Cloud Function is available interactively.
In [5]:
import singlestoredb.apps as apps
connection_info = await apps.run_function_app(app)
Publish Cloud Function
After validating the Cloud Function interactively, you can publish it and use it as an API server for your data!
Details
About this Template
Learn how to connect to SingleStoreDB, perform basic CRUD operations, and deploy these functions as callable API endpoints.
This Notebook can be run in Shared Tier, Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.