transform.transform_utils
Contains the utility functions for the transform lambda_handler.
"""
Contains the utility functions for the transform lambda_handler.
"""

import json
import os
from datetime import datetime, timezone

import awswrangler as wr
import boto3
import pandas as pd
from currency_codes import get_currency_by_code


def get_table_data_from_ingest_bucket(table_name, bucket_name):
    """
    Connects to the ingestion s3 bucket and retrieves the most recent data for the given table.

    Every page of the object listing is read, so the last key is still found when the
    listing for the prefix spans more than one response.

    # Arguments:
        table_name: the name of the table in the bucket to retrieve data from (used as the key prefix).
        bucket_name: the name of the s3 bucket, which should be the ingestion bucket.

    # Returns:
        The JSON-decoded body of the last object listed under the prefix.

    # Raises:
        RuntimeError: An error occurred during data retrieval, including the case where
            no object exists under the prefix. The underlying exception is chained as the cause.
    """
    try:
        client = boto3.client("s3")
        paginator = client.get_paginator("list_objects_v2")

        # NOTE(review): "most recent" relies on the listing order matching
        # chronological order - confirm against the ingest key format.
        object_key = None
        for page in paginator.paginate(Bucket=bucket_name, Prefix=table_name):
            contents = page.get("Contents")
            if contents:
                object_key = contents[-1]["Key"]

        if object_key is None:
            raise LookupError(
                f"no objects found with prefix '{table_name}' in bucket '{bucket_name}'"
            )

        retrieval_response = client.get_object(Bucket=bucket_name, Key=object_key)
        body = retrieval_response["Body"].read().decode("utf-8")
        return json.loads(body)

    except Exception as e:
        raise RuntimeError(f"Retrieval of data from ingest bucket failed: {e}") from e


def get_all_table_data_from_ingest_bucket():
    """
    Retrieves the most recent data for each table stored in the ingestion s3 bucket.

    The bucket name is read from the INGESTION_BUCKET_NAME environment variable.

    # Returns:
        A dictionary whose keys are table names and whose values are whatever
        get_table_data_from_ingest_bucket returns for that table.
    """
    bucket_name = os.environ["INGESTION_BUCKET_NAME"]
    source_tables = (
        "sales_order",
        "design",
        "address",
        "counterparty",
        "staff",
        "currency",
        "department",
    )
    return {
        name: get_table_data_from_ingest_bucket(name, bucket_name)
        for name in source_tables
    }


def transform_fact_sales_order(sales_order_data):
    """
    Transforms data from the sales_order table into the format required for fact_sales_order.

    # Arguments:
        sales_order_data: a list of dictionaries representing the contents of the sales_order table.

    # Returns:
        A dataframe with the fact_sales_order columns in their required order. When
        sales_order_data holds no rows, an empty dataframe with those columns is returned.
    """
    columns_in_order = [
        "sales_record_id",
        "sales_order_id",
        "created_date",
        "created_time",
        "last_updated_date",
        "last_updated_time",
        "sales_staff_id",
        "counterparty_id",
        "units_sold",
        "unit_price",
        "currency_id",
        "design_id",
        "agreed_payment_date",
        "agreed_delivery_date",
        "agreed_delivery_location_id",
    ]

    df = pd.DataFrame(sales_order_data)
    if df.empty:
        # Without rows there are no source columns to split or convert.
        return pd.DataFrame(columns=columns_in_order)

    # Split each "<date>T<time>" timestamp into its two halves; n=1 guarantees
    # at most two resulting columns.
    for source, prefix in (("created_at", "created"), ("last_updated", "last_updated")):
        df[[f"{prefix}_date", f"{prefix}_time"]] = df[source].str.split(
            "T", n=1, expand=True
        )

    for date_column in (
        "created_date",
        "last_updated_date",
        "agreed_payment_date",
        "agreed_delivery_date",
    ):
        df[date_column] = pd.to_datetime(df[date_column], format="%Y-%m-%d").dt.date

    # The record id is the dataframe's own index.
    df.insert(0, "sales_record_id", df.index)
    df = df.rename(columns={"staff_id": "sales_staff_id"})

    return df[columns_in_order]


def transform_dim_design(design_data):
    """
    Transforms data from the design table into the format required for dim_design.

    # Arguments:
        design_data: a list of dictionaries representing the contents of the design table.

    # Returns:
        A dataframe of the design rows without the created_at and last_updated columns.
    """
    return pd.DataFrame(design_data).drop(columns=["created_at", "last_updated"])


