transform.transform_utils

Contains the utility functions for the transform lambda_handler.

  1"""
  2Contains the utility functions for the transform lambda_handler.
  3"""
  4
  5import json
  6import os
  7from datetime import datetime, timezone
  8
  9import awswrangler as wr
 10import boto3
 11import pandas as pd
 12from currency_codes import get_currency_by_code
 13
 14
 15def get_table_data_from_ingest_bucket(table_name, bucket_name):
 16    """
 17    Connects to the ingestion s3 bucket and retrieves the most recent data for the given table.
 18
 19    # Arguments:
 20        table_name: the name of the table in the bucket to retrieve data from.
 21        bucket_name: the name of the s3 bucket, which should be the ingestion bucket.
 22
 23    # Returns:
 24        A list of dictionaries for a given table. The dictionaries' keys are column names from the ingested table and values are the most recent entries for that table.
 25
 26    # Raises:
 27        RuntimeError: An error occurred during data retrieval.
 28    """
 29    try:
 30        client = boto3.client("s3")
 31        listing_response = client.list_objects_v2(Bucket=bucket_name, Prefix=table_name)
 32        object_key = listing_response["Contents"][-1]["Key"]
 33        retrieval_response = client.get_object(Bucket=bucket_name, Key=object_key)
 34        body = retrieval_response["Body"].read().decode("utf-8")
 35        return json.loads(body)
 36
 37    except Exception as e:
 38        raise RuntimeError(f"Retrieval of data from ingest bucket failed: {e}")
 39
 40
 41def get_all_table_data_from_ingest_bucket():
 42    """
 43    Retrieves the most recent data for each table stored in the ingestion s3 bucket.
 44
 45    # Returns:
 46        A dictionary whose keys are table names and whose values are lists of dictionaries, each dictionary representing a table row.
 47    """
 48    table_names = [
 49        "sales_order",
 50        "design",
 51        "address",
 52        "counterparty",
 53        "staff",
 54        "currency",
 55        "department",
 56    ]
 57    ingested_data = {}
 58    for table_name in table_names:
 59        ingested_data[table_name] = get_table_data_from_ingest_bucket(
 60            table_name, os.environ["INGESTION_BUCKET_NAME"]
 61        )
 62
 63    return ingested_data
 64
 65
 66def transform_fact_sales_order(sales_order_data):
 67    """
 68    Transforms data from the sales_order table into the format required for fact_sales_order.
 69
 70    # Arguments:
 71        sales_order_data: a list of dictionaries representing the contents of the sales_order table.
 72
 73    # Returns:
 74        A dataframe in the required format.
 75    """
 76    df = pd.DataFrame(sales_order_data)
 77
 78    df[["created_date", "created_time"]] = df["created_at"].str.split("T", expand=True)
 79    df[["last_updated_date", "last_updated_time"]] = df["last_updated"].str.split(
 80        "T", expand=True
 81    )
 82
 83    df["created_date"] = pd.to_datetime(df["created_date"], format="%Y-%m-%d").dt.date
 84    df["last_updated_date"] = pd.to_datetime(
 85        df["last_updated_date"], format="%Y-%m-%d"
 86    ).dt.date
 87    df["agreed_payment_date"] = pd.to_datetime(
 88        df["agreed_payment_date"], format="%Y-%m-%d"
 89    ).dt.date
 90    df["agreed_delivery_date"] = pd.to_datetime(
 91        df["agreed_delivery_date"], format="%Y-%m-%d"
 92    ).dt.date
 93
 94    del df["created_at"]
 95    del df["last_updated"]
 96
 97    df.insert(0, "sales_record_id", df.index)
 98
 99    df = df.rename(columns={"staff_id": "sales_staff_id"})
