Publish your first SingleStore Cloud function
Notebook
Note
This notebook can be run on a Free Starter Workspace. To create a Free Starter Workspace navigate to Start using the left nav. You can also use your existing Standard or Premium workspace with this Notebook.
This Jupyter notebook will help you build your first Cloud Function, showcasing how to leverage SingleStore's ultra-fast queries to build a responsive API server with FastAPI.
Create some simple tables
This setup establishes a basic relational structure to store some items information.
In [1]:
%%sql
DROP TABLE IF EXISTS items;

CREATE TABLE IF NOT EXISTS items (
    id INT PRIMARY KEY,
    name VARCHAR(255),
    price FLOAT
);
Import the logging module to enable live logs
In [2]:
import logging
logging.basicConfig(level=logging.INFO)  # show INFO-level messages in the notebook output
Create a Connection Pool
To run multiple simultaneous queries, we use SQLAlchemy to create a pool of SQL connections to the workspace you have selected. We also define helper methods to execute queries and transactions using a connection from this pool.
In [3]:
from sqlalchemy import create_engine, text
import requests

ca_cert_url = "https://portal.singlestore.com/static/ca/singlestore_bundle.pem"
ca_cert_path = "/tmp/singlestore_bundle.pem"

response = requests.get(ca_cert_url)
with open(ca_cert_path, "wb") as f:
    f.write(response.content)

sql_connection_string = connection_url.replace("singlestoredb", "mysql+pymysql")

engine = create_engine(
    f"{sql_connection_string}?ssl_ca={ca_cert_path}",
    pool_size=10,    # Maximum number of connections in the pool is 10
    max_overflow=5,  # Allow up to 5 additional connections (temporary overflow)
    pool_timeout=30  # Wait up to 30 seconds for a connection from the pool
)
logging.info("Connection to workspace established successfully.")


def execute_query(query: str):
    logging.info(f"Executing query: {query}")
    with engine.connect() as connection:
        return connection.execute(text(query))

def execute_transaction(transactional_query: str):
    logging.info(f"Starting transaction for query: {transactional_query}")
    with engine.connect() as connection:
        transaction = connection.begin()
        try:
            result = connection.execute(text(transactional_query))
            transaction.commit()
            logging.info("Transaction committed successfully.")
            return result
        except Exception as e:
            transaction.rollback()
            logging.error(f"Transaction rolled back due to error: {e}")
            raise
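The pool and helper-method pattern above can be tried standalone. The sketch below reproduces it against a throwaway SQLite database instead of a SingleStore workspace (so it runs anywhere); the table name and pool settings are illustrative. One deliberate tweak: rows are fetched before the connection is returned to the pool, so results stay usable after the `with` block exits.

```python
# A self-contained sketch of the same pool/query/transaction pattern,
# using a temporary SQLite file so no SingleStore workspace is required.
import tempfile

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

db_path = tempfile.NamedTemporaryFile(suffix=".db", delete=False).name
engine = create_engine(
    f"sqlite:///{db_path}",
    poolclass=QueuePool,  # same pool type the MySQL-protocol engine uses
    pool_size=5,          # up to 5 pooled connections
    max_overflow=2,       # plus 2 temporary overflow connections
    pool_timeout=10,      # wait up to 10 s for a free connection
)

def execute_query(query: str):
    # Fetch all rows before the connection goes back to the pool
    with engine.connect() as connection:
        return connection.execute(text(query)).fetchall()

def execute_transaction(query: str):
    with engine.connect() as connection:
        with connection.begin():  # commits on success, rolls back on error
            connection.execute(text(query))

execute_transaction("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, price REAL)")
execute_transaction("INSERT INTO items VALUES (1, 'widget', 9.99)")
rows = execute_query("SELECT * FROM items")
print(rows)
```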
Setup Environment
Let's set up the environment to run a FastAPI app, defining the data model and an executor that runs different requests on different threads simultaneously.
In [4]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from singlestoredb import connect
from concurrent.futures import ThreadPoolExecutor
import asyncio

# Define the type of the data
class Item(BaseModel):
    id: int
    name: str
    price: float

# Create an executor that can execute queries on multiple threads simultaneously
executor = ThreadPoolExecutor()
def run_in_thread(fn, *args):
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(executor, fn, *args)
Define FastAPI App
Next, we will define a FastAPI app that can insert, query, and delete data from your table.
In [5]:
app = FastAPI()

# Add logging middleware
@app.middleware("http")
async def log_requests(request, call_next):
    logging.info(f"Incoming request: {request.method} {request.url}")
    response = await call_next(request)
    logging.info(f"Response status: {response.status_code}")
    return response

# Get all items
@app.get("/items", response_model=list[Item])
async def get_items():
    def get_items_query():
        result = execute_query("SELECT * FROM items;")
        rows = result.fetchall()
        logging.info(f"Fetched {len(rows)} items from the database")
        return [{"id": row[0], "name": row[1], "price": row[2]} for row in rows]

    try:
        return await run_in_thread(get_items_query)
    except Exception as e:
        logging.error(f"Error fetching all items: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error fetching all items: {str(e)}")

# Insert an item
@app.post("/items", response_model=dict)
async def create_item(item: Item):
    def insert_item_query():
        logging.info(f"Inserting item: {item}")
        execute_transaction(f"INSERT INTO items (id, name, price) VALUES ({item.id}, '{item.name}', {item.price})")
        logging.info(f"Item with id {item.id} inserted successfully")
        return {"message": f"Item with id {item.id} inserted successfully"}

    try:
        return await run_in_thread(insert_item_query)
    except Exception as e:
        logging.error(f"Error while inserting item with id {item.id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error while inserting item with id {item.id}: {str(e)}")

# Get item by id
@app.get("/items/{item_id}", response_model=Item)
async def get_item(item_id: int):
    def get_item_query():
        result = execute_query(f"SELECT * FROM items WHERE id={item_id}")
        row = result.fetchone()
        if not row:
            logging.error(f"Item with id {item_id} not found")
            raise HTTPException(status_code=404, detail="Item not found")
        logging.info(f"Item with id {item_id} fetched successfully")
        return {"id": row[0], "name": row[1], "price": row[2]}

    try:
        return await run_in_thread(get_item_query)
    except HTTPException:
        raise
    except Exception as e:
        logging.error(f"Error fetching item with id {item_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error fetching item with id {item_id}: {str(e)}")

# Delete item by id
@app.delete("/items/{item_id}", response_model=dict)
async def delete_item(item_id: int):
    logging.info(f"Deleting item with id {item_id}")
    def delete_item_query():
        result = execute_transaction(f"DELETE FROM items WHERE id={item_id}")
        logging.info(f"Number of rows deleted: {result.rowcount}")
        return {"message": f"Number of rows deleted: {result.rowcount}"}

    try:
        return await run_in_thread(delete_item_query)
    except Exception as e:
        logging.error(f"Error deleting item with id {item_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error deleting item with id {item_id}: {str(e)}")
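One caveat worth noting: the handlers above interpolate request values directly into SQL with f-strings, which is fine for a demo but exposes production code to SQL injection. SQLAlchemy's `text()` supports bound parameters that let the driver escape values safely. The sketch below demonstrates the technique against a throwaway SQLite database (the table mirrors `items`; the value with an apostrophe would break an f-string query):

```python
# Bound parameters (:id, :name, :price) instead of f-string interpolation.
import tempfile

from sqlalchemy import create_engine, text

db_path = tempfile.NamedTemporaryFile(suffix=".db", delete=False).name
engine = create_engine(f"sqlite:///{db_path}")

with engine.connect() as conn:
    with conn.begin():
        conn.execute(text("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, price REAL)"))
        conn.execute(
            text("INSERT INTO items (id, name, price) VALUES (:id, :name, :price)"),
            # The embedded quote is escaped by the driver, not by string formatting
            {"id": 1, "name": "O'Brien's widget", "price": 4.5},
        )
    row = conn.execute(text("SELECT name FROM items WHERE id = :id"), {"id": 1}).one()
print(row[0])
```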
Start the FastAPI server
The link at which the Cloud Function is available for interactive testing will be displayed.
In [6]:
import singlestoredb.apps as apps
connection_info = await apps.run_function_app(app)
Publish Cloud Function
After validating the Cloud Function interactively, you can publish it and use it as an API server for your data!
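Once published, clients can call the endpoints like any HTTP API. The sketch below shows what such a request could look like; the base URL and API key are placeholders, not a real deployment, so the request is only built (`prepare()`) rather than sent:

```python
# Building (not sending) a hypothetical request to a published endpoint.
import json

import requests

base_url = "https://your-cloud-function.svc.singlestore.com"  # placeholder URL

req = requests.Request(
    "POST",
    f"{base_url}/items",
    headers={
        "Authorization": "Bearer <API-KEY>",  # placeholder credential
        "Content-Type": "application/json",
    },
    data=json.dumps({"id": 1, "name": "widget", "price": 9.99}),
).prepare()

print(req.method, req.url)
```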

Details
About this Template
Learn how to connect to SingleStoreDB, perform basic CRUD operations, and finally deploy these functions as callable API endpoints.
This Notebook can be run in Shared Tier, Standard and Enterprise deployments.
License
This Notebook has been released under the Apache 2.0 open source license.