transform.transform_lambda
Contains the main function used by the Transform Lambda AWS resource.
"""
Contains the main function used by the Transform Lambda AWS resource.
"""

import json
import logging
import os

from src.transform.transform_utils import (
    get_all_table_data_from_ingest_bucket,
    transform_dim_counterparty,
    transform_dim_currency,
    transform_dim_date,
    transform_dim_design,
    transform_dim_location,
    transform_dim_staff,
    transform_fact_sales_order,
    upload_to_s3,
)


def lambda_handler(event, context):
    """
    Transform every ingested table and upload the results to the processed bucket.

    Reads the table data from the ingestion bucket, builds the fact and
    dimension tables, and hands each one to `upload_to_s3` together with the
    bucket named by the `TRANSFORM_BUCKET_NAME` environment variable
    (the upload helper is expected to write parquet - confirm in transform_utils).

    # Arguments:
        event: the Lambda invocation event (not read by this handler).
        context: the Lambda runtime context (not read by this handler).

    # Returns:
        A message with status code 200 once every table has been uploaded.
        A message with status code 500, including the error text, if any step fails.

    """

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        # Resolve configuration first so a missing bucket name fails fast,
        # before any data is fetched or transformed.
        bucket_name = os.environ["TRANSFORM_BUCKET_NAME"]

        ingested_data = get_all_table_data_from_ingest_bucket()
        logger.info("Extracted data from ingestion bucket.")

        # Built first because dim_date is derived from the transformed fact table.
        fact_sales_order = transform_fact_sales_order(ingested_data["sales_order"])

        # Maps each output table name to its transformed data.
        transformed_tables = {
            "fact_sales_order": fact_sales_order,
            "dim_design": transform_dim_design(ingested_data["design"]),
            "dim_currency": transform_dim_currency(ingested_data["currency"]),
            "dim_location": transform_dim_location(ingested_data["address"]),
            "dim_date": transform_dim_date(fact_sales_order),
            "dim_staff": transform_dim_staff(
                ingested_data["staff"], ingested_data["department"]
            ),
            "dim_counterparty": transform_dim_counterparty(
                ingested_data["counterparty"], ingested_data["address"]
            ),
        }
        logger.info("Transformation of ingested data complete.")

        for table_name, table_data in transformed_tables.items():
            upload_to_s3(table_data, bucket_name, table_name)
            logger.info("Uploaded transformed data to S3 for table %s.", table_name)

        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Data successfully transformed"}),
        }

    except Exception as e:
        # Top-level Lambda boundary: record the full traceback, then report
        # the failure to the caller instead of letting the invocation crash.
        logger.exception("Error: %s", e)
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Error!", "error": str(e)}),
        }
def
lambda_handler(event, context):
def lambda_handler(event, context):
    """
    Transform every ingested table and upload the results to the processed bucket.

    Reads the table data from the ingestion bucket, builds the fact and
    dimension tables, and hands each one to `upload_to_s3` together with the
    bucket named by the `TRANSFORM_BUCKET_NAME` environment variable
    (the upload helper is expected to write parquet - confirm in transform_utils).

    # Arguments:
        event: the Lambda invocation event (not read by this handler).
        context: the Lambda runtime context (not read by this handler).

    # Returns:
        A message with status code 200 once every table has been uploaded.
        A message with status code 500, including the error text, if any step fails.

    """

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        # Resolve configuration first so a missing bucket name fails fast,
        # before any data is fetched or transformed.
        bucket_name = os.environ["TRANSFORM_BUCKET_NAME"]

        ingested_data = get_all_table_data_from_ingest_bucket()
        logger.info("Extracted data from ingestion bucket.")

        # Built first because dim_date is derived from the transformed fact table.
        fact_sales_order = transform_fact_sales_order(ingested_data["sales_order"])

        # Maps each output table name to its transformed data.
        transformed_tables = {
            "fact_sales_order": fact_sales_order,
            "dim_design": transform_dim_design(ingested_data["design"]),
            "dim_currency": transform_dim_currency(ingested_data["currency"]),
            "dim_location": transform_dim_location(ingested_data["address"]),
            "dim_date": transform_dim_date(fact_sales_order),
            "dim_staff": transform_dim_staff(
                ingested_data["staff"], ingested_data["department"]
            ),
            "dim_counterparty": transform_dim_counterparty(
                ingested_data["counterparty"], ingested_data["address"]
            ),
        }
        logger.info("Transformation of ingested data complete.")

        for table_name, table_data in transformed_tables.items():
            upload_to_s3(table_data, bucket_name, table_name)
            logger.info("Uploaded transformed data to S3 for table %s.", table_name)

        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Data successfully transformed"}),
        }

    except Exception as e:
        # Top-level Lambda boundary: record the full traceback, then report
        # the failure to the caller instead of letting the invocation crash.
        logger.exception("Error: %s", e)
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Error!", "error": str(e)}),
        }
Runs the transform functions on every table in the ingestion bucket and uploads the results as Parquet to the processed bucket.
Returns:
A message with status code 200 on a successful upload to the processed bucket.
A message with status code 500 on an unsuccessful attempt.