Publish your first SingleStore Cloud function

Note

This notebook can be run on a Free Starter Workspace. To create one, navigate to Start using the left nav. You can also use an existing Standard or Premium workspace with this notebook.

This Jupyter notebook helps you build your first Cloud Function. It shows how to use SingleStore's fast queries to build a responsive API server with FastAPI.

Create some simple tables

This setup creates a single table, items, that stores the id, name, and price of each item.

In [1]:

%%sql
DROP TABLE IF EXISTS items;
CREATE TABLE IF NOT EXISTS items (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    price FLOAT
);

Create a Connection Pool

To run multiple queries simultaneously, we use SQLAlchemy to create a pool of SQL connections to the workspace you have selected. We also define helper functions that execute queries and transactions using a connection from this pool.

In [2]:

from sqlalchemy import create_engine, text
import requests

# Download the SingleStore CA bundle so the connection can use TLS
ca_cert_url = "https://portal.singlestore.com/static/ca/singlestore_bundle.pem"
ca_cert_path = "/tmp/singlestore_bundle.pem"
response = requests.get(ca_cert_url)
response.raise_for_status()  # Fail early instead of saving an error page as the certificate
with open(ca_cert_path, "wb") as f:
    f.write(response.content)

# connection_url is expected to be provided by the notebook for the selected workspace
sql_connection_string = connection_url.replace("singlestoredb", "mysql+pymysql")
engine = create_engine(
    f"{sql_connection_string}?ssl_ca={ca_cert_path}",
    pool_size=10,     # Keep up to 10 connections in the pool
    max_overflow=5,   # Allow up to 5 additional connections (temporary overflow)
    pool_timeout=30   # Wait up to 30 seconds for a connection from the pool
)

def execute_query(query: str):
    # Run a single statement on a pooled connection
    with engine.connect() as connection:
        return connection.execute(text(query))

def execute_transaction(transactional_query: str):
    # Run a statement in a transaction: commit on success, roll back on error
    with engine.connect() as connection:
        transaction = connection.begin()
        try:
            result = connection.execute(text(transactional_query))
            transaction.commit()
            return result
        except Exception:
            transaction.rollback()
            raise
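The connection-string rewrite used above can be checked in isolation. The following is a minimal sketch using only the standard library. The URL, credentials, and the helper name `to_sqlalchemy_url` are hypothetical placeholders, not values from a real workspace. It replaces only the scheme (a plain `str.replace` would also rewrite any other occurrence of `singlestoredb` in the URL) and adds `ssl_ca` whether or not the URL already has a query string.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def to_sqlalchemy_url(connection_url: str, ca_cert_path: str) -> str:
    """Swap the scheme for mysql+pymysql and add the ssl_ca query parameter."""
    parts = urlsplit(connection_url)
    # Keep any query parameters already present, then set ssl_ca.
    query = dict(parse_qsl(parts.query))
    query["ssl_ca"] = ca_cert_path
    return urlunsplit(
        (
            "mysql+pymysql",
            parts.netloc,
            parts.path,
            urlencode(query, safe="/"),
            parts.fragment,
        )
    )


# Hypothetical example URL; in the notebook, connection_url is supplied for you.
example_url = "singlestoredb://user:secret@host.example.com:3306/singlestoredb_demo"
sqlalchemy_url = to_sqlalchemy_url(example_url, "/tmp/singlestore_bundle.pem")
print(sqlalchemy_url)
```

Note that the database name in this example still contains `singlestoredb` after the conversion, which would not be the case with an unrestricted string replacement.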

Setup Environment

Let's set up the environment to run a FastAPI app. We define the data model and an executor that runs different requests on separate threads simultaneously.

In [3]:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from singlestoredb import connect
from concurrent.futures import ThreadPoolExecutor
import asyncio

# Define the Type of the Data
class Item(BaseModel):
    id: int
    name: str
    price: float

# Create an executor that can execute queries on multiple threads simultaneously
executor = ThreadPoolExecutor()

def run_in_thread(fn, *args):
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(executor, fn, *args)
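To see what the executor pattern above buys you, here is a minimal standalone sketch using only the standard library. The function `blocking_query` is a hypothetical stand-in that sleeps instead of querying a database. Because each call runs on a worker thread, the three calls overlap, so the total time should be closer to one call than to three.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def blocking_query(label: str) -> str:
    """Hypothetical stand-in for a blocking database call."""
    time.sleep(0.2)
    return f"{label} done"


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    # Each call is handed to a worker thread, so the event loop stays free.
    futures = [
        loop.run_in_executor(executor, blocking_query, name)
        for name in ("a", "b", "c")
    ]
    # gather preserves the order in which the futures were passed in.
    return await asyncio.gather(*futures)


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Inside a notebook cell, where an event loop is already running, `await main()` takes the place of `asyncio.run(main())`.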

Define FastAPI App

Next, we define a FastAPI app that can insert, query, and delete data from your table.

In [4]:

app = FastAPI()

# Get all items
@app.get("/items", response_model=list[Item])
async def get_items():
    def get_items_query():
        result = execute_query("SELECT * FROM items;")
        rows = result.fetchall()
        return [{"id": row[0], "name": row[1], "price": row[2]} for row in rows]

    try:
        return await run_in_thread(get_items_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching all items: {str(e)}")

# Insert an item
@app.post("/items", response_model=dict)
async def create_item(item: Item):
    def insert_item_query():
        # NOTE: item.name is interpolated directly into the SQL text, so a name
        # containing a single quote breaks the statement; prefer bound parameters
        # for untrusted input.
        result = execute_transaction(f"INSERT INTO items (id, name, price) VALUES ({item.id}, '{item.name}', {item.price})")
        return {"message": f"Item with id {item.id} inserted successfully"}

    try:
        return await run_in_thread(insert_item_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error while inserting item with id {item.id}: {str(e)}")

# Get item by id
@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
    def get_item_query():
        result = execute_query(f"SELECT * FROM items WHERE id={item_id}")
        row = result.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Item not found")
        return {"id": row[0], "name": row[1], "price": row[2]}

    try:
        return await run_in_thread(get_item_query)
    except HTTPException:
        # Pass the 404 through unchanged instead of wrapping it in a 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching item with id {item_id}: {str(e)}")

# Delete item by id
@app.delete("/items/{item_id}", response_model=dict)
async def delete_item(item_id: int):
    def delete_item_query():
        result = execute_transaction(f"DELETE FROM items WHERE id={item_id}")
        return {"message": f"number of rows deleted: {result.rowcount}"}

    try:
        return await run_in_thread(delete_item_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting item with id {item_id}: {str(e)}")

Start the FastAPI server

Running this cell starts the server and displays the link where the Cloud Function is available interactively.

In [5]:

import singlestoredb.apps as apps
connection_info = await apps.run_function_app(app)
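Once the server is running, the endpoints can be exercised from any HTTP client. The following is a minimal sketch that builds, but does not send, requests for two of the routes defined earlier, using only the standard library. `BASE_URL` and the helper names are hypothetical; substitute the link displayed by the cell above. Any authentication the endpoint may require is not shown.

```python
import json
from urllib.request import Request

# Hypothetical base URL; replace with the link displayed when the server starts.
BASE_URL = "https://example.invalid/functions/demo"


def build_create_item_request(item: dict) -> Request:
    """Build (but do not send) a POST /items request with a JSON body."""
    return Request(
        f"{BASE_URL}/items",
        data=json.dumps(item).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_get_item_request(item_id: int) -> Request:
    """Build (but do not send) a GET /items/{item_id} request."""
    return Request(f"{BASE_URL}/items/{item_id}", method="GET")


create_req = build_create_item_request({"id": 1, "name": "Widget", "price": 9.99})
get_req = build_get_item_request(1)
print(create_req.get_method(), create_req.full_url)
print(get_req.get_method(), get_req.full_url)
# To actually send a request, pass it to urllib.request.urlopen(...).
```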

Publish Cloud Function

After validating the Cloud Function interactively, you can publish it and use it as an API server for your data!

Details


About this Template

Learn how to connect to SingleStoreDB, perform basic CRUD operations, and deploy these functions as callable API endpoints.

This Notebook can be run in Shared Tier, Standard and Enterprise deployments.

Tags

starter, notebooks, python

License

This Notebook has been released under the Apache 2.0 open source license.