New

Backup Database to AWS S3

Notebook

SingleStore Notebooks

Backup Database to AWS S3

Intro

Introducing a powerful Python notebook designed to simplify performing database backups on schedule

What you will learn in this notebook:

  1. How to backup database to AWS S3 [SQL]

What benefits do you get out of using the notebook.

  1. This notebook explains how we use SingleStore secrets feature to provide the configuration parameters we can control backup process (either full or incremental) in scheduled environment

Questions?

Reach out to us through our forum.

Pre-requisites

We will need below parameters to proceed.

  1. To access AWS S3, we need AWS Access key ID,AWS Secret access key, Aws Session Token

  2. Database User should have 'BACKUP', 'OUTBOUND', 'PROCESS' grant

  3. S3 Path provided should not exist [ bucket should exists, remaining path will be created if not existing for initial backup]

Note:

  1. check user grants by running 'show grants'.

  2. S3 Path if not exists, will be created by singlestore.

  3. General format is 'database_name.backup'.

  4. AWS IAM user should have S3 read,   write access

Imports

In [1]:

import io
import logging
import time
import getpass
import singlestoredb as s2
from IPython.display import display, HTML

Variables

In [2]:

aws_key_id = None
aws_secret_key = None
aws_region = 'us-east-1'
s3_target_path = None
aws_session_token = None
is_incremental_backup = 'N'

Functions to display various alerts

In [3]:

def show_warn(warn_msg):
"""
Display a warning message in a formatted HTML alert box.
Parameters
----------
warn_msg : str
The warning message to display.
"""
display(HTML(f'''<div class="alert alert-block alert-warning">
<b class="fa fa-solid fa-exclamation-circle"></b>
<div>
<p><b>Action Required</b></p>
<p>{warn_msg}</p>
</div>
</div>'''))
def show_error(error_msg):
"""
Display an error message in a formatted HTML alert box.
Parameters
----------
error_msg : str
The error message to display.
"""
display(HTML(f'''<div class="alert alert-block alert-danger">
<b class="fa fa-solid fa-exclamation-triangle"></b>
<div>
<p><b>Error</b></p>
<p>{error_msg}</p>
</div>
</div>'''))
def show_success(success_msg):
"""
Display a success message in a formatted HTML alert box.
Parameters
----------
success_msg : str
The success message to display.
"""
display(HTML(f'''<div class="alert alert-block alert-success">
<b class="fa fa-solid fa-check-circle"></b>
<div>
<p><b>Success</b></p>
<p>{success_msg}</p>
</div>
</div>'''))

LogControl

Note

To enable logs

  • Modify 'enable_debug_log(False)' to 'enable_debug_log(True)' in code below

In [4]:

def enable_debug_log(enabled):
if enabled:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.ERROR)

Utility functions for handling S3 PATHs, SQL Statement, backup

In [5]:

