load.load_lambda
Contains the main function used by the Load Lambda AWS resource.
1""" 2Contains the main function used by the Load Lambda AWS resource. 3""" 4 5import json 6import logging 7import os 8 9import requests 10 11from src.load.load_utils import ( 12 access_files_from_processed_bucket, 13 load_dim_counterparty_into_warehouse, 14 load_dim_currency_into_warehouse, 15 load_dim_dates_into_warehouse, 16 load_dim_design_into_warehouse, 17 load_dim_location_into_warehouse, 18 load_dim_staff_into_warehouse, 19 load_fact_sales_order_into_warehouse, 20) 21 22 23def lambda_handler(event, context): 24 """ 25 This function will get warehouse credentials from AWS Secrets Manager, import the most recent transformed data from s3 and load it into the warehouse. 26 27 # Returns: 28 A message with status code 200 on successful loading into the warehouse. 29 A message with status code 500 on unsuccessful loading of the data. 30 """ 31 logger = logging.getLogger() 32 logger.setLevel(logging.INFO) 33 34 try: 35 secret_name = ( 36 "arn:aws:secretsmanager:eu-west-2:267414915338:secret:Totesys_Warehouse_Credentials-4OiHJW" 37 ) 38 39 secrets_extension_endpoint = ( 40 f"http://localhost:2773/secretsmanager/get?secretId={secret_name}" 41 ) 42 headers = { 43 "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN") 44 } 45 46 response = requests.get(secrets_extension_endpoint, headers=headers) 47 logger.info(f"Response status code: {response.status_code}") 48 49 secret = json.loads(response.text)["SecretString"] 50 secret = json.loads(secret) 51 52 os.environ["DBUSER"] = secret["user"] 53 os.environ["DBNAME"] = secret["database"] 54 os.environ["DBPASSWORD"] = secret["password"] 55 os.environ["PORT"] = secret["port"] 56 os.environ["HOST"] = secret["host"] 57 58 # Only 7 out of 11 tables included to match mock database 59 # To extract ALL tables include missing table names 60 table_names = { 61 "dim_design": load_dim_design_into_warehouse, 62 "dim_currency": load_dim_currency_into_warehouse, 63 "dim_location": load_dim_location_into_warehouse, 64 "dim_date": load_dim_dates_into_warehouse, 65 "dim_staff": load_dim_staff_into_warehouse, 66 "dim_counterparty": load_dim_counterparty_into_warehouse, 67 "fact_sales_order": load_fact_sales_order_into_warehouse, 68 } 69 70 for table_name in table_names: 71 df = access_files_from_processed_bucket( 72 table_name, os.environ["TRANSFORM_BUCKET_NAME"] 73 ) 74 logger.info(f"Extracted data from processed bucket for table {table_name}.") 75 table_names[table_name](df) 76 logger.info(f"Loaded {table_name} to the warehouse.") 77 78 return { 79 "statusCode": 200, 80 "body": json.dumps({"message": "Data successfully loaded"}), 81 } 82 83 except Exception as e: 84 logger.error(f"Error: {str(e)}") 85 return { 86 "statusCode": 500, 87 "body": json.dumps({"message": "Error!", "error": str(e)}), 88 }
def
lambda_handler(event, context):
24def lambda_handler(event, context): 25 """ 26 This function will get warehouse credentials from AWS Secrets Manager, import the most recent transformed data from s3 and load it into the warehouse. 27 28 # Returns: 29 A message with status code 200 on successful loading into the warehouse. 30 A message with status code 500 on unsuccessful loading of the data. 31 """ 32 logger = logging.getLogger() 33 logger.setLevel(logging.INFO) 34 35 try: 36 secret_name = ( 37 "arn:aws:secretsmanager:eu-west-2:267414915338:secret:Totesys_Warehouse_Credentials-4OiHJW" 38 ) 39 40 secrets_extension_endpoint = ( 41 f"http://localhost:2773/secretsmanager/get?secretId={secret_name}" 42 ) 43 headers = { 44 "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN") 45 } 46 47 response = requests.get(secrets_extension_endpoint, headers=headers) 48 logger.info(f"Response status code: {response.status_code}") 49 50 secret = json.loads(response.text)["SecretString"] 51 secret = json.loads(secret) 52 53 os.environ["DBUSER"] = secret["user"] 54 os.environ["DBNAME"] = secret["database"] 55 os.environ["DBPASSWORD"] = secret["password"] 56 os.environ["PORT"] = secret["port"] 57 os.environ["HOST"] = secret["host"] 58 59 # Only 7 out of 11 tables included to match mock database 60 # To extract ALL tables include missing table names 61 table_names = { 62 "dim_design": load_dim_design_into_warehouse, 63 "dim_currency": load_dim_currency_into_warehouse, 64 "dim_location": load_dim_location_into_warehouse, 65 "dim_date": load_dim_dates_into_warehouse, 66 "dim_staff": load_dim_staff_into_warehouse, 67 "dim_counterparty": load_dim_counterparty_into_warehouse, 68 "fact_sales_order": load_fact_sales_order_into_warehouse, 69 } 70 71 for table_name in table_names: 72 df = access_files_from_processed_bucket( 73 table_name, os.environ["TRANSFORM_BUCKET_NAME"] 74 ) 75 logger.info(f"Extracted data from processed bucket for table {table_name}.") 76 table_names[table_name](df) 77 logger.info(f"Loaded {table_name} to the warehouse.") 78 79 return { 80 "statusCode": 200, 81 "body": json.dumps({"message": "Data successfully loaded"}), 82 } 83 84 except Exception as e: 85 logger.error(f"Error: {str(e)}") 86 return { 87 "statusCode": 500, 88 "body": json.dumps({"message": "Error!", "error": str(e)}), 89 }
This function will get warehouse credentials from AWS Secrets Manager, import the most recent transformed data from s3 and load it into the warehouse.
Returns:
A message with status code 200 on successful loading into the warehouse.
A message with status code 500 on unsuccessful loading of the data.