load.load_lambda

Contains the main function used by the Load Lambda AWS resource.

 1"""
 2Contains the main function used by the Load Lambda AWS resource.
 3"""
 4
 5import json
 6import logging
 7import os
 8
 9import requests
10
11from src.load.load_utils import (
12    access_files_from_processed_bucket,
13    load_dim_counterparty_into_warehouse,
14    load_dim_currency_into_warehouse,
15    load_dim_dates_into_warehouse,
16    load_dim_design_into_warehouse,
17    load_dim_location_into_warehouse,
18    load_dim_staff_into_warehouse,
19    load_fact_sales_order_into_warehouse,
20)
21
22
23def lambda_handler(event, context):
24    """
25    This function will get warehouse credentials from AWS Secrets Manager, import the most recent transformed data from s3 and load it into the warehouse.
26
27    # Returns:
28        A message with status code 200 on successful loading into the warehouse.
29        A message with status code 500 on unsuccessful loading of the data.
30    """
31    logger = logging.getLogger()
32    logger.setLevel(logging.INFO)
33
34    try:
35        secret_name = (
36            "arn:aws:secretsmanager:eu-west-2:267414915338:secret:Totesys_Warehouse_Credentials-4OiHJW"
37        )
38
39        secrets_extension_endpoint = (
40            f"http://localhost:2773/secretsmanager/get?secretId={secret_name}"
41        )
42        headers = {
43            "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN")
44        }
45
46        response = requests.get(secrets_extension_endpoint, headers=headers)
47        logger.info(f"Response status code: {response.status_code}")
48
49        secret = json.loads(response.text)["SecretString"]
50        secret = json.loads(secret)
51
52        os.environ["DBUSER"] = secret["user"]
53        os.environ["DBNAME"] = secret["database"]
54        os.environ["DBPASSWORD"] = secret["password"]
55        os.environ["PORT"] = secret["port"]
56        os.environ["HOST"] = secret["host"]
57
58        # Only 7 out of 11 tables included to match mock database
59        # To extract ALL tables include missing table names
60        table_names = {
61            "dim_design": load_dim_design_into_warehouse,
62            "dim_currency": load_dim_currency_into_warehouse,
63            "dim_location": load_dim_location_into_warehouse,
64            "dim_date": load_dim_dates_into_warehouse,
65            "dim_staff": load_dim_staff_into_warehouse,
66            "dim_counterparty": load_dim_counterparty_into_warehouse,
67            "fact_sales_order": load_fact_sales_order_into_warehouse,
68        }
69
70        for table_name in table_names:
71            df = access_files_from_processed_bucket(
72                table_name, os.environ["TRANSFORM_BUCKET_NAME"]
73            )
74            logger.info(f"Extracted data from processed bucket for table {table_name}.")
75            table_names[table_name](df)
76            logger.info(f"Loaded {table_name} to the warehouse.")
77
78        return {
79            "statusCode": 200,
80            "body": json.dumps({"message": "Data successfully loaded"}),
81        }
82
83    except Exception as e:
84        logger.error(f"Error: {str(e)}")
85        return {
86            "statusCode": 500,
87            "body": json.dumps({"message": "Error!", "error": str(e)}),
88        }
def lambda_handler(event, context):
24def lambda_handler(event, context):
25    """
26    This function will get warehouse credentials from AWS Secrets Manager, import the most recent transformed data from s3 and load it into the warehouse.
27
28    # Returns:
29        A message with status code 200 on successful loading into the warehouse.
30        A message with status code 500 on unsuccessful loading of the data.
31    """
32    logger = logging.getLogger()
33    logger.setLevel(logging.INFO)
34
35    try:
36        secret_name = (
37            "arn:aws:secretsmanager:eu-west-2:267414915338:secret:Totesys_Warehouse_Credentials-4OiHJW"
38        )
39
40        secrets_extension_endpoint = (
41            f"http://localhost:2773/secretsmanager/get?secretId={secret_name}"
42        )
43        headers = {
44            "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN")
45        }
46
47        response = requests.get(secrets_extension_endpoint, headers=headers)
48        logger.info(f"Response status code: {response.status_code}")
49
50        secret = json.loads(response.text)["SecretString"]
51        secret = json.loads(secret)
52
53        os.environ["DBUSER"] = secret["user"]
54        os.environ["DBNAME"] = secret["database"]
55        os.environ["DBPASSWORD"] = secret["password"]
56        os.environ["PORT"] = secret["port"]
57        os.environ["HOST"] = secret["host"]
58
59        # Only 7 out of 11 tables included to match mock database
60        # To extract ALL tables include missing table names
61        table_names = {
62            "dim_design": load_dim_design_into_warehouse,
63            "dim_currency": load_dim_currency_into_warehouse,
64            "dim_location": load_dim_location_into_warehouse,
65            "dim_date": load_dim_dates_into_warehouse,
66            "dim_staff": load_dim_staff_into_warehouse,
67            "dim_counterparty": load_dim_counterparty_into_warehouse,
68            "fact_sales_order": load_fact_sales_order_into_warehouse,
69        }
70
71        for table_name in table_names:
72            df = access_files_from_processed_bucket(
73                table_name, os.environ["TRANSFORM_BUCKET_NAME"]
74            )
75            logger.info(f"Extracted data from processed bucket for table {table_name}.")
76            table_names[table_name](df)
77            logger.info(f"Loaded {table_name} to the warehouse.")
78
79        return {
80            "statusCode": 200,
81            "body": json.dumps({"message": "Data successfully loaded"}),
82        }
83
84    except Exception as e:
85        logger.error(f"Error: {str(e)}")
86        return {
87            "statusCode": 500,
88            "body": json.dumps({"message": "Error!", "error": str(e)}),
89        }

This function will get warehouse credentials from AWS Secrets Manager, import the most recent transformed data from s3 and load it into the warehouse.

Returns:

A message with status code 200 on successful loading into the warehouse.
A message with status code 500 on unsuccessful loading of the data.