ingestion.ingest_utils

Contains the utility functions for the ingestion lambda_handler.

  1"""
  2Contains the utility functions for the ingestion lambda_handler.
  3"""
  4
  5import datetime
  6import json
  7from datetime import timezone
  8
  9import boto3
 10from pg8000.native import identifier
 11
 12from src.utils.db_connection import close_conn, create_conn
 13from src.utils.default_serialiser import default_serialiser
 14
 15
 16def extract_data(table_name):
 17    """
 18    This function connects to the database whose credentials are stored as environment variables and selects all the information in the given table.
 19
 20    # Arguments:
 21        table_name: a string representing the name of the table in the database that we want to extract.
 22
 23    # Returns:
 24        A list of dictionaries where each dictionary represents a single row in the given table and the keys are the column names in the given table.
 25
 26    # Raises:
 27        RuntimeError: An error occurred during data extraction.
 28    """
 29
 30    query = f"SELECT * FROM {identifier(table_name)}"
 31
 32    conn = create_conn()
 33
 34    if conn:
 35        try:
 36            data = conn.run(query)
 37            columns = [column["name"] for column in conn.columns]
 38            result = [dict(zip(columns, row)) for row in data]
 39            return result
 40        except Exception as e:
 41            raise RuntimeError(f"Database query failed: {e}")
 42        finally:
 43            close_conn(conn)
 44
 45
 46def convert_to_json(data):
 47    """
 48    This function converts a list of dictionaries that the extract_data function returns into a json object. It is functionally identical to the json.dumps method, with a specified function for the default argument.
 49
 50    # Arguments:
 51        data: a list of dictionaries.
 52
 53    # Returns:
 54        A json object.
 55    """
 56
 57    return json.dumps(data, default=default_serialiser)
 58
 59
 60def upload_to_s3(data, bucket_name, table_name):
 61    """
 62    This function takes a json object and uploads it to a given bucket with a key that includes table name and datestamp.
 63
 64    # Arguments:
 65        data: a json object containing the data for a table in the database.
 66        bucket_name: a string representing the name of the s3 bucket to upload to.
 67        table_name: a string representing the table whose data we are uploading.
 68
 69    # Returns:
 70        A message confirming successful upload and showing the full location.
 71    """
 72
 73    s3 = boto3.client("s3")
 74
 75    now = datetime.datetime.now(timezone.utc)
 76    date_path = now.strftime("%Y/%m/%d")
 77    timestamp = now.strftime("%Y%m%dT%H%M%SZ")
 78
 79    key = f"{table_name}/{date_path}/{table_name}-{timestamp}.json"
 80
 81    try:
 82        s3.put_object(
 83            Bucket=bucket_name, Key=key, Body=data, ContentType="application/json"
 84        )
 85
 86        message = f"Uploaded to s3://{bucket_name}/{key}"
 87        print(message)
 88        return message
 89
 90    except Exception as e:
 91        raise RuntimeError(f"Database query failed: {e}")
 92
 93
 94def ingest(table_name, bucket_name):
 95    """
 96    This function calls extract_data, and converts the data to json through the convert_to_json function. It then uploads the data into the given s3 bucket.
 97
 98    # Arguments:
 99        table_name: a string representing the name of the table the data is from.
100        bucket_name: a string representing the name of the s3 bucket that is being uploaded to.
101
102    # Return:
103        A string indicating successful extraction of the data.
104
105    # Raises:
106        RuntimeError: An error occurred during data extraction.
107    """
108
109    try:
110        extracted_data = extract_data(table_name)
111
112        converted_data = convert_to_json(extracted_data)
113
114        upload_to_s3(converted_data, bucket_name, table_name)
115
116        return "Ingestion successful"
117
118    except Exception as e:
119        raise RuntimeError(f"Ingestion failed: {e}")
def extract_data(table_name):
def extract_data(table_name):
    """
    Select every row from the given table and return the rows as dictionaries.

    The connection is obtained from create_conn and is always released with close_conn once the query has been attempted.

    # Arguments:
        table_name: a string representing the name of the table in the database that we want to extract.

    # Returns:
        A list of dictionaries where each dictionary represents a single row in the given table and the keys are the column names in the given table.

    # Raises:
        RuntimeError: No database connection was available, or the query failed.
    """

    # identifier() quotes/escapes the table name so it can be interpolated as an SQL identifier.
    query = f"SELECT * FROM {identifier(table_name)}"

    conn = create_conn()

    # Fail loudly instead of falling through and returning None, which would
    # otherwise be serialised and uploaded as if it were real table data.
    if not conn:
        raise RuntimeError("Database connection failed: no connection available")

    try:
        rows = conn.run(query)
        columns = [column["name"] for column in conn.columns]
        return [dict(zip(columns, row)) for row in rows]
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise RuntimeError(f"Database query failed: {e}") from e
    finally:
        close_conn(conn)