100
101    columns_in_order = [
102        "sales_record_id",
103        "sales_order_id",
104        "created_date",
105        "created_time",
106        "last_updated_date",
107        "last_updated_time",
108        "sales_staff_id",
109        "counterparty_id",
110        "units_sold",
111        "unit_price",
112        "currency_id",
113        "design_id",
114        "agreed_payment_date",
115        "agreed_delivery_date",
116        "agreed_delivery_location_id",
117    ]
118
119    df = df[columns_in_order]
120    return df
121
122
def transform_dim_design(design_data):
    """
    Builds the dim_design table from raw design rows.

    Every column of the design table is kept except created_at and last_updated.

    # Arguments:
        design_data: a list of dictionaries representing the contents of the design table.

    # Returns:
        A dataframe in the required format.
    """
    return pd.DataFrame(design_data).drop(columns=["created_at", "last_updated"])
138
139
def transform_dim_currency(currency_data):
    """
    Builds the dim_currency table from raw currency rows.

    The function drops created_at and last_updated. It then adds a
    currency_name column by looking up each currency_code with
    ``currency_codes.get_currency_by_code``.

    # Arguments:
        currency_data: a list of dictionaries representing the contents of the currency table.

    # Returns:
        A dataframe in the required format.
    """
    frame = pd.DataFrame(currency_data).drop(columns=["created_at", "last_updated"])

    def _name_for(code):
        # Any exception from the lookup (presumably raised for an unrecognised
        # code — confirm against currency_codes) propagates to the caller.
        return get_currency_by_code(code).name

    frame["currency_name"] = frame["currency_code"].apply(_name_for)
    return frame
159
160
def transform_dim_location(location_data):
    """
    Builds the dim_location table from raw address rows.

    The function drops created_at and last_updated and renames address_id to
    location_id. All other address columns pass through unchanged.

    # Arguments:
        location_data: a list of dictionaries representing the contents of the address table.

    # Returns:
        A dataframe in the required format.
    """
    trimmed = pd.DataFrame(location_data).drop(columns=["created_at", "last_updated"])
    return trimmed.rename(columns={"address_id": "location_id"})
178
179
def transform_dim_date(transformed_fact_sales_data):
    """
    Builds the dim_date table from the dates referenced by fact_sales_order.

    Each distinct date in the four source columns becomes one row. The source
    columns are created_date, last_updated_date, agreed_payment_date and
    agreed_delivery_date. Rows are ordered by first appearance, scanning the
    columns in that order.

    # Arguments:
        transformed_fact_sales_data: a dataframe returned from the transform_fact_sales_order function.

    # Returns:
        A dataframe with these columns:
        - date_id
        - year
        - month
        - day
        - day_of_week (Monday=0 ... Sunday=6)
        - day_name
        - month_name
        - quarter
    """
    date_columns = [
        "created_date",
        "last_updated_date",
        "agreed_payment_date",
        "agreed_delivery_date",
    ]
    all_dates = pd.concat(
        [transformed_fact_sales_data[column] for column in date_columns]
    ).drop_duplicates(ignore_index=True)

    df = all_dates.to_frame(name="date_id")

    # Parse once and reuse the accessor. Re-parsing for every derived
    # attribute repeated the same conversion seven times.
    parts = pd.to_datetime(df["date_id"]).dt
    df["year"] = parts.year
    df["month"] = parts.month
    df["day"] = parts.day
    df["day_of_week"] = parts.day_of_week
    df["day_name"] = parts.day_name()
    df["month_name"] = parts.month_name()
    df["quarter"] = parts.quarter

    return df
209
210
def transform_dim_staff(staff_data, department_data):
    """
    Builds the dim_staff table by joining staff rows with their department.

    The function drops created_at and last_updated from both tables, and
    manager from departments. It then left-joins the two tables on
    department_id and keeps only the dim_staff columns.

    # Arguments:
        staff_data: a list of dictionaries representing the contents of the staff table.
        department_data: a list of dictionaries representing the contents of the department table.

    # Returns:
        A dataframe in the required format.
    """
    staff = pd.DataFrame(staff_data).drop(columns=["created_at", "last_updated"])
    departments = pd.DataFrame(department_data).drop(
        columns=["created_at", "last_updated", "manager"]
    )

    # A left join keeps staff whose department_id has no match. Their
    # department_name and location come out as missing values.
    merged = staff.merge(departments, how="left", on="department_id")

    return merged[
        [
            "staff_id",
            "first_name",
            "last_name",
            "department_name",
            "location",
            "email_address",
        ]
    ]
