ingestion.ingest_utils
Contains the utility functions for the ingestion lambda_handler.
"""
Contains the utility functions for the ingestion lambda_handler.
"""

import datetime
import json
from datetime import timezone

import boto3
from pg8000.native import identifier

from src.utils.db_connection import close_conn, create_conn
from src.utils.default_serialiser import default_serialiser


def extract_data(table_name):
    """
    Connect to the database (credentials come from environment variables via
    create_conn) and select every row in the given table.

    # Arguments:
        table_name: a string representing the name of the table in the
            database that we want to extract.

    # Returns:
        A list of dictionaries, one per row, keyed by the table's column names.

    # Raises:
        RuntimeError: the connection could not be established or the query failed.
    """
    # identifier() safely quotes the table name, preventing SQL injection.
    query = f"SELECT * FROM {identifier(table_name)}"

    conn = create_conn()
    if not conn:
        # Previously this fell through and silently returned None, despite the
        # documented RuntimeError contract — fail loudly instead.
        raise RuntimeError("Database connection could not be established")

    try:
        data = conn.run(query)
        columns = [column["name"] for column in conn.columns]
        return [dict(zip(columns, row)) for row in data]
    except Exception as e:
        raise RuntimeError(f"Database query failed: {e}") from e
    finally:
        close_conn(conn)


def convert_to_json(data):
    """
    Convert the list of row dictionaries returned by extract_data into a JSON
    string. Functionally identical to json.dumps with default_serialiser
    supplied for values json cannot natively encode.

    # Arguments:
        data: a list of dictionaries.

    # Returns:
        A json object.
    """
    return json.dumps(data, default=default_serialiser)


def upload_to_s3(data, bucket_name, table_name):
    """
    Upload a json object to the given bucket under a key that includes the
    table name and a UTC datestamp.

    # Arguments:
        data: a json object containing the data for a table in the database.
        bucket_name: a string representing the name of the s3 bucket to upload to.
        table_name: a string representing the table whose data we are uploading.

    # Returns:
        A message confirming successful upload and showing the full location.

    # Raises:
        RuntimeError: the S3 upload failed.
    """
    s3 = boto3.client("s3")

    # Timezone-aware UTC timestamp so keys sort/partition consistently.
    now = datetime.datetime.now(timezone.utc)
    date_path = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")

    key = f"{table_name}/{date_path}/{table_name}-{timestamp}.json"

    try:
        s3.put_object(
            Bucket=bucket_name, Key=key, Body=data, ContentType="application/json"
        )
        message = f"Uploaded to s3://{bucket_name}/{key}"
        print(message)
        return message
    except Exception as e:
        # Fixed copy-paste bug: the old message said "Database query failed".
        raise RuntimeError(f"S3 upload failed: {e}") from e


def ingest(table_name, bucket_name):
    """
    Extract a table's data, convert it to json, and upload it to the given
    s3 bucket.

    # Arguments:
        table_name: a string representing the name of the table the data is from.
        bucket_name: a string representing the name of the s3 bucket that is
            being uploaded to.

    # Returns:
        A string indicating successful ingestion of the data.

    # Raises:
        RuntimeError: an error occurred at any stage of the ingestion.
    """
    try:
        extracted_data = extract_data(table_name)
        converted_data = convert_to_json(extracted_data)
        upload_to_s3(converted_data, bucket_name, table_name)
        return "Ingestion successful"
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise RuntimeError(f"Ingestion failed: {e}") from e
def
extract_data(table_name):
def extract_data(table_name):
    """
    Connect to the database (credentials come from environment variables via
    create_conn) and select every row in the given table.

    # Arguments:
        table_name: a string representing the name of the table in the
            database that we want to extract.

    # Returns:
        A list of dictionaries, one per row, keyed by the table's column names.

    # Raises:
        RuntimeError: the connection could not be established or the query failed.
    """
    # identifier() safely quotes the table name, preventing SQL injection.
    query = f"SELECT * FROM {identifier(table_name)}"

    conn = create_conn()
    if not conn:
        # Previously this fell through and silently returned None, despite the
        # documented RuntimeError contract — fail loudly instead.
        raise RuntimeError("Database connection could not be established")

    try:
        data = conn.run(query)
        columns = [column["name"] for column in conn.columns]
        return [dict(zip(columns, row)) for row in data]
    except Exception as e:
        raise RuntimeError(f"Database query failed: {e}") from e
    finally:
        close_conn(conn)