def transform_dim_currency(currency_data):
    """
    Transforms data from the currency table into the format required for dim_currency.

    # Arguments:
        currency_data: a list of dictionaries representing the contents of the currency table.

    # Returns:
        A dataframe of the currency rows without the created_at and last_updated
        columns, plus a currency_name column looked up from each currency_code.
    """
    currency_df = pd.DataFrame(currency_data).drop(
        columns=["created_at", "last_updated"]
    )

    def name_for(code):
        return get_currency_by_code(code).name

    currency_df["currency_name"] = currency_df["currency_code"].apply(name_for)
    return currency_df


def transform_dim_location(location_data):
    """
    Transforms data from the address table into the format required for dim_location.

    # Arguments:
        location_data: a list of dictionaries representing the contents of the address table.

    # Returns:
        A dataframe of the address rows without the created_at and last_updated
        columns, with address_id renamed to location_id.
    """
    address_df = pd.DataFrame(location_data)
    address_df = address_df.drop(columns=["created_at", "last_updated"])
    return address_df.rename(columns={"address_id": "location_id"})


def transform_dim_date(transformed_fact_sales_data):
    """
    Transforms data from the sales_order table into the format required for dim_date.

    # Arguments:
        transformed_fact_sales_data: a dataframe returned from the transform_fact_sales_order function.

    # Returns:
        A dataframe with one row per distinct, non-null date found in the four date
        columns of the fact table, with columns date_id, year, month, day,
        day_of_week, day_name, month_name and quarter.
    """
    date_columns = [
        "created_date",
        "last_updated_date",
        "agreed_payment_date",
        "agreed_delivery_date",
    ]
    # Null dates are dropped so that date_id never holds a null value.
    all_dates = (
        pd.concat([transformed_fact_sales_data[column] for column in date_columns])
        .dropna()
        .drop_duplicates(ignore_index=True)
    )

    df = all_dates.to_frame(name="date_id")

    # Convert once and derive every calendar attribute from the same series.
    parsed = pd.to_datetime(df["date_id"])
    df["year"] = parsed.dt.year
    df["month"] = parsed.dt.month
    df["day"] = parsed.dt.day
    df["day_of_week"] = parsed.dt.day_of_week
    df["day_name"] = parsed.dt.day_name()
    df["month_name"] = parsed.dt.month_name()
    df["quarter"] = parsed.dt.quarter

    return df


def transform_dim_staff(staff_data, department_data):
    """
    Transforms data from the staff and department tables into the format required for dim_staff.

    # Arguments:
        staff_data: a list of dictionaries representing the contents of the staff table.
        department_data: a list of dictionaries representing the contents of the department table.

    # Returns:
        A dataframe with one row per staff row, left-joined to its department on
        department_id, holding staff_id, first_name, last_name, department_name,
        location and email_address.
    """
    audit_columns = ["created_at", "last_updated"]
    staff = pd.DataFrame(staff_data).drop(columns=audit_columns)
    departments = pd.DataFrame(department_data).drop(
        columns=audit_columns + ["manager"]
    )

    joined = staff.merge(departments, on="department_id", how="left")

    return joined[
        [
            "staff_id",
            "first_name",
            "last_name",
            "department_name",
            "location",
            "email_address",
        ]
    ]


def transform_dim_counterparty(counterparty_data, address_data):
    """
    Transforms data from the counterparty and address tables into the format required for dim_counterparty.

    # Arguments:
        counterparty_data: a list of dictionaries representing the contents of the counterparty table.
        address_data: a list of dictionaries representing the contents of the address table.

    # Returns:
        A dataframe with counterparty_id, counterparty_legal_name and the address
        fields of the counterparty's legal address, each prefixed with
        "counterparty_legal_".
    """
    address_fields = [
        "address_line_1",
        "address_line_2",
        "district",
        "city",
        "postal_code",
        "country",
        "phone",
    ]

    # Rename the foreign key so both frames share the join column name.
    counterparties = pd.DataFrame(counterparty_data).rename(
        columns={"legal_address_id": "address_id"}
    )
    addresses = pd.DataFrame(address_data)
    joined = counterparties.merge(addresses, on="address_id", how="left")

    selected = joined[["counterparty_id", "counterparty_legal_name", *address_fields]]
    return selected.rename(
        columns={field: f"counterparty_legal_{field}" for field in address_fields}
    )


