ingestion.ingest_lambda

Contains the main function used by the Ingestion Lambda AWS resource.

 1"""
 2Contains the main function used by the Ingestion Lambda AWS resource.
 3"""
 4
 5import json
 6import logging
 7import os
 8
 9import boto3
10import requests
11
12from src.ingestion.ingest_utils import ingest
13
14
def lambda_handler(event, context):
    """
    Ingest the configured database tables into the ingestion S3 bucket.

    The database credentials are fetched from AWS Secrets Manager through the
    local secrets endpoint on port 2773 and exported as the environment
    variables DBUSER, DBNAME, DBPASSWORD, PORT and HOST. Each table in the
    hard-coded list is then passed to `ingest` together with the bucket named
    by INGESTION_BUCKET_NAME. Finally, the state machine named by
    STEP_MACHINE_ARN is started unless one of its executions is already
    running.

    # Args:
        event: The Lambda invocation event (not used).
        context: The Lambda runtime context (not used).

    # Returns:
        A message with status code 200 on successful extraction of the data from the database into the s3 bucket.
        A message with status code 500 on an unsuccessful attempt.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        secret_name = "arn:aws:secretsmanager:eu-west-2:267414915338:secret:Totesys_DB_Credentials-YeWucm"

        secrets_extension_endpoint = (
            f"http://localhost:2773/secretsmanager/get?secretId={secret_name}"
        )
        headers = {
            "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN")
        }

        # Without a timeout a stalled endpoint would block until the Lambda
        # itself times out.
        response = requests.get(
            secrets_extension_endpoint, headers=headers, timeout=10
        )
        logger.info(f"Response status code: {response.status_code}")
        # Fail with a clear HTTP error instead of a KeyError on "SecretString"
        # when the secret could not be retrieved.
        response.raise_for_status()

        # The payload is JSON whose "SecretString" field is itself a JSON
        # document holding the credentials.
        secret = json.loads(json.loads(response.text)["SecretString"])

        os.environ["DBUSER"] = secret["user"]
        os.environ["DBNAME"] = secret["database"]
        os.environ["DBPASSWORD"] = secret["password"]
        # os.environ only accepts strings; the port may be stored as a number.
        os.environ["PORT"] = str(secret["port"])
        os.environ["HOST"] = secret["host"]

        table_names = [
            "sales_order",
            "design",
            "address",
            "counterparty",
            "staff",
            "currency",
            "department",
        ]
        # Only 7 out of 11 tables included to match mock database
        # To extract ALL tables include missing table names
        bucket_name = os.environ["INGESTION_BUCKET_NAME"]
        for table in table_names:
            logger.info(f"Ingesting {table} table.")
            ingest(table, bucket_name)

        step_function = os.environ["STEP_MACHINE_ARN"]
        client = boto3.client("stepfunctions", region_name="eu-west-2")
        sf_running = client.list_executions(
            stateMachineArn=step_function, statusFilter="RUNNING"
        )
        # Only start a new execution when none is currently running.
        if not sf_running.get("executions", []):
            client.start_execution(stateMachineArn=step_function)

        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Data successfully extracted"}),
        }

    except Exception as e:
        # Top-level boundary: log with traceback and report the failure to the
        # caller instead of letting the invocation crash.
        logger.exception(f"Error: {str(e)}")
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Error!", "error": str(e)}),
        }
def lambda_handler(event, context):
def lambda_handler(event, context):
    """
    Ingest the configured database tables into the ingestion S3 bucket.

    The database credentials are fetched from AWS Secrets Manager through the
    local secrets endpoint on port 2773 and exported as the environment
    variables DBUSER, DBNAME, DBPASSWORD, PORT and HOST. Each table in the
    hard-coded list is then passed to `ingest` together with the bucket named
    by INGESTION_BUCKET_NAME. Finally, the state machine named by
    STEP_MACHINE_ARN is started unless one of its executions is already
    running.

    # Args:
        event: The Lambda invocation event (not used).
        context: The Lambda runtime context (not used).

    # Returns:
        A message with status code 200 on successful extraction of the data from the database into the s3 bucket.
        A message with status code 500 on an unsuccessful attempt.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    try:
        secret_name = "arn:aws:secretsmanager:eu-west-2:267414915338:secret:Totesys_DB_Credentials-YeWucm"

        secrets_extension_endpoint = (
            f"http://localhost:2773/secretsmanager/get?secretId={secret_name}"
        )
        headers = {
            "X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN")
        }

        # Without a timeout a stalled endpoint would block until the Lambda
        # itself times out.
        response = requests.get(
            secrets_extension_endpoint, headers=headers, timeout=10
        )
        logger.info(f"Response status code: {response.status_code}")
        # Fail with a clear HTTP error instead of a KeyError on "SecretString"
        # when the secret could not be retrieved.
        response.raise_for_status()

        # The payload is JSON whose "SecretString" field is itself a JSON
        # document holding the credentials.
        secret = json.loads(json.loads(response.text)["SecretString"])

        os.environ["DBUSER"] = secret["user"]
        os.environ["DBNAME"] = secret["database"]
        os.environ["DBPASSWORD"] = secret["password"]
        # os.environ only accepts strings; the port may be stored as a number.
        os.environ["PORT"] = str(secret["port"])
        os.environ["HOST"] = secret["host"]

        table_names = [
            "sales_order",
            "design",
            "address",
            "counterparty",
            "staff",
            "currency",
            "department",
        ]
        # Only 7 out of 11 tables included to match mock database
        # To extract ALL tables include missing table names
        bucket_name = os.environ["INGESTION_BUCKET_NAME"]
        for table in table_names:
            logger.info(f"Ingesting {table} table.")
            ingest(table, bucket_name)

        step_function = os.environ["STEP_MACHINE_ARN"]
        client = boto3.client("stepfunctions", region_name="eu-west-2")
        sf_running = client.list_executions(
            stateMachineArn=step_function, statusFilter="RUNNING"
        )
        # Only start a new execution when none is currently running.
        if not sf_running.get("executions", []):
            client.start_execution(stateMachineArn=step_function)

        return {
            "statusCode": 200,
            "body": json.dumps({"message": "Data successfully extracted"}),
        }

    except Exception as e:
        # Top-level boundary: log with traceback and report the failure to the
        # caller instead of letting the invocation crash.
        logger.exception(f"Error: {str(e)}")
        return {
            "statusCode": 500,
            "body": json.dumps({"message": "Error!", "error": str(e)}),
        }

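This function retrieves the database credentials from AWS Secrets Manager and stores the data from each configured database table in the ingestion S3 bucket in JSON format. After ingestion it starts the Step Functions state machine, unless an execution is already running.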

Returns:

A message with status code 200 on successful extraction of the data from the database into the s3 bucket.
A message with status code 500 on an unsuccessful attempt.