This function connects to the database whose credentials are stored as environment variables and selects all the information in the given table.

Arguments:

table_name: a string representing the name of the table in the database that we want to extract.

Returns:

A list of dictionaries where each dictionary represents a single row in the given table and the keys are the column names in the given table.

Raises:

RuntimeError: An error occurred during data extraction.
def convert_to_json(data):
def convert_to_json(data):
    """
    Serialise the rows produced by extract_data into JSON text.

    This is a thin wrapper around json.dumps that supplies default_serialiser as the default argument.

    # Arguments:
        data: a list of dictionaries.

    # Returns:
        A JSON-formatted string.
    """

    serialised = json.dumps(data, default=default_serialiser)
    return serialised

This function converts the list of dictionaries returned by the extract_data function into a JSON string. It is equivalent to calling json.dumps with default_serialiser supplied as the default argument.

Arguments:

data: a list of dictionaries.

Returns:

A JSON-formatted string.
def upload_to_s3(data, bucket_name, table_name):
def upload_to_s3(data, bucket_name, table_name):
    """
    Upload serialised table data to an s3 bucket under a key built from the table name and the current UTC time.

    The key has the form "<table_name>/YYYY/MM/DD/<table_name>-YYYYMMDDTHHMMSSZ.json".

    # Arguments:
        data: the JSON payload containing the data for a table in the database.
        bucket_name: a string representing the name of the s3 bucket to upload to.
        table_name: a string representing the table whose data we are uploading.

    # Returns:
        A message confirming successful upload and showing the full location.

    # Raises:
        RuntimeError: The upload to s3 failed.
    """

    s3 = boto3.client("s3")

    # Take a single UTC reading so the date folder and the timestamp always agree.
    now = datetime.datetime.now(timezone.utc)
    date_path = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")

    key = f"{table_name}/{date_path}/{table_name}-{timestamp}.json"

    try:
        s3.put_object(
            Bucket=bucket_name, Key=key, Body=data, ContentType="application/json"
        )
    except Exception as e:
        # Report the failure as an upload problem (not a database one) and
        # keep the original exception chained for debugging.
        raise RuntimeError(f"S3 upload failed: {e}") from e

    message = f"Uploaded to s3://{bucket_name}/{key}"
    print(message)
    return message

This function takes a JSON string and uploads it to the given bucket under a key that includes the table name, a date path (YYYY/MM/DD) and a UTC timestamp.

Arguments:

data: a json object containing the data for a table in the database.
bucket_name: a string representing the name of the s3 bucket to upload to.
table_name: a string representing the table whose data we are uploading.

Returns:

A message confirming successful upload and showing the full location.
def ingest(table_name, bucket_name):
 95def ingest(table_name, bucket_name):
 96    """
 97    This function calls extract_data, and converts the data to json through the convert_to_json function. It then uploads the data into the given s3 bucket.
 98
 99    # Arguments:
100        table_name: a string representing the name of the table the data is from.
101        bucket_name: a string representing the name of the s3 bucket that is being uploaded to.
102
103    # Return:
104        A string indicating successful extraction of the data.
105
106    # Raises:
107        RuntimeError: An error occurred during data extraction.
108    """
109
110    try:
111        extracted_data = extract_data(table_name)
112
113        converted_data = convert_to_json(extracted_data)
114
115        upload_to_s3(converted_data, bucket_name, table_name)
116
117        return "Ingestion successful"
118
119    except Exception as e:
120        raise RuntimeError(f"Ingestion failed: {e}")

This function calls extract_data, and converts the data to json through the convert_to_json function. It then uploads the data into the given s3 bucket.

Arguments:

table_name: a string representing the name of the table the data is from.
bucket_name: a string representing the name of the s3 bucket that is being uploaded to.

Return:

A string indicating successful ingestion of the data.

Raises:

RuntimeError: An error occurred during data extraction, conversion to JSON, or upload to the s3 bucket.