def upload_to_s3(dataframe, bucket_name, table_name):
    """
    This function takes a dataframe and uploads it in parquet format to a given bucket with a key that includes table name and datestamp.

    # Arguments:
        dataframe: a dataframe with the transformed data, suitable for storage as parquet.
        bucket_name: a string representing the name of the s3 bucket to upload the parquet file to.
        table_name: a string representing the table in the warehouse that the data corresponds to.

    # Returns:
        The s3:// URL of the uploaded parquet file (also printed).

    # Raises:
        RuntimeError: An error occurred during data upload. The underlying exception is chained as the cause.
    """
    now = datetime.now(timezone.utc)
    date_path = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")

    s3_url = (
        f"s3://{bucket_name}/{table_name}/{date_path}/{table_name}-{timestamp}.parquet"
    )

    try:
        wr.s3.to_parquet(
            df=dataframe,
            path=s3_url,
        )
    except Exception as e:
        raise RuntimeError(f"Upload of data to s3 failed: {e}") from e

    print(s3_url)
    return s3_url
def get_table_data_from_ingest_bucket(table_name, bucket_name):
    """
    Connects to the ingestion s3 bucket and retrieves the most recent data for the given table.

    Every page of the object listing is read, so the last key is still found when the
    listing for the prefix spans more than one response.

    # Arguments:
        table_name: the name of the table in the bucket to retrieve data from (used as the key prefix).
        bucket_name: the name of the s3 bucket, which should be the ingestion bucket.

    # Returns:
        The JSON-decoded body of the last object listed under the prefix.

    # Raises:
        RuntimeError: An error occurred during data retrieval, including the case where
            no object exists under the prefix. The underlying exception is chained as the cause.
    """
    try:
        client = boto3.client("s3")
        paginator = client.get_paginator("list_objects_v2")

        # NOTE(review): "most recent" relies on the listing order matching
        # chronological order - confirm against the ingest key format.
        object_key = None
        for page in paginator.paginate(Bucket=bucket_name, Prefix=table_name):
            contents = page.get("Contents")
            if contents:
                object_key = contents[-1]["Key"]

        if object_key is None:
            raise LookupError(
                f"no objects found with prefix '{table_name}' in bucket '{bucket_name}'"
            )

        retrieval_response = client.get_object(Bucket=bucket_name, Key=object_key)
        body = retrieval_response["Body"].read().decode("utf-8")
        return json.loads(body)

    except Exception as e:
        raise RuntimeError(f"Retrieval of data from ingest bucket failed: {e}") from e
Connects to the ingestion s3 bucket and retrieves the most recent data for the given table.
Arguments:
table_name: the name of the table in the bucket to retrieve data from.
bucket_name: the name of the s3 bucket, which should be the ingestion bucket.
Returns:
A list of dictionaries for a given table. The dictionaries' keys are column names from the ingested table and values are the most recent entries for that table.
Raises:
RuntimeError: An error occurred during data retrieval.
def get_all_table_data_from_ingest_bucket():
    """
    Retrieves the most recent data for each table stored in the ingestion s3 bucket.

    The bucket name is read from the INGESTION_BUCKET_NAME environment variable.

    # Returns:
        A dictionary whose keys are table names and whose values are whatever
        get_table_data_from_ingest_bucket returns for that table.
    """
    bucket_name = os.environ["INGESTION_BUCKET_NAME"]
    source_tables = (
        "sales_order",
        "design",
        "address",
        "counterparty",
        "staff",
        "currency",
        "department",
    )
    return {
        name: get_table_data_from_ingest_bucket(name, bucket_name)
        for name in source_tables
    }