def get_bkp_path(s3_path, db_name, do_increment):
"""
Get the backup path based on the type of backup (incremental or initial).
Parameters
----------
s3_path : str
The base S3 path for backups.
db_name : str
The name of the database.
do_increment: str
'Y' for incremental backup, else Full Backup
Returns
-------
str
The final backup path.
"""
if do_increment == 'Y':
logging.info('Is an incremental backup, will use exact path')
return s3_path
else:
logging.info('Is an initial backup, will use time appended path')
t = time.localtime(time.time())
my_path = f'{s3_path}/{db_name}/{t.tm_year}-{t.tm_mon:02d}-{t.tm_mday:02d}/{t.tm_hour:02d}{t.tm_min:02d}{t.tm_sec:02d}/'
logging.info(f'Backup Path : {my_path}')
print(f'Backup Path : {my_path}')
return my_path
def get_sql_statement(db_name_to_bkp, is_incremental_backup):
"""
Get the SQL statement for backing up a database.
Parameters
----------
db_name_to_bkp : str
The name of the database to backup.
is_incremental_backup : str
is incremental backup.
Returns
-------
str
The SQL statement for backup.
"""
global aws_key_id, aws_secret_key, aws_region, s3_target_path, aws_session_token
aws_key_id = (input('Enter AWS_API_KEY_ID:') if aws_key_id == None else aws_key_id)
aws_secret_key = (getpass.getpass('Enter AWS_API_SECRET:') if aws_secret_key == None else aws_secret_key)
aws_region = (input('Enter AWS_REGION:') if aws_region == None else aws_region)
s3_target_path = (input('Enter AWS S3 Path:') if s3_target_path == None else s3_target_path)
aws_session_token = (input('Enter AWS_SESSION_TOKEN:') if aws_session_token == None else aws_session_token)
data = io.StringIO()
data.write('BACKUP DATABASE ' + db_name_to_bkp + ' ')
if is_incremental_backup == 'Y':
data.write(' WITH DIFFERENTIAL ')
else:
data.write(' WITH INIT ')
data.write(' TO S3 "' + get_bkp_path(s3_target_path, db_name_to_bkp, is_incremental_backup) + '" ')
data.write(' CONFIG \'{"region":"' + aws_region + '"}\'')
data.write(' CREDENTIALS \'{"aws_access_key_id":"' + aws_key_id
+ '","aws_secret_access_key":"' + aws_secret_key + '"' )
if aws_session_token != '':
data.write(', "aws_session_token":"' + aws_session_token + '" ')
data.write('}\' ')
logging.debug(f'statement: {data.getvalue()}')
return data.getvalue()
def perform_backup(my_cursor, curr_db_name, do_incremental):
"""
Perform a database backup.
Parameters
----------
my_cursor : cursor
The database cursor.
curr_db_name : str
The name of the database to backup.
do_incremental: str
'Y' to perform incremental backup
"""
logging.debug(f'backing up db {curr_db_name}')
my_cursor.execute(get_sql_statement(curr_db_name, do_incremental))
results = cursor.fetchall()
if results is None:
logging.error('Backup execution failed')
else:
logging.info("Backup completed")

In [6]:

enable_debug_log(True)
print(connection_url)
try:
if connection_url.endswith('/') or connection_url.endswith('information_schema'):
#Hanlde case when database not selected or information_schema selected
#Connect to information schema and backup all databases
logging.debug('No database selected, will use information_schema and back up all databases')
my_db_url = connection_url
if connection_url.endswith('/'):
my_db_url = my_db_url + 'information_schema'
logging.debug(f'connection url updated {my_db_url}')
conn = s2.connect(my_db_url, results_type='dict')
with conn.cursor() as cursor:
# Get a list of databases to backup
cursor.execute(
"SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ( 'cluster', 'memsql', 'information_schema' );")
for row in cursor.fetchall():
logging.debug(f"processing db {row['schema_name']}")
# Backup each database
perform_backup(my_cursor=cursor, curr_db_name=row['schema_name'], do_incremental='N')
logging.debug(f"processing db {row['schema_name']} complete")
else:
#Connect to selected database and take its backup
database_to_bkp = connection_url.split('/')[-1]
# Establish a connection to the database
conn = s2.connect(results_type='dict')
with conn.cursor() as cursor:
perform_backup(my_cursor=cursor, curr_db_name=database_to_bkp, do_incremental=is_incremental_backup)
show_success('Backup Process Completed')
except s2.exceptions.OperationalError as ope:
# Handle specific operational errors
if 'NoSuchBucket' in ope.errmsg:
logging.error('Provided S3 Bucket does not exist. Please check.')
show_error('Provided S3 Bucket does not exist. Please check.')
elif 'Access denied' in ope.errmsg:
logging.error('Failed to backup due to missing grants or firewall settings. Please check.')
show_error('Failed to backup due to missing grants or firewall settings. Please check.')
else:
logging.error(f'Failed. Error message: {ope.errmsg}')
show_error(f'Failed to backup. {ope.errmsg}')
except s2.Error as e:
# Handle any other errors
print(f'Encountered exception {e}')
logging.exception(e)
show_error(f'Failed to backup. {str(e)}')
print('\n\nScript execution completed')

Verify Result

If script executed without errors. please check the S3 bucket for uploaded files ( Backup Path is printed to console )

General format is 'database_name.backup' or 'database_name.incr_backup'.

You may use below query to check backups created ( apply filter to limit data as per your needs )

select * from information_schema.MV_BACKUP_HISTORY

Important Note

  • To use this as scheduled notebook, we have to modify to read configuration data from table instead of user input

Details

Tags

#starter#backup

License

This Notebook has been released under the Apache 2.0 open source license.