load.load_utils

Contains the utility functions for the load lambda_handler.

  1"""
  2Contains the utility functions for the load lambda_handler.
  3"""
  4
  5import awswrangler as wr
  6import boto3
  7
  8from src.utils.db_connection import close_conn, create_conn
  9
 10
 11def access_files_from_processed_bucket(table_name, bucket_name):
 12    """
 13    This function connects to the s3 bucket with transformed data and returns all the information in the given table as a dataframe.
 14
 15    # Arguments:
 16        table_name: a string representing the name of the table in the database that we want to extract.
 17        bucket_name: a string representing the name of the s3 bucket with transformed data to connect to.
 18
 19    # Returns:
 20        A dataframe containing the most recent data for the given table in the bucket.
 21
 22    # Raises:
 23        RuntimeError: An error occurred during data retrieval.
 24    """
 25    try:
 26        client = boto3.client("s3")
 27        listing_response = client.list_objects_v2(Bucket=bucket_name, Prefix=table_name)
 28        object_key = listing_response["Contents"][-1]["Key"]
 29        path = f"s3://{bucket_name}/{object_key}"
 30        response = wr.s3.read_parquet([path])
 31        return response
 32
 33    except Exception as e:
 34        raise RuntimeError(f"Retrieval of data from processed bucket failed: {e}")
 35
 36
 37def load_dim_dates_into_warehouse(df):
 38    """
 39    Loads data from a dataframe into the warehouse's dim_dates table.
 40
 41    # Arguments:
 42        df: a dataframe representing the contents of the dim_dates table.
 43
 44    # Returns:
 45        None.
 46    """
 47    conn = create_conn()
 48    if conn:
 49        try:
 50            query = """
 51            INSERT INTO dim_date
 52            VALUES
 53            """
 54
 55            for _, row in df.iterrows():
 56                query += f"('{row['date_id']}', {row['year']}, {row['month']}, {row['day']}, {row['day_of_week']}, '{row['day_name']}', '{row['month_name']}', {row['quarter']}), "
 57            query = query[:-2]
 58            query += "ON CONFLICT (date_id) DO NOTHING;"
 59
 60            conn.run(query)
 61
 62        except Exception as e:
 63            raise RuntimeError(f"Database query failed: {e}")
 64
 65        finally:
 66            close_conn(conn)
 67
 68
 69def load_dim_staff_into_warehouse(df):
 70    """
 71    Loads data from a dataframe into the warehouse's dim_staff table.
 72
 73    # Arguments:
 74        df: a dataframe representing the contents of the dim_staff table.
 75
 76    # Returns:
 77        None.
 78    """
 79    conn = create_conn()
 80    if conn:
 81        try:
 82            query = """
 83            INSERT INTO dim_staff
 84            VALUES
 85            """
 86
 87            for _, row in df.iterrows():
 88                query += f"({row['staff_id']}, $${row['first_name']}$$, $${row['last_name']}$$, '{row['department_name']}', '{row['location']}', $${row['email_address']}$$), "
 89            query = query[:-2]
 90            query += " ON CONFLICT (staff_id) DO NOTHING;"
 91
 92            conn.run(query)
 93
 94        except Exception as e:
 95            raise RuntimeError(f"Database query failed: {e}")
 96
 97        finally:
 98            close_conn(conn)
 99
