load.load_utils
Contains the utility functions for the load lambda_handler.
"""
Contains the utility functions for the load lambda_handler.
"""

import awswrangler as wr
import boto3
import pandas as pd

from src.utils.db_connection import close_conn, create_conn

# Postgres accepts at most 65535 bind parameters in one statement; capping the
# rows sent per INSERT keeps even the 14-column fact table far below that.
_ROWS_PER_INSERT = 1000

# Columns copied into fact_sales_order, in the order used by every query below.
_SALES_ORDER_COLUMNS = [
    "sales_order_id",
    "created_date",
    "created_time",
    "last_updated_date",
    "last_updated_time",
    "sales_staff_id",
    "counterparty_id",
    "units_sold",
    "unit_price",
    "currency_id",
    "design_id",
    "agreed_payment_date",
    "agreed_delivery_date",
    "agreed_delivery_location_id",
]


def access_files_from_processed_bucket(table_name, bucket_name):
    """
    Connects to the S3 bucket holding transformed data and returns the most recent file for the given table as a dataframe.

    The object with the greatest key under the ``table_name`` prefix is treated
    as the most recent one (this assumes key names sort chronologically, e.g.
    they embed a timestamp - TODO confirm). Every page of the listing is
    scanned, because a single list_objects_v2 call returns at most 1000 keys.

    # Arguments:
        table_name: a string representing the name of the table in the database that we want to extract.
        bucket_name: a string representing the name of the s3 bucket with transformed data to connect to.

    # Returns:
        A dataframe containing the most recent data for the given table in the bucket.

    # Raises:
        RuntimeError: An error occurred during data retrieval, including when no object exists under the prefix.
    """
    try:
        client = boto3.client("s3")
        pages = client.get_paginator("list_objects_v2").paginate(
            Bucket=bucket_name, Prefix=table_name
        )
        latest_key = max(
            (obj["Key"] for page in pages for obj in page.get("Contents", [])),
            default=None,
        )
        if latest_key is None:
            raise FileNotFoundError(
                f"no objects found under prefix '{table_name}' in bucket '{bucket_name}'"
            )
        return wr.s3.read_parquet([f"s3://{bucket_name}/{latest_key}"])

    except Exception as e:
        raise RuntimeError(f"Retrieval of data from processed bucket failed: {e}") from e


def _bind_value(value):
    """Returns a dataframe cell ready to be bound as a query parameter (missing values -> None / SQL NULL)."""
    # pd.isna covers None, NaN, NaT and pd.NA, which plain string formatting
    # would otherwise have written as the text 'None', 'nan', 'NaT' or '<NA>'.
    return None if pd.isna(value) else value


def _insert_rows(conn, insert_clause, df, columns, suffix=""):
    """
    Inserts the given dataframe columns with bound parameters, in batches.

    # Arguments:
        conn: an open database connection. Assumes a pg8000.native-style
            ``run(sql, **params)`` taking ``:name`` placeholders - TODO confirm
            against src.utils.db_connection.
        insert_clause: the SQL preceding ``VALUES``, e.g. ``"INSERT INTO dim_design"``.
        df: the dataframe holding the rows to insert.
        columns: dataframe column names, in the order the SQL expects the values.
        suffix: SQL appended after the VALUES list, e.g. an ON CONFLICT clause.

    # Returns:
        None. No statement is issued when ``df`` has no rows.
    """
    rows = list(df[columns].itertuples(index=False, name=None))
    for start in range(0, len(rows), _ROWS_PER_INSERT):
        params = {}
        groups = []
        for r, row in enumerate(rows[start : start + _ROWS_PER_INSERT]):
            names = [f"v{r}_{c}" for c in range(len(columns))]
            params.update({n: _bind_value(v) for n, v in zip(names, row)})
            groups.append("(" + ", ".join(":" + n for n in names) + ")")
        # Values travel as parameters, never as SQL text, so quotes or "$$"
        # inside the data cannot break (or alter) the statement.
        conn.run(f"{insert_clause} VALUES {', '.join(groups)}{suffix};", **params)