244
245
def transform_dim_counterparty(counterparty_data, address_data):
    """
    Builds the dim_counterparty table by joining counterparties to their legal address.

    The function renames the counterparty's legal_address_id to address_id so
    it can left-join to the address table. The joined address columns are then
    renamed with a counterparty_legal_ prefix.

    # Arguments:
        counterparty_data: a list of dictionaries representing the contents of the counterparty table.
        address_data: a list of dictionaries representing the contents of the address table.

    # Returns:
        A dataframe in the required format.
    """
    counterparties = pd.DataFrame(counterparty_data).rename(
        columns={"legal_address_id": "address_id"}
    )
    addresses = pd.DataFrame(address_data)
    joined = counterparties.merge(addresses, how="left", on="address_id")

    # Source column -> dim_counterparty column, listed in output order.
    output_columns = {
        "counterparty_id": "counterparty_id",
        "counterparty_legal_name": "counterparty_legal_name",
        "address_line_1": "counterparty_legal_address_line_1",
        "address_line_2": "counterparty_legal_address_line_2",
        "district": "counterparty_legal_district",
        "city": "counterparty_legal_city",
        "postal_code": "counterparty_legal_postal_code",
        "country": "counterparty_legal_country",
        "phone": "counterparty_legal_phone",
    }
    return joined[list(output_columns)].rename(columns=output_columns)
288
289
def upload_to_s3(dataframe, bucket_name, table_name):
    """
    Uploads a dataframe in parquet format to a given bucket, under a key that includes the table name and a UTC timestamp.

    Keys have the form
    ``<table_name>/YYYY/MM/DD/<table_name>-YYYYMMDDTHHMMSSZ.parquet``. The
    timestamp has one-second resolution, so two uploads of the same table
    within the same second target the same key.

    # Arguments:
        dataframe: a dataframe with the transformed data, suitable for storage as parquet.
        bucket_name: a string representing the name of the s3 bucket to upload the parquet file to.
        table_name: a string representing the table in the warehouse that the data corresponds to.

    # Returns:
        The s3 URL (``s3://<bucket_name>/<key>``) of the uploaded parquet file.

    # Raises:
        RuntimeError: An error occurred during data upload.
    """
    now = datetime.now(timezone.utc)
    date_path = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")
    key = f"{table_name}/{date_path}/{table_name}-{timestamp}.parquet"
    s3_url = f"s3://{bucket_name}/{key}"

    try:
        wr.s3.to_parquet(df=dataframe, path=s3_url)
    except Exception as e:
        raise RuntimeError(f"Upload of data to s3 failed: {e}") from e

    print(s3_url)
    return s3_url
def get_table_data_from_ingest_bucket(table_name, bucket_name):
    """
    Connects to the ingestion s3 bucket and retrieves the most recent data for the given table.

    "Most recent" means the object whose key sorts last among all keys that
    start with ``table_name``. This assumes object keys embed a sortable
    timestamp. Every page of the listing is walked, because a single
    ``list_objects_v2`` call returns at most 1000 keys and the last key
    overall may not be on the first page.

    # Arguments:
        table_name: the name of the table in the bucket to retrieve data from.
        bucket_name: the name of the s3 bucket, which should be the ingestion bucket.

    # Returns:
        A list of dictionaries for a given table. The dictionaries' keys are column names from the ingested table and values are the most recent entries for that table.

    # Raises:
        RuntimeError: An error occurred during data retrieval, including the
            case where no object exists under the table's prefix.
    """
    try:
        client = boto3.client("s3")
        paginator = client.get_paginator("list_objects_v2")

        object_key = None
        for page in paginator.paginate(Bucket=bucket_name, Prefix=table_name):
            # A page with no matching objects has no "Contents" entry at all.
            contents = page.get("Contents", [])
            if contents:
                # For general purpose buckets, keys come back in ascending
                # lexicographic order, so the last key seen overall is the
                # greatest one.
                object_key = contents[-1]["Key"]

        if object_key is None:
            raise LookupError(
                f"no objects found with prefix '{table_name}' in bucket '{bucket_name}'"
            )

        retrieval_response = client.get_object(Bucket=bucket_name, Key=object_key)
        body = retrieval_response["Body"].read().decode("utf-8")
        return json.loads(body)

    except Exception as e:
        raise RuntimeError(f"Retrieval of data from ingest bucket failed: {e}") from e

