transform.transform_lambda

Contains the main function used by the Transform Lambda AWS resource.

 1"""
 2Contains the main function used by the Transform Lambda AWS resource.
 3"""
 4
 5import json
 6import logging
 7import os
 8
 9from src.transform.transform_utils import (
10    get_all_table_data_from_ingest_bucket,
11    transform_dim_counterparty,
12    transform_dim_currency,
13    transform_dim_date,
14    transform_dim_design,
15    transform_dim_location,
16    transform_dim_staff,
17    transform_fact_sales_order,
18    upload_to_s3,
19)
20
21
def lambda_handler(event, context):
    """
    Transform the ingested tables and upload the results to the processed bucket.

    Reads the raw tables from the ingestion bucket and builds these tables:
    fact_sales_order, dim_design, dim_currency, dim_location, dim_date,
    dim_staff and dim_counterparty. Each table is uploaded with
    ``upload_to_s3`` to the bucket named by the ``TRANSFORM_BUCKET_NAME``
    environment variable. The file format is decided by ``upload_to_s3``;
    the module documentation describes it as Parquet.

    Args:
        event: Lambda invocation event. Not used.
        context: Lambda context object. Not used.

    Returns:
        dict: ``{"statusCode": 200, "body": ...}`` when every table has been
        uploaded. ``{"statusCode": 500, "body": ...}`` if any step fails, with
        the error text in the JSON body under ``"error"``. Failures are
        reported in the response rather than raised.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        # Resolve configuration first so a missing variable fails fast,
        # before any data is read or transformed.
        bucket_name = os.environ["TRANSFORM_BUCKET_NAME"]

        ingested_data = get_all_table_data_from_ingest_bucket()
        logger.info("Extracted data from ingestion bucket.")

        # dim_date is derived from the transformed fact table, so the fact
        # table has to be built first.
        fact_sales_order = transform_fact_sales_order(ingested_data["sales_order"])
        tables = {
            "fact_sales_order": fact_sales_order,
            "dim_design": transform_dim_design(ingested_data["design"]),
            "dim_currency": transform_dim_currency(ingested_data["currency"]),
            "dim_location": transform_dim_location(ingested_data["address"]),
            "dim_date": transform_dim_date(fact_sales_order),
            "dim_staff": transform_dim_staff(
                ingested_data["staff"], ingested_data["department"]
            ),
            "dim_counterparty": transform_dim_counterparty(
                ingested_data["counterparty"], ingested_data["address"]
            ),
        }
        logger.info("Transformation of ingested data complete.")

        for table_name, table_data in tables.items():
            upload_to_s3(table_data, bucket_name, table_name)
            logger.info("Uploaded transformed data to S3 for table %s.", table_name)

        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Data successfully transformed"}),
        }

    except Exception as e:
        # Top-level boundary: record the full traceback, then report the
        # failure in the response instead of letting the invocation raise.
        logger.exception("Error: %s", e)
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Error!", "error": str(e)}),
        }
def lambda_handler(event, context):
def lambda_handler(event, context):
    """
    Transform the ingested tables and upload the results to the processed bucket.

    Reads the raw tables from the ingestion bucket and builds these tables:
    fact_sales_order, dim_design, dim_currency, dim_location, dim_date,
    dim_staff and dim_counterparty. Each table is uploaded with
    ``upload_to_s3`` to the bucket named by the ``TRANSFORM_BUCKET_NAME``
    environment variable. The file format is decided by ``upload_to_s3``;
    the module documentation describes it as Parquet.

    Args:
        event: Lambda invocation event. Not used.
        context: Lambda context object. Not used.

    Returns:
        dict: ``{"statusCode": 200, "body": ...}`` when every table has been
        uploaded. ``{"statusCode": 500, "body": ...}`` if any step fails, with
        the error text in the JSON body under ``"error"``. Failures are
        reported in the response rather than raised.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        # Resolve configuration first so a missing variable fails fast,
        # before any data is read or transformed.
        bucket_name = os.environ["TRANSFORM_BUCKET_NAME"]

        ingested_data = get_all_table_data_from_ingest_bucket()
        logger.info("Extracted data from ingestion bucket.")

        # dim_date is derived from the transformed fact table, so the fact
        # table has to be built first.
        fact_sales_order = transform_fact_sales_order(ingested_data["sales_order"])
        tables = {
            "fact_sales_order": fact_sales_order,
            "dim_design": transform_dim_design(ingested_data["design"]),
            "dim_currency": transform_dim_currency(ingested_data["currency"]),
            "dim_location": transform_dim_location(ingested_data["address"]),
            "dim_date": transform_dim_date(fact_sales_order),
            "dim_staff": transform_dim_staff(
                ingested_data["staff"], ingested_data["department"]
            ),
            "dim_counterparty": transform_dim_counterparty(
                ingested_data["counterparty"], ingested_data["address"]
            ),
        }
        logger.info("Transformation of ingested data complete.")

        for table_name, table_data in tables.items():
            upload_to_s3(table_data, bucket_name, table_name)
            logger.info("Uploaded transformed data to S3 for table %s.", table_name)

        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Data successfully transformed"}),
        }

    except Exception as e:
        # Top-level boundary: record the full traceback, then report the
        # failure in the response instead of letting the invocation raise.
        logger.exception("Error: %s", e)
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Error!", "error": str(e)}),
        }

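This function reads the sales order, design, currency, address, staff, department and counterparty tables from the ingestion bucket. It transforms them into one fact table and six dimension tables, then uploads each result as Parquet to the processed bucket.

Returns:

A response with status code 200 when every transformed table has been uploaded to the processed bucket.
A response with status code 500 if any step fails; the error message is included in the response body.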