Retrieves the most recent data for each table stored in the ingestion s3 bucket.
Returns:
A dictionary whose keys are table names and whose values are lists of dictionaries, each dictionary representing a table row.
def transform_fact_sales_order(sales_order_data):
    """
    Transforms data from the sales_order table into the format required for fact_sales_order.

    # Arguments:
        sales_order_data: a list of dictionaries representing the contents of the sales_order table.

    # Returns:
        A dataframe with the fact_sales_order columns in their required order. When
        sales_order_data holds no rows, an empty dataframe with those columns is returned.
    """
    columns_in_order = [
        "sales_record_id",
        "sales_order_id",
        "created_date",
        "created_time",
        "last_updated_date",
        "last_updated_time",
        "sales_staff_id",
        "counterparty_id",
        "units_sold",
        "unit_price",
        "currency_id",
        "design_id",
        "agreed_payment_date",
        "agreed_delivery_date",
        "agreed_delivery_location_id",
    ]

    df = pd.DataFrame(sales_order_data)
    if df.empty:
        # Without rows there are no source columns to split or convert.
        return pd.DataFrame(columns=columns_in_order)

    # Split each "<date>T<time>" timestamp into its two halves; n=1 guarantees
    # at most two resulting columns.
    for source, prefix in (("created_at", "created"), ("last_updated", "last_updated")):
        df[[f"{prefix}_date", f"{prefix}_time"]] = df[source].str.split(
            "T", n=1, expand=True
        )

    for date_column in (
        "created_date",
        "last_updated_date",
        "agreed_payment_date",
        "agreed_delivery_date",
    ):
        df[date_column] = pd.to_datetime(df[date_column], format="%Y-%m-%d").dt.date

    # The record id is the dataframe's own index.
    df.insert(0, "sales_record_id", df.index)
    df = df.rename(columns={"staff_id": "sales_staff_id"})

    return df[columns_in_order]
Transforms data from the sales_order table into the format required for fact_sales_order.
Arguments:
sales_order_data: a list of dictionaries representing the contents of the sales_order table.
Returns:
A dataframe in the required format.
def transform_dim_design(design_data):
    """
    Transforms data from the design table into the format required for dim_design.

    # Arguments:
        design_data: a list of dictionaries representing the contents of the design table.

    # Returns:
        A dataframe of the design rows without the created_at and last_updated columns.
    """
    return pd.DataFrame(design_data).drop(columns=["created_at", "last_updated"])
Transforms data from the design table into the format required for dim_design.
Arguments:
design_data: a list of dictionaries representing the contents of the design table.
Returns:
A dataframe in the required format.
def transform_dim_currency(currency_data):
    """
    Transforms data from the currency table into the format required for dim_currency.

    # Arguments:
        currency_data: a list of dictionaries representing the contents of the currency table.

    # Returns:
        A dataframe of the currency rows without the created_at and last_updated
        columns, plus a currency_name column looked up from each currency_code.
    """
    currency_df = pd.DataFrame(currency_data).drop(
        columns=["created_at", "last_updated"]
    )

    def name_for(code):
        return get_currency_by_code(code).name

    currency_df["currency_name"] = currency_df["currency_code"].apply(name_for)
    return currency_df
Transforms data from the currency table into the format required for dim_currency.
Arguments:
currency_data: a list of dictionaries representing the contents of the currency table.
Returns:
A dataframe in the required format.
def transform_dim_location(location_data):
    """
    Transforms data from the address table into the format required for dim_location.

    # Arguments:
        location_data: a list of dictionaries representing the contents of the address table.

    # Returns:
        A dataframe of the address rows without the created_at and last_updated
        columns, with address_id renamed to location_id.
    """
    address_df = pd.DataFrame(location_data)
    address_df = address_df.drop(columns=["created_at", "last_updated"])
    return address_df.rename(columns={"address_id": "location_id"})
Transforms data from the address table into the format required for dim_location.
Arguments:
location_data: a list of dictionaries representing the contents of the address table.
Returns:
A dataframe in the required format.
def transform_dim_date(transformed_fact_sales_data):
    """
    Transforms data from the sales_order table into the format required for dim_date.

    # Arguments:
        transformed_fact_sales_data: a dataframe returned from the transform_fact_sales_order function.

    # Returns:
        A dataframe with one row per distinct, non-null date found in the four date
        columns of the fact table, with columns date_id, year, month, day,
        day_of_week, day_name, month_name and quarter.
    """
    date_columns = [
        "created_date",
        "last_updated_date",
        "agreed_payment_date",
        "agreed_delivery_date",
    ]
    # Null dates are dropped so that date_id never holds a null value.
    all_dates = (
        pd.concat([transformed_fact_sales_data[column] for column in date_columns])
        .dropna()
        .drop_duplicates(ignore_index=True)
    )

    df = all_dates.to_frame(name="date_id")

    # Convert once and derive every calendar attribute from the same series.
    parsed = pd.to_datetime(df["date_id"])
    df["year"] = parsed.dt.year
    df["month"] = parsed.dt.month
    df["day"] = parsed.dt.day
    df["day_of_week"] = parsed.dt.day_of_week
    df["day_name"] = parsed.dt.day_name()
    df["month_name"] = parsed.dt.month_name()
    df["quarter"] = parsed.dt.quarter

    return df
