ingestion.ingest_lambda
Contains the main function used by the Ingestion Lambda AWS resource.
"""
Contains the main function used by the Ingestion Lambda AWS resource.
"""

import json
import logging
import os

import boto3
import requests

from src.ingestion.ingest_utils import ingest


def lambda_handler(event, context):
    """
    Ingest the configured database tables and trigger the step function.

    The handler fetches the database credentials secret over HTTP from the
    local endpoint on port 2773 (presumably the AWS Parameters and Secrets
    Lambda Extension - confirm against the deployment), exports the
    credentials as the DBUSER, DBNAME, DBPASSWORD, PORT and HOST environment
    variables, and calls ``ingest`` once per table with the bucket named by
    INGESTION_BUCKET_NAME. Afterwards it starts an execution of the state
    machine named by STEP_MACHINE_ARN unless one is already RUNNING.

    # Args:
        event: The Lambda invocation event. Not used.
        context: The Lambda context object. Not used.

    # Returns:
        A message with status code 200 on successful extraction of the data
        from the database into the s3 bucket.
        A message with status code 500 on an unsuccessful attempt; the body
        also carries the text of the exception that was raised.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        secret_name = "arn:aws:secretsmanager:eu-west-2:267414915338:secret:Totesys_DB_Credentials-YeWucm"

        secrets_extension_endpoint = (
            f"http://localhost:2773/secretsmanager/get?secretId={secret_name}"
        )
        headers = {
            "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN")
        }

        # requests never times out by default; bound the call so a stalled
        # endpoint cannot hold the invocation open indefinitely.
        response = requests.get(
            secrets_extension_endpoint, headers=headers, timeout=10
        )
        logger.info(f"Response status code: {response.status_code}")
        # Fail with the HTTP error itself rather than a confusing KeyError
        # on "SecretString" when the secret lookup was rejected.
        response.raise_for_status()

        secret = json.loads(json.loads(response.text)["SecretString"])

        os.environ["DBUSER"] = secret["user"]
        os.environ["DBNAME"] = secret["database"]
        os.environ["DBPASSWORD"] = secret["password"]
        # os.environ only accepts strings; the port may be stored in the
        # secret as a JSON number.
        os.environ["PORT"] = str(secret["port"])
        os.environ["HOST"] = secret["host"]

        table_names = [
            "sales_order",
            "design",
            "address",
            "counterparty",
            "staff",
            "currency",
            "department",
        ]
        # Only 7 out of 11 tables included to match mock database
        # To extract ALL tables include missing table names
        bucket_name = os.environ["INGESTION_BUCKET_NAME"]
        for table in table_names:
            logger.info(f"Ingesting {table} table.")
            ingest(table, bucket_name)

        step_function = os.environ["STEP_MACHINE_ARN"]
        client = boto3.client("stepfunctions", region_name="eu-west-2")
        sf_running = client.list_executions(
            stateMachineArn=step_function, statusFilter="RUNNING"
        )
        # Skip the start when an execution is already in flight.
        if not sf_running.get("executions", []):
            client.start_execution(stateMachineArn=step_function)

        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Data successfully extracted"}),
        }

    except Exception as e:
        # Top-level Lambda boundary: log with traceback and report failure
        # to the caller instead of letting the invocation crash.
        logger.exception(f"Error: {str(e)}")
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Error!", "error": str(e)}),
        }
def lambda_handler(event, context):
def lambda_handler(event, context):
    """
    Ingest the configured database tables and trigger the step function.

    The handler fetches the database credentials secret over HTTP from the
    local endpoint on port 2773 (presumably the AWS Parameters and Secrets
    Lambda Extension - confirm against the deployment), exports the
    credentials as the DBUSER, DBNAME, DBPASSWORD, PORT and HOST environment
    variables, and calls ``ingest`` once per table with the bucket named by
    INGESTION_BUCKET_NAME. Afterwards it starts an execution of the state
    machine named by STEP_MACHINE_ARN unless one is already RUNNING.

    # Args:
        event: The Lambda invocation event. Not used.
        context: The Lambda context object. Not used.

    # Returns:
        A message with status code 200 on successful extraction of the data
        from the database into the s3 bucket.
        A message with status code 500 on an unsuccessful attempt; the body
        also carries the text of the exception that was raised.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        secret_name = "arn:aws:secretsmanager:eu-west-2:267414915338:secret:Totesys_DB_Credentials-YeWucm"

        secrets_extension_endpoint = (
            f"http://localhost:2773/secretsmanager/get?secretId={secret_name}"
        )
        headers = {
            "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN")
        }

        # requests never times out by default; bound the call so a stalled
        # endpoint cannot hold the invocation open indefinitely.
        response = requests.get(
            secrets_extension_endpoint, headers=headers, timeout=10
        )
        logger.info(f"Response status code: {response.status_code}")
        # Fail with the HTTP error itself rather than a confusing KeyError
        # on "SecretString" when the secret lookup was rejected.
        response.raise_for_status()

        secret = json.loads(json.loads(response.text)["SecretString"])

        os.environ["DBUSER"] = secret["user"]
        os.environ["DBNAME"] = secret["database"]
        os.environ["DBPASSWORD"] = secret["password"]
        # os.environ only accepts strings; the port may be stored in the
        # secret as a JSON number.
        os.environ["PORT"] = str(secret["port"])
        os.environ["HOST"] = secret["host"]

        table_names = [
            "sales_order",
            "design",
            "address",
            "counterparty",
            "staff",
            "currency",
            "department",
        ]
        # Only 7 out of 11 tables included to match mock database
        # To extract ALL tables include missing table names
        bucket_name = os.environ["INGESTION_BUCKET_NAME"]
        for table in table_names:
            logger.info(f"Ingesting {table} table.")
            ingest(table, bucket_name)

        step_function = os.environ["STEP_MACHINE_ARN"]
        client = boto3.client("stepfunctions", region_name="eu-west-2")
        sf_running = client.list_executions(
            stateMachineArn=step_function, statusFilter="RUNNING"
        )
        # Skip the start when an execution is already in flight.
        if not sf_running.get("executions", []):
            client.start_execution(stateMachineArn=step_function)

        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Data successfully extracted"}),
        }

    except Exception as e:
        # Top-level Lambda boundary: log with traceback and report failure
        # to the caller instead of letting the invocation crash.
        logger.exception(f"Error: {str(e)}")
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Error!", "error": str(e)}),
        }
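This function gets the database credentials from AWS Secrets Manager, then extracts each table listed in the handler (currently 7 of the database's 11 tables) and stores its data in the ingestion S3 bucket in JSON format. Once ingestion completes, it starts an execution of the Step Functions state machine unless one is already running.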
Returns:
A message with status code 200 on successful extraction of the data from the database into the S3 bucket.
A message with status code 500 on an unsuccessful attempt.