Connects to the ingestion s3 bucket and retrieves the most recent data for the given table.

Arguments:

table_name: the name of the table in the bucket to retrieve data from.
bucket_name: the name of the s3 bucket, which should be the ingestion bucket.

Returns:

A list of dictionaries for a given table. The dictionaries' keys are column names from the ingested table and values are the most recent entries for that table.

Raises:

RuntimeError: An error occurred during data retrieval.
def get_all_table_data_from_ingest_bucket():
    """
    Fetches the latest ingested snapshot of every table the transform step needs.

    The bucket name is read from the INGESTION_BUCKET_NAME environment
    variable. Each table is then fetched with
    get_table_data_from_ingest_bucket, in the order listed below.

    # Returns:
        A dictionary mapping each table name to a list of dictionaries, one dictionary per table row.

    # Raises:
        KeyError: INGESTION_BUCKET_NAME is not set.
        RuntimeError: propagated from get_table_data_from_ingest_bucket when a retrieval fails.
    """
    bucket = os.environ["INGESTION_BUCKET_NAME"]
    tables = (
        "sales_order",
        "design",
        "address",
        "counterparty",
        "staff",
        "currency",
        "department",
    )
    return {table: get_table_data_from_ingest_bucket(table, bucket) for table in tables}

Retrieves the most recent data for each table stored in the ingestion s3 bucket.

Returns:

A dictionary whose keys are table names and whose values are lists of dictionaries, each dictionary representing a table row.
def transform_fact_sales_order(sales_order_data):
    """
    Builds the fact_sales_order table from raw sales_order rows.

    The function makes four changes to the raw rows:
    - It splits the created_at and last_updated strings on "T" into separate
      date and time columns. The time part is kept as the raw text after the "T".
    - It converts the four date columns to ``datetime.date`` values.
    - It renames staff_id to sales_staff_id.
    - It prepends a sales_record_id column holding the dataframe index.

    # Arguments:
        sales_order_data: a list of dictionaries representing the contents of the sales_order table.

    # Returns:
        A dataframe with the fact_sales_order columns in warehouse order.
    """
    frame = pd.DataFrame(sales_order_data)

    # Assumes ISO-style timestamps such as "2022-11-03T14:20:52.186" — TODO confirm
    # against the ingestion output.
    timestamp_splits = {
        "created_at": ["created_date", "created_time"],
        "last_updated": ["last_updated_date", "last_updated_time"],
    }
    for source_column, target_columns in timestamp_splits.items():
        frame[target_columns] = frame[source_column].str.split("T", expand=True)

    for date_column in (
        "created_date",
        "last_updated_date",
        "agreed_payment_date",
        "agreed_delivery_date",
    ):
        frame[date_column] = pd.to_datetime(
            frame[date_column], format="%Y-%m-%d"
        ).dt.date

    frame = frame.drop(columns=["created_at", "last_updated"])
    frame.insert(0, "sales_record_id", frame.index)
    frame = frame.rename(columns={"staff_id": "sales_staff_id"})

    return frame[
        [
            "sales_record_id",
            "sales_order_id",
            "created_date",
            "created_time",
            "last_updated_date",
            "last_updated_time",
            "sales_staff_id",
            "counterparty_id",
            "units_sold",
            "unit_price",
            "currency_id",
            "design_id",
            "agreed_payment_date",
            "agreed_delivery_date",
            "agreed_delivery_location_id",
        ]
    ]

Transforms data from the sales_order table into the format required for fact_sales_order.

Arguments:

sales_order_data: a list of dictionaries representing the contents of the sales_order table.

Returns:

A dataframe in the required format.
def transform_dim_design(design_data):
    """
    Builds the dim_design table from raw design rows.

    Every column of the design table is kept except created_at and last_updated.

    # Arguments:
        design_data: a list of dictionaries representing the contents of the design table.

    # Returns:
        A dataframe in the required format.
    """
    return pd.DataFrame(design_data).drop(columns=["created_at", "last_updated"])

Transforms data from the design table into the format required for dim_design.

Arguments:

design_data: a list of dictionaries representing the contents of the design table.

Returns:

A dataframe in the required format.
def transform_dim_currency(currency_data):
    """
    Builds the dim_currency table from raw currency rows.

    The function drops created_at and last_updated. It then adds a
    currency_name column by looking up each currency_code with
    ``currency_codes.get_currency_by_code``.

    # Arguments:
        currency_data: a list of dictionaries representing the contents of the currency table.

    # Returns:
        A dataframe in the required format.
    """
    frame = pd.DataFrame(currency_data).drop(columns=["created_at", "last_updated"])

    def _name_for(code):
        # Any exception from the lookup (presumably raised for an unrecognised
        # code — confirm against currency_codes) propagates to the caller.
        return get_currency_by_code(code).name

    frame["currency_name"] = frame["currency_code"].apply(_name_for)
    return frame

Transforms data from the currency table into the format required for dim_currency.

Arguments:

currency_data: a list of dictionaries representing the contents of the currency table.

Returns:

A dataframe in the required format.
def transform_dim_location(location_data):
    """
    Builds the dim_location table from raw address rows.

    The function drops created_at and last_updated and renames address_id to
    location_id. All other address columns pass through unchanged.

    # Arguments:
        location_data: a list of dictionaries representing the contents of the address table.

    # Returns:
        A dataframe in the required format.
    """
    trimmed = pd.DataFrame(location_data).drop(columns=["created_at", "last_updated"])
    return trimmed.rename(columns={"address_id": "location_id"})

Transforms data from the address table into the format required for dim_location.

Arguments:

location_data: a list of dictionaries representing the contents of the address table.

Returns:

A dataframe in the required format.
def transform_dim_date(transformed_fact_sales_data):
    """
    Builds the dim_date table from the dates referenced by fact_sales_order.

    Each distinct date in the four source columns becomes one row. The source
    columns are created_date, last_updated_date, agreed_payment_date and
    agreed_delivery_date. Rows are ordered by first appearance, scanning the
    columns in that order.

    # Arguments:
        transformed_fact_sales_data: a dataframe returned from the transform_fact_sales_order function.

    # Returns:
        A dataframe with these columns:
        - date_id
        - year
        - month
        - day
        - day_of_week (Monday=0 ... Sunday=6)
        - day_name
        - month_name
        - quarter
    """
    date_columns = [
        "created_date",
        "last_updated_date",
        "agreed_payment_date",
        "agreed_delivery_date",
    ]
    all_dates = pd.concat(
        [transformed_fact_sales_data[column] for column in date_columns]
    ).drop_duplicates(ignore_index=True)

    df = all_dates.to_frame(name="date_id")

    # Parse once and reuse the accessor. Re-parsing for every derived
    # attribute repeated the same conversion seven times.
    parts = pd.to_datetime(df["date_id"]).dt
    df["year"] = parts.year
    df["month"] = parts.month
    df["day"] = parts.day
    df["day_of_week"] = parts.day_of_week
    df["day_name"] = parts.day_name()
    df["month_name"] = parts.month_name()
    df["quarter"] = parts.quarter

    return df

Transforms data from the sales_order table into the format required for dim_date.

Arguments:

transformed_fact_sales_data: a dataframe returned from the transform_fact_sales_order function.

Returns:

A dataframe in the required format.
def transform_dim_staff(staff_data, department_data):
    """
    Builds the dim_staff table by joining staff rows with their department.

    The function drops created_at and last_updated from both tables, and
    manager from departments. It then left-joins the two tables on
    department_id and keeps only the dim_staff columns.

    # Arguments:
        staff_data: a list of dictionaries representing the contents of the staff table.
        department_data: a list of dictionaries representing the contents of the department table.

    # Returns:
        A dataframe in the required format.
    """
    staff = pd.DataFrame(staff_data).drop(columns=["created_at", "last_updated"])
    departments = pd.DataFrame(department_data).drop(
        columns=["created_at", "last_updated", "manager"]
    )

    # A left join keeps staff whose department_id has no match. Their
    # department_name and location come out as missing values.
    merged = staff.merge(departments, how="left", on="department_id")

    return merged[
        [
            "staff_id",
            "first_name",
            "last_name",
            "department_name",
            "location",
            "email_address",
        ]
    ]