def _load_dimension(df, table, key_column, columns):
    """
    Inserts ``df`` into a dimension table, skipping rows whose key already exists.

    # Arguments:
        df: the dataframe holding the dimension rows.
        table: the warehouse table name (a trusted constant, not user input).
        key_column: the column named in ``ON CONFLICT (...) DO NOTHING``.
        columns: dataframe columns in the table's positional column order.

    # Raises:
        RuntimeError: the database query failed.
    """
    conn = create_conn()
    if conn:
        try:
            _insert_rows(
                conn,
                f"INSERT INTO {table}",
                df,
                columns,
                f" ON CONFLICT ({key_column}) DO NOTHING",
            )
        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e
        finally:
            close_conn(conn)


def load_dim_dates_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_date table.

    Rows whose date_id already exists are skipped.

    # Arguments:
        df: a dataframe representing the contents of the dim_date table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    _load_dimension(
        df,
        "dim_date",
        "date_id",
        ["date_id", "year", "month", "day", "day_of_week", "day_name", "month_name", "quarter"],
    )


def load_dim_staff_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_staff table.

    Rows whose staff_id already exists are skipped.

    # Arguments:
        df: a dataframe representing the contents of the dim_staff table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    _load_dimension(
        df,
        "dim_staff",
        "staff_id",
        ["staff_id", "first_name", "last_name", "department_name", "location", "email_address"],
    )


def load_dim_location_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_location table.

    Rows whose location_id already exists are skipped.

    # Arguments:
        df: a dataframe representing the contents of the dim_location table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    _load_dimension(
        df,
        "dim_location",
        "location_id",
        [
            "location_id",
            "address_line_1",
            "address_line_2",
            "district",
            "city",
            "postal_code",
            "country",
            "phone",
        ],
    )


def load_dim_currency_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_currency table.

    Rows whose currency_id already exists are skipped.

    # Arguments:
        df: a dataframe representing the contents of the dim_currency table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    _load_dimension(
        df,
        "dim_currency",
        "currency_id",
        ["currency_id", "currency_code", "currency_name"],
    )


def load_dim_design_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_design table.

    Rows whose design_id already exists are skipped.

    # Arguments:
        df: a dataframe representing the contents of the dim_design table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    _load_dimension(
        df,
        "dim_design",
        "design_id",
        ["design_id", "design_name", "file_location", "file_name"],
    )


def load_dim_counterparty_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_counterparty table.

    Rows whose counterparty_id already exists are skipped.

    # Arguments:
        df: a dataframe representing the contents of the dim_counterparty table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    _load_dimension(
        df,
        "dim_counterparty",
        "counterparty_id",
        [
            "counterparty_id",
            "counterparty_legal_name",
            "counterparty_legal_address_line_1",
            "counterparty_legal_address_line_2",
            "counterparty_legal_district",
            "counterparty_legal_city",
            "counterparty_legal_postal_code",
            "counterparty_legal_country",
            "counterparty_legal_phone",
        ],
    )