This function connects to the database whose credentials are stored as environment variables and selects all the information in the given table.
Arguments:
table_name: a string representing the name of the table in the database that we want to extract.
Returns:
A list of dictionaries where each dictionary represents a single row in the given table and the keys are the column names in the given table.
Raises:
RuntimeError: An error occurred during data extraction.
def
convert_to_json(data):
def convert_to_json(data):
    """
    Serialise the list of row dictionaries produced by extract_data into a
    JSON string. Functionally identical to json.dumps, with default_serialiser
    supplied to handle values json cannot natively encode.

    # Arguments:
        data: a list of dictionaries.

    # Returns:
        A json object.
    """
    serialised = json.dumps(data, default=default_serialiser)
    return serialised
This function converts a list of dictionaries that the extract_data function returns into a json object. It is functionally identical to the json.dumps method, with a specified function for the default argument.
Arguments:
data: a list of dictionaries.
Returns:
A json object.
def
upload_to_s3(data, bucket_name, table_name):
def upload_to_s3(data, bucket_name, table_name):
    """
    Upload a json object to the given bucket under a key that includes the
    table name and a UTC datestamp.

    # Arguments:
        data: a json object containing the data for a table in the database.
        bucket_name: a string representing the name of the s3 bucket to upload to.
        table_name: a string representing the table whose data we are uploading.

    # Returns:
        A message confirming successful upload and showing the full location.

    # Raises:
        RuntimeError: the S3 upload failed.
    """
    s3 = boto3.client("s3")

    # Timezone-aware UTC timestamp so keys sort/partition consistently.
    now = datetime.datetime.now(timezone.utc)
    date_path = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")

    key = f"{table_name}/{date_path}/{table_name}-{timestamp}.json"

    try:
        s3.put_object(
            Bucket=bucket_name, Key=key, Body=data, ContentType="application/json"
        )
        message = f"Uploaded to s3://{bucket_name}/{key}"
        print(message)
        return message
    except Exception as e:
        # Fixed copy-paste bug: the old message said "Database query failed",
        # which misattributed S3 upload errors to the database layer.
        raise RuntimeError(f"S3 upload failed: {e}") from e
This function takes a json object and uploads it to a given bucket with a key that includes table name and datestamp.
Arguments:
data: a json object containing the data for a table in the database.
bucket_name: a string representing the name of the s3 bucket to upload to.
table_name: a string representing the table whose data we are uploading.
Returns:
A message confirming successful upload and showing the full location.
def
ingest(table_name, bucket_name):
def ingest(table_name, bucket_name):
    """
    Extract a table's data via extract_data, convert it to json via
    convert_to_json, then upload it to the given s3 bucket.

    # Arguments:
        table_name: a string representing the name of the table the data is from.
        bucket_name: a string representing the name of the s3 bucket that is
            being uploaded to.

    # Returns:
        A string indicating successful ingestion of the data.

    # Raises:
        RuntimeError: an error occurred at any stage of the ingestion.
    """
    try:
        extracted_data = extract_data(table_name)
        converted_data = convert_to_json(extracted_data)
        upload_to_s3(converted_data, bucket_name, table_name)
        return "Ingestion successful"
    except Exception as e:
        # Chain the cause so the original traceback is preserved for debugging.
        raise RuntimeError(f"Ingestion failed: {e}") from e
This function calls extract_data, and converts the data to json through the convert_to_json function. It then uploads the data into the given s3 bucket.
Arguments:
table_name: a string representing the name of the table the data is from.
bucket_name: a string representing the name of the s3 bucket that is being uploaded to.
Returns:
A string indicating successful ingestion of the data.
Raises:
RuntimeError: An error occurred during ingestion (extraction, conversion, or upload).