Transforms data from the staff and department tables into the format required for dim_staff.

Arguments:

staff_data: a list of dictionaries representing the contents of the staff table.
department_data: a list of dictionaries representing the contents of the department table.

Returns:

A dataframe in the required format.
def transform_dim_counterparty(counterparty_data, address_data):
    """
    Builds the dim_counterparty table by joining counterparties to their legal address.

    The function renames the counterparty's legal_address_id to address_id so
    it can left-join to the address table. The joined address columns are then
    renamed with a counterparty_legal_ prefix.

    # Arguments:
        counterparty_data: a list of dictionaries representing the contents of the counterparty table.
        address_data: a list of dictionaries representing the contents of the address table.

    # Returns:
        A dataframe in the required format.
    """
    counterparties = pd.DataFrame(counterparty_data).rename(
        columns={"legal_address_id": "address_id"}
    )
    addresses = pd.DataFrame(address_data)
    joined = counterparties.merge(addresses, how="left", on="address_id")

    # Source column -> dim_counterparty column, listed in output order.
    output_columns = {
        "counterparty_id": "counterparty_id",
        "counterparty_legal_name": "counterparty_legal_name",
        "address_line_1": "counterparty_legal_address_line_1",
        "address_line_2": "counterparty_legal_address_line_2",
        "district": "counterparty_legal_district",
        "city": "counterparty_legal_city",
        "postal_code": "counterparty_legal_postal_code",
        "country": "counterparty_legal_country",
        "phone": "counterparty_legal_phone",
    }
    return joined[list(output_columns)].rename(columns=output_columns)

Transforms data from the counterparty and address tables into the format required for dim_counterparty.

Arguments:

counterparty_data: a list of dictionaries representing the contents of the counterparty table.
address_data: a list of dictionaries representing the contents of the address table.

Returns:

A dataframe in the required format.
def upload_to_s3(dataframe, bucket_name, table_name):
    """
    Uploads a dataframe in parquet format to a given bucket, under a key that includes the table name and a UTC timestamp.

    Keys have the form
    ``<table_name>/YYYY/MM/DD/<table_name>-YYYYMMDDTHHMMSSZ.parquet``. The
    timestamp has one-second resolution, so two uploads of the same table
    within the same second target the same key.

    # Arguments:
        dataframe: a dataframe with the transformed data, suitable for storage as parquet.
        bucket_name: a string representing the name of the s3 bucket to upload the parquet file to.
        table_name: a string representing the table in the warehouse that the data corresponds to.

    # Returns:
        The s3 URL (``s3://<bucket_name>/<key>``) of the uploaded parquet file.

    # Raises:
        RuntimeError: An error occurred during data upload.
    """
    now = datetime.now(timezone.utc)
    date_path = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")
    key = f"{table_name}/{date_path}/{table_name}-{timestamp}.parquet"
    s3_url = f"s3://{bucket_name}/{key}"

    try:
        wr.s3.to_parquet(df=dataframe, path=s3_url)
    except Exception as e:
        raise RuntimeError(f"Upload of data to s3 failed: {e}") from e

    print(s3_url)
    return s3_url

This function takes a dataframe and uploads it in parquet format to a given bucket, under a key that includes the table name and a UTC timestamp.

Arguments:

dataframe: a dataframe with the transformed data, suitable for storage as parquet.
bucket_name: a string representing the name of the s3 bucket to upload the parquet file to.
table_name: a string representing the table in the warehouse that the data corresponds to.

Returns:

The s3 URL (s3://<bucket_name>/<key>) of the uploaded parquet file.

Raises:

RuntimeError: An error occurred during data upload.