100
def load_dim_location_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_location table.

    All rows are sent in one multi-row INSERT; rows whose location_id already exists are
    skipped (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If
    create_conn() returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_location table. It must contain
            the columns location_id, address_line_1, address_line_2, district, city,
            postal_code, country and phone, which are inserted in that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = [
        "location_id", "address_line_1", "address_line_2", "district",
        "city", "postal_code", "country", "phone",
    ]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_location\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (location_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
131
132
def load_dim_currency_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_currency table.

    All rows are sent in one multi-row INSERT; rows whose currency_id already exists are
    skipped (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If
    create_conn() returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_currency table. It must contain
            the columns currency_id, currency_code and currency_name, which are inserted in
            that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = ["currency_id", "currency_code", "currency_name"]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_currency\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (currency_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
163
164
def load_dim_design_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_design table.

    All rows are sent in one multi-row INSERT; rows whose design_id already exists are skipped
    (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If create_conn()
    returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_design table. It must contain
            the columns design_id, design_name, file_location and file_name, which are
            inserted in that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = ["design_id", "design_name", "file_location", "file_name"]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_design\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (design_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
195
196
def load_dim_counterparty_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_counterparty table.

    All rows are sent in one multi-row INSERT; rows whose counterparty_id already exists are
    skipped (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If
    create_conn() returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_counterparty table. It must
            contain counterparty_id plus the counterparty_legal_* name, address lines,
            district, city, postal_code, country and phone columns, inserted in that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = [
        "counterparty_id",
        "counterparty_legal_name",
        "counterparty_legal_address_line_1",
        "counterparty_legal_address_line_2",
        "counterparty_legal_district",
        "counterparty_legal_city",
        "counterparty_legal_postal_code",
        "counterparty_legal_country",
        "counterparty_legal_phone",
    ]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_counterparty\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (counterparty_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
227
228
def load_fact_sales_order_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's fact_sales_order table. Updates to a sales order are stored as new rows, without overwriting previous data.

    Rows are first staged in a temporary table (temp_sales_order). Only staged rows whose 14
    business columns do not already appear together in fact_sales_order are then copied
    across. A dataframe with no rows inserts nothing. If create_conn() returns a falsy value,
    nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the fact_sales_order table. It must
            contain every column named in ``columns`` below.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running any of the queries failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = [
        "sales_order_id", "created_date", "created_time",
        "last_updated_date", "last_updated_time", "sales_staff_id",
        "counterparty_id", "units_sold", "unit_price", "currency_id",
        "design_id", "agreed_payment_date", "agreed_delivery_date",
        "agreed_delivery_location_id",
    ]
    column_list = ", ".join(columns)

    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return

            temp_create_query = """
            CREATE TEMP TABLE temp_sales_order (
                sales_record_id SERIAL PRIMARY KEY NOT NULL,
                sales_order_id INT NOT NULL,
                created_date DATE NOT NULL,
                created_time TIME NOT NULL DEFAULT CURRENT_TIME,
                last_updated_date DATE NOT NULL,
                last_updated_time TIME NOT NULL DEFAULT CURRENT_TIME,
                sales_staff_id INT NOT NULL,
                counterparty_id INT NOT NULL,
                units_sold INT NOT NULL,
                unit_price NUMERIC(10, 2) NOT NULL,
                currency_id INT NOT NULL,
                design_id INT NOT NULL,
                agreed_payment_date DATE NOT NULL,
                agreed_delivery_date DATE NOT NULL,
                agreed_delivery_location_id INT NOT NULL
                );
            """

            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            temp_insert_query = (
                f"INSERT INTO temp_sales_order\n({column_list})\nVALUES\n{values};"
            )

            # Copy across only staged rows that are not already present with
            # exactly the same values, so earlier versions of an order are kept.
            query = f"""
            INSERT INTO fact_sales_order
            ({column_list})
            SELECT {column_list}
            FROM temp_sales_order
            WHERE ({column_list})
            NOT IN (SELECT {column_list} FROM fact_sales_order);
            """

            conn.run(temp_create_query)
            conn.run(temp_insert_query)
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)
def access_files_from_processed_bucket(table_name, bucket_name):
    """
    This function connects to the s3 bucket with transformed data and returns the contents of the
    most recent file for the given table as a dataframe.

    The file chosen is the one whose key sorts last among all keys starting with ``table_name``
    (S3 lists keys in ascending UTF-8 binary order). This assumes keys embed a sortable
    timestamp — TODO confirm against the transform step's key format.

    # Arguments:
        table_name: a string representing the name of the table in the database that we want to extract.
        bucket_name: a string representing the name of the s3 bucket with transformed data to connect to.

    # Returns:
        A dataframe containing the most recent data for the given table in the bucket.

    # Raises:
        RuntimeError: no object exists under the prefix, or listing/reading the data failed.
    """
    try:
        client = boto3.client("s3")
        # A single list_objects_v2 call returns at most 1000 keys, so walk every page;
        # pages arrive in key order, so the last key seen is the last key overall.
        paginator = client.get_paginator("list_objects_v2")
        object_key = None
        for page in paginator.paginate(Bucket=bucket_name, Prefix=table_name):
            contents = page.get("Contents", [])
            if contents:
                object_key = contents[-1]["Key"]
        if object_key is None:
            raise LookupError(
                f"no objects found in bucket {bucket_name!r} with prefix {table_name!r}"
            )
        path = f"s3://{bucket_name}/{object_key}"
        return wr.s3.read_parquet([path])

    except Exception as e:
        raise RuntimeError(f"Retrieval of data from processed bucket failed: {e}") from e

This function connects to the S3 bucket holding transformed data and returns the contents of the most recent file for the given table as a dataframe.

Arguments:

table_name: a string representing the name of the table in the database that we want to extract.
bucket_name: a string representing the name of the s3 bucket with transformed data to connect to.

Returns:

A dataframe containing the most recent data for the given table in the bucket.

Raises:

RuntimeError: An error occurred during data retrieval.
def load_dim_dates_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_date table.

    All rows are sent in one multi-row INSERT; rows whose date_id already exists are skipped
    (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If create_conn()
    returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_date table. It must contain the
            columns date_id, year, month, day, day_of_week, day_name, month_name and quarter,
            which are inserted in that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = [
        "date_id", "year", "month", "day",
        "day_of_week", "day_name", "month_name", "quarter",
    ]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_date\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (date_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)

Loads data from a dataframe into the warehouse's dim_date table.

Arguments:

df: a dataframe representing the contents of the dim_date table.

Returns:

None.
def load_dim_staff_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_staff table.

    All rows are sent in one multi-row INSERT; rows whose staff_id already exists are skipped
    (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If create_conn()
    returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_staff table. It must contain the
            columns staff_id, first_name, last_name, department_name, location and
            email_address, which are inserted in that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = [
        "staff_id", "first_name", "last_name",
        "department_name", "location", "email_address",
    ]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_staff\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (staff_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)

Loads data from a dataframe into the warehouse's dim_staff table.

Arguments:

df: a dataframe representing the contents of the dim_staff table.

Returns:

None.
def load_dim_location_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_location table.

    All rows are sent in one multi-row INSERT; rows whose location_id already exists are
    skipped (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If
    create_conn() returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_location table. It must contain
            the columns location_id, address_line_1, address_line_2, district, city,
            postal_code, country and phone, which are inserted in that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = [
        "location_id", "address_line_1", "address_line_2", "district",
        "city", "postal_code", "country", "phone",
    ]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_location\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (location_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)

Loads data from a dataframe into the warehouse's dim_location table.

Arguments:

df: a dataframe representing the contents of the dim_location table.

Returns:

None.
def load_dim_currency_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_currency table.

    All rows are sent in one multi-row INSERT; rows whose currency_id already exists are
    skipped (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If
    create_conn() returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_currency table. It must contain
            the columns currency_id, currency_code and currency_name, which are inserted in
            that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = ["currency_id", "currency_code", "currency_name"]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_currency\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (currency_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)

Loads data from a dataframe into the warehouse's dim_currency table.

Arguments:

df: a dataframe representing the contents of the dim_currency table.

Returns:

None.
def load_dim_design_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_design table.

    All rows are sent in one multi-row INSERT; rows whose design_id already exists are skipped
    (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If create_conn()
    returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_design table. It must contain
            the columns design_id, design_name, file_location and file_name, which are
            inserted in that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = ["design_id", "design_name", "file_location", "file_name"]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_design\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (design_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)

Loads data from a dataframe into the warehouse's dim_design table.

Arguments:

df: a dataframe representing the contents of the dim_design table.

Returns:

None.
def load_dim_counterparty_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's dim_counterparty table.

    All rows are sent in one multi-row INSERT; rows whose counterparty_id already exists are
    skipped (ON CONFLICT DO NOTHING). A dataframe with no rows inserts nothing. If
    create_conn() returns a falsy value, nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the dim_counterparty table. It must
            contain counterparty_id plus the counterparty_legal_* name, address lines,
            district, city, postal_code, country and phone columns, inserted in that order.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running the query failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = [
        "counterparty_id",
        "counterparty_legal_name",
        "counterparty_legal_address_line_1",
        "counterparty_legal_address_line_2",
        "counterparty_legal_district",
        "counterparty_legal_city",
        "counterparty_legal_postal_code",
        "counterparty_legal_country",
        "counterparty_legal_phone",
    ]
    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return
            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            query = (
                "INSERT INTO dim_counterparty\nVALUES\n"
                f"{values}\n"
                "ON CONFLICT (counterparty_id) DO NOTHING;"
            )
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)

Loads data from a dataframe into the warehouse's dim_counterparty table.

Arguments:

df: a dataframe representing the contents of the dim_counterparty table.

Returns:

None.
def load_fact_sales_order_into_warehouse(df):
    """
    Loads data from a dataframe into the warehouse's fact_sales_order table. Updates to a sales order are stored as new rows, without overwriting previous data.

    Rows are first staged in a temporary table (temp_sales_order). Only staged rows whose 14
    business columns do not already appear together in fact_sales_order are then copied
    across. A dataframe with no rows inserts nothing. If create_conn() returns a falsy value,
    nothing is done.

    # Arguments:
        df: a dataframe representing the contents of the fact_sales_order table. It must
            contain every column named in ``columns`` below.

    # Returns:
        None.

    # Raises:
        RuntimeError: building or running any of the queries failed.
    """
    import math
    import numbers

    def to_sql_literal(value):
        # Missing cells become NULL instead of the text 'None' / 'nan'.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return "NULL"
        # Numbers (numpy scalars register with the numbers ABCs too) are emitted unquoted.
        if isinstance(value, numbers.Number) and not isinstance(value, bool):
            return str(value)
        text = str(value)
        # Dollar-quote with a tag that cannot end the literal early, so quotes,
        # backslashes and "$$" inside the data are stored verbatim.
        tag, suffix = "$q$", 0
        while (text + tag).find(tag) != len(text):
            suffix += 1
            tag = f"$q{suffix}$"
        return f"{tag}{text}{tag}"

    columns = [
        "sales_order_id", "created_date", "created_time",
        "last_updated_date", "last_updated_time", "sales_staff_id",
        "counterparty_id", "units_sold", "unit_price", "currency_id",
        "design_id", "agreed_payment_date", "agreed_delivery_date",
        "agreed_delivery_location_id",
    ]
    column_list = ", ".join(columns)

    conn = create_conn()
    if conn:
        try:
            selected = df[columns]
            if selected.empty:
                # An INSERT with an empty VALUES list is invalid SQL.
                return

            temp_create_query = """
            CREATE TEMP TABLE temp_sales_order (
                sales_record_id SERIAL PRIMARY KEY NOT NULL,
                sales_order_id INT NOT NULL,
                created_date DATE NOT NULL,
                created_time TIME NOT NULL DEFAULT CURRENT_TIME,
                last_updated_date DATE NOT NULL,
                last_updated_time TIME NOT NULL DEFAULT CURRENT_TIME,
                sales_staff_id INT NOT NULL,
                counterparty_id INT NOT NULL,
                units_sold INT NOT NULL,
                unit_price NUMERIC(10, 2) NOT NULL,
                currency_id INT NOT NULL,
                design_id INT NOT NULL,
                agreed_payment_date DATE NOT NULL,
                agreed_delivery_date DATE NOT NULL,
                agreed_delivery_location_id INT NOT NULL
                );
            """

            values = ",\n".join(
                "(" + ", ".join(map(to_sql_literal, row)) + ")"
                for row in selected.itertuples(index=False, name=None)
            )
            temp_insert_query = (
                f"INSERT INTO temp_sales_order\n({column_list})\nVALUES\n{values};"
            )

            # Copy across only staged rows that are not already present with
            # exactly the same values, so earlier versions of an order are kept.
            query = f"""
            INSERT INTO fact_sales_order
            ({column_list})
            SELECT {column_list}
            FROM temp_sales_order
            WHERE ({column_list})
            NOT IN (SELECT {column_list} FROM fact_sales_order);
            """

            conn.run(temp_create_query)
            conn.run(temp_insert_query)
            conn.run(query)

        except Exception as e:
            raise RuntimeError(f"Database query failed: {e}") from e

        finally:
            close_conn(conn)

Loads data from a dataframe into the warehouse's fact_sales_order table. Updates to a sales order are stored as new rows, without overwriting previous data.

Arguments:

df: a dataframe representing the contents of the fact_sales_order table.

Returns:

None.