
Publish your first SingleStore Cloud function


Note

This notebook can be run on a Free Starter Workspace. To create a Free Starter Workspace, navigate to Start using the left nav. You can also use your existing Standard or Premium workspace with this notebook.

This Jupyter notebook will help you build your first Cloud Function, showcasing how to leverage SingleStore's ultra-fast queries to build a responsive API server using FastAPI.

Create some simple tables

This setup establishes a basic relational structure to store item information.

In [1]:

%%sql
DROP TABLE IF EXISTS items;

CREATE TABLE IF NOT EXISTS items (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    price FLOAT
);
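If you want a couple of rows to work with right away, you can insert some sample data; the values below are purely illustrative and not part of the original notebook.

%%sql
INSERT INTO items (id, name, price) VALUES
    (1, 'laptop', 999.99),
    (2, 'keyboard', 49.50);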

Create a Connection Pool

To run multiple simultaneous queries, we use SQLAlchemy to create a pool of SQL connections to the workspace you have selected. We also define helper functions to execute queries and transactions using a connection from this pool.

In [2]:

from sqlalchemy import create_engine, text
import requests

ca_cert_url = "https://portal.singlestore.com/static/ca/singlestore_bundle.pem"
ca_cert_path = "/tmp/singlestore_bundle.pem"

response = requests.get(ca_cert_url)
with open(ca_cert_path, "wb") as f:
    f.write(response.content)

sql_connection_string = connection_url.replace("singlestoredb", "mysql+pymysql")
engine = create_engine(
    f"{sql_connection_string}?ssl_ca={ca_cert_path}",
    pool_size=10,           # Maximum number of connections in the pool is 10
    max_overflow=5,         # Allow up to 5 additional connections (temporary overflow)
    pool_timeout=30         # Wait up to 30 seconds for a connection from the pool
)

def execute_query(query: str):
    with engine.connect() as connection:
        return connection.execute(text(query))

def execute_transaction(transactional_query: str):
    with engine.connect() as connection:
        transaction = connection.begin()
        try:
            result = connection.execute(text(transactional_query))
            transaction.commit()
            return result
        except Exception as e:
            transaction.rollback()
            raise e
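As a quick, optional sanity check that the pool works, you can borrow a connection from the engine and run a trivial query. This cell is illustrative and not part of the original notebook.

# Optional sanity check: borrow a connection from the pool and run a trivial query
with engine.connect() as connection:
    row = connection.execute(text("SELECT COUNT(*) AS n FROM items")).fetchone()
    print("rows in items:", row[0])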

Setup Environment

Let's set up the environment to run a FastAPI app, defining the data model and an executor that runs the different requests in different threads simultaneously.

In [3]:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from singlestoredb import connect
from concurrent.futures import ThreadPoolExecutor
import asyncio

# Define the type of the data
class Item(BaseModel):
    id: int
    name: str
    price: float

# Create an executor that can execute queries on multiple threads simultaneously
executor = ThreadPoolExecutor()

def run_in_thread(fn, *args):
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(executor, fn, *args)
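To see what run_in_thread gives us: any blocking callable can be awaited through it without stalling the event loop. The small demo below is illustrative only and not part of the original notebook.

import time

def slow_work(x):
    time.sleep(0.1)        # simulate a blocking call, e.g. a database query
    return x * 2

async def demo():
    # Runs slow_work on the executor's thread pool and awaits the result
    return await run_in_thread(slow_work, 21)   # -> 42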

Define FastAPI App

Next, we define a FastAPI app that can insert, query, and delete data from your table.

In [4]:

app = FastAPI()

# Get all items
@app.get("/items", response_model=list[Item])
async def get_items():
    def get_items_query():
        result = execute_query("SELECT * FROM items;")
        rows = result.fetchall()
        return [{"id": row[0], "name": row[1], "price": row[2]} for row in rows]

    try:
        return await run_in_thread(get_items_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching all items: {str(e)}")

# Insert an item
@app.post("/items", response_model=dict)
async def create_item(item: Item):
    def insert_item_query():
        result = execute_transaction(f"INSERT INTO items (id, name, price) VALUES ({item.id}, '{item.name}', {item.price})")
        return {"message": f"Item with id {item.id} inserted successfully"}

    try:
        return await run_in_thread(insert_item_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error while inserting item with id {item.id}: {str(e)}")

# Get item by id
@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
    def get_item_query():
        result = execute_query(f"SELECT * FROM items WHERE id={item_id}")
        row = result.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Item not found")
        return {"id": row[0], "name": row[1], "price": row[2]}

    try:
        return await run_in_thread(get_item_query)
    except HTTPException as e:
        raise e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching item with id {item_id}: {str(e)}")

# Delete item by id
@app.delete("/items/{item_id}", response_model=dict)
async def delete_item(item_id: int):
    def delete_item_query():
        result = execute_transaction(f"DELETE FROM items WHERE id={item_id}")
        return {"message": f"number of rows deleted: {result.rowcount}"}

    try:
        return await run_in_thread(delete_item_query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting item with id {item_id}: {str(e)}")
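One thing to be aware of: the handlers above interpolate request values directly into the SQL strings, which is fine for a demo but exposes you to SQL injection. A minimal sketch of a parameterized variant is shown below; the helper name is hypothetical and not part of the notebook.

# Hypothetical parameterized helper: lets SQLAlchemy bind values safely
def execute_query_with_params(query: str, params: dict):
    with engine.connect() as connection:
        return connection.execute(text(query), params)

# Example usage inside a handler:
# execute_query_with_params("SELECT * FROM items WHERE id = :id", {"id": item_id})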

Start the FastAPI server

Running the cell below starts the server and displays the link at which the Cloud Function is available for interactive testing.

In [5]:

import singlestoredb.apps as apps
connection_info = await apps.run_function_app(app)

Publish Cloud Function

After validating the Cloud Function interactively, you can publish it and use it as an API server for your data!
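Once the function is running (interactively or published), its endpoints behave like any other HTTP API. The example below is a hypothetical sketch using the requests library, assuming base_url is the link shown above and that your deployment does not require extra auth headers.

import requests

base_url = "https://<your-cloud-function-url>"   # hypothetical placeholder

# Insert an item
requests.post(f"{base_url}/items", json={"id": 1, "name": "laptop", "price": 999.99})

# List all items
print(requests.get(f"{base_url}/items").json())

# Delete it again
requests.delete(f"{base_url}/items/1")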

Details


About this Template

Learn how to connect to SingleStoreDB, perform basic CRUD operations, and finally deploy these functions as callable API endpoints.

This Notebook can be run in Shared Tier, Standard and Enterprise deployments.

Tags

starter, notebooks, python

License

This Notebook has been released under the Apache 2.0 open source license.

See Notebook in action

Launch this notebook in SingleStore and start executing queries instantly.