def load_fact_sales_order_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's fact_sales_order table. Updates to a sales order are stored as new rows, without overwriting previous data.

    The dataframe is first staged in a session-local temp table; only rows whose
    full column tuple is not already present in fact_sales_order are then copied.

    # Arguments:
        df: a dataframe representing the contents of the fact_sales_order table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    column_list = ", ".join(_SALES_ORDER_COLUMNS)
    fact_columns = ", ".join(f"f.{c}" for c in _SALES_ORDER_COLUMNS)
    temp_columns = ", ".join(f"t.{c}" for c in _SALES_ORDER_COLUMNS)

    temp_create_query = """
        CREATE TEMP TABLE temp_sales_order (
            sales_record_id SERIAL PRIMARY KEY NOT NULL,
            sales_order_id INT NOT NULL,
            created_date DATE NOT NULL,
            created_time TIME NOT NULL DEFAULT CURRENT_TIME,
            last_updated_date DATE NOT NULL,
            last_updated_time TIME NOT NULL DEFAULT CURRENT_TIME,
            sales_staff_id INT NOT NULL,
            counterparty_id INT NOT NULL,
            units_sold INT NOT NULL,
            unit_price NUMERIC(10, 2) NOT NULL,
            currency_id INT NOT NULL,
            design_id INT NOT NULL,
            agreed_payment_date DATE NOT NULL,
            agreed_delivery_date DATE NOT NULL,
            agreed_delivery_location_id INT NOT NULL
        );
    """

    # NOT EXISTS (rather than NOT IN) keeps working if fact_sales_order ever
    # contains NULLs - NOT IN would then match nothing - and can be planned
    # as an anti-join.
    new_rows_query = f"""
        INSERT INTO fact_sales_order ({column_list})
        SELECT {column_list}
        FROM temp_sales_order AS t
        WHERE NOT EXISTS (
            SELECT 1 FROM fact_sales_order AS f
            WHERE ({fact_columns}) = ({temp_columns})
        );
    """

    conn = create_conn()
    if conn:
        try:
            conn.run(temp_create_query)
            _insert_rows(
                conn,
                f"INSERT INTO temp_sales_order ({column_list})",
                df,
                _SALES_ORDER_COLUMNS,
            )
            conn.run(new_rows_query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
def
access_files_from_processed_bucket(table_name, bucket_name):
def access_files_from_processed_bucket(table_name, bucket_name):
    """
    Connects to the S3 bucket holding transformed data and returns the most recent file for the given table as a dataframe.

    The object with the greatest key under the ``table_name`` prefix is treated
    as the most recent one (this assumes key names sort chronologically, e.g.
    they embed a timestamp - TODO confirm). Every page of the listing is
    scanned, because a single list_objects_v2 call returns at most 1000 keys.

    # Arguments:
        table_name: a string representing the name of the table in the database that we want to extract.
        bucket_name: a string representing the name of the s3 bucket with transformed data to connect to.

    # Returns:
        A dataframe containing the most recent data for the given table in the bucket.

    # Raises:
        RuntimeError: An error occurred during data retrieval, including when no object exists under the prefix.
    """
    try:
        client = boto3.client("s3")
        pages = client.get_paginator("list_objects_v2").paginate(
            Bucket=bucket_name, Prefix=table_name
        )
        latest_key = max(
            (obj["Key"] for page in pages for obj in page.get("Contents", [])),
            default=None,
        )
        if latest_key is None:
            raise FileNotFoundError(
                f"no objects found under prefix '{table_name}' in bucket '{bucket_name}'"
            )
        return wr.s3.read_parquet([f"s3://{bucket_name}/{latest_key}"])

    except Exception as e:
        raise RuntimeError(f"Retrieval of data from processed bucket failed: {e}") from e
Connects to the S3 bucket holding the transformed data and returns the most recent file for the given table as a dataframe.
Arguments:
table_name: a string representing the name of the table in the database that we want to extract.
bucket_name: a string representing the name of the s3 bucket with transformed data to connect to.
Returns:
A dataframe containing the most recent data for the given table in the bucket.
Raises:
RuntimeError: An error occurred during data retrieval.
def
load_dim_dates_into_warehouse(df):
def load_dim_dates_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_date table.

    Rows whose date_id already exists in dim_date are skipped
    (ON CONFLICT DO NOTHING). Values are sent as bound query parameters rather
    than spliced into the SQL text, and missing values (None, NaN, NaT, pd.NA)
    are stored as NULL. An empty dataframe issues no INSERT.

    # Arguments:
        df: a dataframe representing the contents of the dim_date table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    import pandas as pd

    columns = ["date_id", "year", "month", "day", "day_of_week", "day_name", "month_name", "quarter"]
    # Rows per INSERT; keeps each statement far below Postgres's 65535-parameter cap.
    batch_size = 1000
    conn = create_conn()
    if conn:
        try:
            rows = list(df[columns].itertuples(index=False, name=None))
            for start in range(0, len(rows), batch_size):
                params, groups = {}, []
                for r, row in enumerate(rows[start : start + batch_size]):
                    names = [f"v{r}_{c}" for c in range(len(columns))]
                    params.update({n: None if pd.isna(v) else v for n, v in zip(names, row)})
                    groups.append("(" + ", ".join(":" + n for n in names) + ")")
                # Assumes a pg8000.native-style run(sql, **params) with :name
                # placeholders - TODO confirm against src.utils.db_connection.
                conn.run(
                    "INSERT INTO dim_date VALUES "
                    + ", ".join(groups)
                    + " ON CONFLICT (date_id) DO NOTHING;",
                    **params,
                )

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
Loads data from a dataframe into the warehouse's dim_date table.
Arguments:
df: a dataframe representing the contents of the dim_date table.
Returns:
None.
def
load_dim_staff_into_warehouse(df):
def load_dim_staff_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_staff table.

    Rows whose staff_id already exists in dim_staff are skipped
    (ON CONFLICT DO NOTHING). Values are sent as bound query parameters rather
    than spliced into the SQL text, so apostrophes (e.g. in a location name) or
    "$$" in the data cannot break the statement; missing values (None, NaN,
    NaT, pd.NA) are stored as NULL. An empty dataframe issues no INSERT.

    # Arguments:
        df: a dataframe representing the contents of the dim_staff table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    import pandas as pd

    columns = ["staff_id", "first_name", "last_name", "department_name", "location", "email_address"]
    # Rows per INSERT; keeps each statement far below Postgres's 65535-parameter cap.
    batch_size = 1000
    conn = create_conn()
    if conn:
        try:
            rows = list(df[columns].itertuples(index=False, name=None))
            for start in range(0, len(rows), batch_size):
                params, groups = {}, []
                for r, row in enumerate(rows[start : start + batch_size]):
                    names = [f"v{r}_{c}" for c in range(len(columns))]
                    params.update({n: None if pd.isna(v) else v for n, v in zip(names, row)})
                    groups.append("(" + ", ".join(":" + n for n in names) + ")")
                # Assumes a pg8000.native-style run(sql, **params) with :name
                # placeholders - TODO confirm against src.utils.db_connection.
                conn.run(
                    "INSERT INTO dim_staff VALUES "
                    + ", ".join(groups)
                    + " ON CONFLICT (staff_id) DO NOTHING;",
                    **params,
                )

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
Loads data from a dataframe into the warehouse's dim_staff table.
Arguments:
df: a dataframe representing the contents of the dim_staff table.
Returns:
None.
def
load_dim_location_into_warehouse(df):
def load_dim_location_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_location table.

    Rows whose location_id already exists in dim_location are skipped
    (ON CONFLICT DO NOTHING). Values are sent as bound query parameters rather
    than spliced into the SQL text, so "$$" or quotes in the data cannot break
    the statement; missing values (None, NaN, NaT, pd.NA) are stored as NULL
    instead of the text 'None'/'nan'. An empty dataframe issues no INSERT.

    # Arguments:
        df: a dataframe representing the contents of the dim_location table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    import pandas as pd

    columns = [
        "location_id",
        "address_line_1",
        "address_line_2",
        "district",
        "city",
        "postal_code",
        "country",
        "phone",
    ]
    # Rows per INSERT; keeps each statement far below Postgres's 65535-parameter cap.
    batch_size = 1000
    conn = create_conn()
    if conn:
        try:
            rows = list(df[columns].itertuples(index=False, name=None))
            for start in range(0, len(rows), batch_size):
                params, groups = {}, []
                for r, row in enumerate(rows[start : start + batch_size]):
                    names = [f"v{r}_{c}" for c in range(len(columns))]
                    params.update({n: None if pd.isna(v) else v for n, v in zip(names, row)})
                    groups.append("(" + ", ".join(":" + n for n in names) + ")")
                # Assumes a pg8000.native-style run(sql, **params) with :name
                # placeholders - TODO confirm against src.utils.db_connection.
                conn.run(
                    "INSERT INTO dim_location VALUES "
                    + ", ".join(groups)
                    + " ON CONFLICT (location_id) DO NOTHING;",
                    **params,
                )

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
Loads data from a dataframe into the warehouse's dim_location table.
Arguments:
df: a dataframe representing the contents of the dim_location table.
Returns:
None.
def
load_dim_currency_into_warehouse(df):
def load_dim_currency_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_currency table.

    Rows whose currency_id already exists in dim_currency are skipped
    (ON CONFLICT DO NOTHING). Values are sent as bound query parameters rather
    than spliced into the SQL text, and missing values (None, NaN, NaT, pd.NA)
    are stored as NULL. An empty dataframe issues no INSERT.

    # Arguments:
        df: a dataframe representing the contents of the dim_currency table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    import pandas as pd

    columns = ["currency_id", "currency_code", "currency_name"]
    # Rows per INSERT; keeps each statement far below Postgres's 65535-parameter cap.
    batch_size = 1000
    conn = create_conn()
    if conn:
        try:
            rows = list(df[columns].itertuples(index=False, name=None))
            for start in range(0, len(rows), batch_size):
                params, groups = {}, []
                for r, row in enumerate(rows[start : start + batch_size]):
                    names = [f"v{r}_{c}" for c in range(len(columns))]
                    params.update({n: None if pd.isna(v) else v for n, v in zip(names, row)})
                    groups.append("(" + ", ".join(":" + n for n in names) + ")")
                # Assumes a pg8000.native-style run(sql, **params) with :name
                # placeholders - TODO confirm against src.utils.db_connection.
                conn.run(
                    "INSERT INTO dim_currency VALUES "
                    + ", ".join(groups)
                    + " ON CONFLICT (currency_id) DO NOTHING;",
                    **params,
                )

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
Loads data from a dataframe into the warehouse's dim_currency table.
Arguments:
df: a dataframe representing the contents of the dim_currency table.
Returns:
None.
def
load_dim_design_into_warehouse(df):
def load_dim_design_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_design table.

    Rows whose design_id already exists in dim_design are skipped
    (ON CONFLICT DO NOTHING). Values are sent as bound query parameters rather
    than spliced into the SQL text, so "$$" or quotes in names or paths cannot
    break the statement; missing values (None, NaN, NaT, pd.NA) are stored as
    NULL. An empty dataframe issues no INSERT.

    # Arguments:
        df: a dataframe representing the contents of the dim_design table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    import pandas as pd

    columns = ["design_id", "design_name", "file_location", "file_name"]
    # Rows per INSERT; keeps each statement far below Postgres's 65535-parameter cap.
    batch_size = 1000
    conn = create_conn()
    if conn:
        try:
            rows = list(df[columns].itertuples(index=False, name=None))
            for start in range(0, len(rows), batch_size):
                params, groups = {}, []
                for r, row in enumerate(rows[start : start + batch_size]):
                    names = [f"v{r}_{c}" for c in range(len(columns))]
                    params.update({n: None if pd.isna(v) else v for n, v in zip(names, row)})
                    groups.append("(" + ", ".join(":" + n for n in names) + ")")
                # Assumes a pg8000.native-style run(sql, **params) with :name
                # placeholders - TODO confirm against src.utils.db_connection.
                conn.run(
                    "INSERT INTO dim_design VALUES "
                    + ", ".join(groups)
                    + " ON CONFLICT (design_id) DO NOTHING;",
                    **params,
                )

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
Loads data from a dataframe into the warehouse's dim_design table.
Arguments:
df: a dataframe representing the contents of the dim_design table.
Returns:
None.
def
load_dim_counterparty_into_warehouse(df):
def load_dim_counterparty_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_counterparty table.

    Rows whose counterparty_id already exists in dim_counterparty are skipped
    (ON CONFLICT DO NOTHING). Values are sent as bound query parameters rather
    than spliced into the SQL text, so "$$" or quotes in legal names or
    addresses cannot break the statement; missing values (None, NaN, NaT,
    pd.NA) are stored as NULL. An empty dataframe issues no INSERT.

    # Arguments:
        df: a dataframe representing the contents of the dim_counterparty table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    import pandas as pd

    columns = [
        "counterparty_id",
        "counterparty_legal_name",
        "counterparty_legal_address_line_1",
        "counterparty_legal_address_line_2",
        "counterparty_legal_district",
        "counterparty_legal_city",
        "counterparty_legal_postal_code",
        "counterparty_legal_country",
        "counterparty_legal_phone",
    ]
    # Rows per INSERT; keeps each statement far below Postgres's 65535-parameter cap.
    batch_size = 1000
    conn = create_conn()
    if conn:
        try:
            rows = list(df[columns].itertuples(index=False, name=None))
            for start in range(0, len(rows), batch_size):
                params, groups = {}, []
                for r, row in enumerate(rows[start : start + batch_size]):
                    names = [f"v{r}_{c}" for c in range(len(columns))]
                    params.update({n: None if pd.isna(v) else v for n, v in zip(names, row)})
                    groups.append("(" + ", ".join(":" + n for n in names) + ")")
                # Assumes a pg8000.native-style run(sql, **params) with :name
                # placeholders - TODO confirm against src.utils.db_connection.
                conn.run(
                    "INSERT INTO dim_counterparty VALUES "
                    + ", ".join(groups)
                    + " ON CONFLICT (counterparty_id) DO NOTHING;",
                    **params,
                )

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
Loads data from a dataframe into the warehouse's dim_counterparty table.
Arguments:
df: a dataframe representing the contents of the dim_counterparty table.
Returns:
None.
def
load_fact_sales_order_into_warehouse(df):
def load_fact_sales_order_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's fact_sales_order table. Updates to a sales order are stored as new rows, without overwriting previous data.

    The dataframe is first staged in a session-local temp table (inserted with
    bound query parameters, in batches); only rows whose full column tuple is
    not already present in fact_sales_order are then copied across. Missing
    values (None, NaN, NaT, pd.NA) are sent as NULL.

    # Arguments:
        df: a dataframe representing the contents of the fact_sales_order table.

    # Returns:
        None.

    # Raises:
        RuntimeError: the database query failed.
    """
    import pandas as pd

    columns = [
        "sales_order_id",
        "created_date",
        "created_time",
        "last_updated_date",
        "last_updated_time",
        "sales_staff_id",
        "counterparty_id",
        "units_sold",
        "unit_price",
        "currency_id",
        "design_id",
        "agreed_payment_date",
        "agreed_delivery_date",
        "agreed_delivery_location_id",
    ]
    # Rows per INSERT; 14 params per row keeps each statement far below
    # Postgres's 65535-parameter cap.
    batch_size = 1000
    column_list = ", ".join(columns)
    fact_columns = ", ".join(f"f.{c}" for c in columns)
    temp_columns = ", ".join(f"t.{c}" for c in columns)

    temp_create_query = """
        CREATE TEMP TABLE temp_sales_order (
            sales_record_id SERIAL PRIMARY KEY NOT NULL,
            sales_order_id INT NOT NULL,
            created_date DATE NOT NULL,
            created_time TIME NOT NULL DEFAULT CURRENT_TIME,
            last_updated_date DATE NOT NULL,
            last_updated_time TIME NOT NULL DEFAULT CURRENT_TIME,
            sales_staff_id INT NOT NULL,
            counterparty_id INT NOT NULL,
            units_sold INT NOT NULL,
            unit_price NUMERIC(10, 2) NOT NULL,
            currency_id INT NOT NULL,
            design_id INT NOT NULL,
            agreed_payment_date DATE NOT NULL,
            agreed_delivery_date DATE NOT NULL,
            agreed_delivery_location_id INT NOT NULL
        );
    """

    # NOT EXISTS (rather than NOT IN) keeps working if fact_sales_order ever
    # contains NULLs - NOT IN would then match nothing - and can be planned
    # as an anti-join.
    new_rows_query = f"""
        INSERT INTO fact_sales_order ({column_list})
        SELECT {column_list}
        FROM temp_sales_order AS t
        WHERE NOT EXISTS (
            SELECT 1 FROM fact_sales_order AS f
            WHERE ({fact_columns}) = ({temp_columns})
        );
    """

    conn = create_conn()
    if conn:
        try:
            conn.run(temp_create_query)

            rows = list(df[columns].itertuples(index=False, name=None))
            for start in range(0, len(rows), batch_size):
                params, groups = {}, []
                for r, row in enumerate(rows[start : start + batch_size]):
                    names = [f"v{r}_{c}" for c in range(len(columns))]
                    params.update({n: None if pd.isna(v) else v for n, v in zip(names, row)})
                    groups.append("(" + ", ".join(":" + n for n in names) + ")")
                # Assumes a pg8000.native-style run(sql, **params) with :name
                # placeholders - TODO confirm against src.utils.db_connection.
                conn.run(
                    f"INSERT INTO temp_sales_order ({column_list}) VALUES "
                    + ", ".join(groups)
                    + ";",
                    **params,
                )

            conn.run(new_rows_query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
Loads data from a dataframe into the warehouse's fact_sales_order table. Updates to a sales order are stored as new rows, without overwriting previous data.
Arguments:
df: a dataframe representing the contents of the fact_sales_order table.
Returns:
None.