Transforms data from the sales_order table into the format required for dim_date.
Arguments:
transformed_fact_sales_data: a dataframe returned from the transform_fact_sales_order function.
Returns:
A dataframe in the required format.
def transform_dim_staff(staff_data, department_data):
    """
    Transforms data from the staff and department tables into the format required for dim_staff.

    # Arguments:
        staff_data: a list of dictionaries representing the contents of the staff table.
        department_data: a list of dictionaries representing the contents of the department table.

    # Returns:
        A dataframe with one row per staff row, left-joined to its department on
        department_id, holding staff_id, first_name, last_name, department_name,
        location and email_address.
    """
    audit_columns = ["created_at", "last_updated"]
    staff = pd.DataFrame(staff_data).drop(columns=audit_columns)
    departments = pd.DataFrame(department_data).drop(
        columns=audit_columns + ["manager"]
    )

    joined = staff.merge(departments, on="department_id", how="left")

    return joined[
        [
            "staff_id",
            "first_name",
            "last_name",
            "department_name",
            "location",
            "email_address",
        ]
    ]
Transforms data from the staff and department tables into the format required for dim_staff.
Arguments:
staff_data: a list of dictionaries representing the contents of the staff table.
department_data: a list of dictionaries representing the contents of the department table.
Returns:
A dataframe in the required format.
def transform_dim_counterparty(counterparty_data, address_data):
    """
    Transforms data from the counterparty and address tables into the format required for dim_counterparty.

    # Arguments:
        counterparty_data: a list of dictionaries representing the contents of the counterparty table.
        address_data: a list of dictionaries representing the contents of the address table.

    # Returns:
        A dataframe with counterparty_id, counterparty_legal_name and the address
        fields of the counterparty's legal address, each prefixed with
        "counterparty_legal_".
    """
    address_fields = [
        "address_line_1",
        "address_line_2",
        "district",
        "city",
        "postal_code",
        "country",
        "phone",
    ]

    # Rename the foreign key so both frames share the join column name.
    counterparties = pd.DataFrame(counterparty_data).rename(
        columns={"legal_address_id": "address_id"}
    )
    addresses = pd.DataFrame(address_data)
    joined = counterparties.merge(addresses, on="address_id", how="left")

    selected = joined[["counterparty_id", "counterparty_legal_name", *address_fields]]
    return selected.rename(
        columns={field: f"counterparty_legal_{field}" for field in address_fields}
    )
Transforms data from the counterparty and address tables into the format required for dim_counterparty.
Arguments:
counterparty_data: a list of dictionaries representing the contents of the counterparty table.
address_data: a list of dictionaries representing the contents of the address table.
Returns:
A dataframe in the required format.
def upload_to_s3(dataframe, bucket_name, table_name):
    """
    This function takes a dataframe and uploads it in parquet format to a given bucket with a key that includes table name and datestamp.

    # Arguments:
        dataframe: a dataframe with the transformed data, suitable for storage as parquet.
        bucket_name: a string representing the name of the s3 bucket to upload the parquet file to.
        table_name: a string representing the table in the warehouse that the data corresponds to.

    # Returns:
        The s3:// URL of the uploaded parquet file (also printed).

    # Raises:
        RuntimeError: An error occurred during data upload. The underlying exception is chained as the cause.
    """
    now = datetime.now(timezone.utc)
    date_path = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y%m%dT%H%M%SZ")

    s3_url = (
        f"s3://{bucket_name}/{table_name}/{date_path}/{table_name}-{timestamp}.parquet"
    )

    try:
        wr.s3.to_parquet(
            df=dataframe,
            path=s3_url,
        )
    except Exception as e:
        # The message names the operation that actually failed (an s3 upload).
        raise RuntimeError(f"Upload of data to s3 failed: {e}") from e

    print(s3_url)
    return s3_url
This function takes a dataframe and uploads it in parquet format to a given bucket with a key that includes table name and datestamp.
Arguments:
dataframe: a dataframe with the transformed data, suitable for storage as parquet.
bucket_name: a string representing the name of the s3 bucket to upload the parquet file to.
table_name: a string representing the table in the warehouse that the data corresponds to.
Returns:
A message containing the location of the uploaded data.
Raises:
RuntimeError: An error occurred during data upload.