<aside>

Scalable Data Lakes and Data Warehouses on AWS

Project by: Khaireddine Arbouch

</aside>

<aside>

Modern AWS Data Engineering Stack — Breakdown

| Layer | Tool/Service | Role |
| --- | --- | --- |
| Orchestration | Airflow (MWAA) | Centralized DAG control for jobs across Glue, EMR, Redshift |
| Ingestion | AWS S3 | Raw object storage for input data |
| Schema Discovery | Glue Crawler | Automatically catalogs CSV/JSON/Parquet into the Glue Data Catalog |
| Transform (1) | Glue + PySpark | Bronze → Silver: light cleanup, normalization |
| Transform (2) | EMR + PySpark + Hudi | Silver → Gold: heavy transformation, upserts, partitioning |
| Warehouse | Redshift | Structured tables for reporting and BI dashboards |
| Monitoring | CloudWatch + Airflow Logs | Track job status and failures |
| IAM + Roles | IAM | Secure access to S3, Glue, EMR, Redshift |

https://youtu.be/rVrTiJRTviA


Component Role Diagram

aws_etl_pipeline.png


End-to-End Pipeline Modules

1. Ingestion + Schema Discovery

Inputs: raw CSV files landed in the S3 raw zone (s3://your-bucket/raw/)

Tool: Amazon S3 for storage, plus a Glue Crawler to catalog the files into the Glue Data Catalog
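
As a minimal sketch (not part of the original project), the crawler can be registered and triggered with boto3; the crawler name, IAM role, and database name below are placeholders:

```python
import boto3

glue = boto3.client('glue')

# Register a crawler over the raw zone (name, role, and database are placeholders)
glue.create_crawler(
    Name='raw_zone_crawler',
    Role='arn:aws:iam::<account>:role/MyGlueCrawlerRole',
    DatabaseName='raw_db',
    Targets={'S3Targets': [{'Path': 's3://your-bucket/raw/'}]},
)

# Run it so new CSV/JSON/Parquet files are cataloged in the Glue Data Catalog
glue.start_crawler(Name='raw_zone_crawler')
```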


2. Bronze ETL — Glue Job (Light Cleansing)

Tool: AWS Glue (PySpark)

Goals: light cleanup of the raw CSVs (drop duplicates, remove null rows) and conversion to Parquet in the bronze zone

Sample Code (Glue_job.py):

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Read raw CSV from the S3 raw zone, inferring column types
df = spark.read.csv("s3://your-bucket/raw/raw_customers.csv", header=True, inferSchema=True)

# Light cleanup: drop duplicate rows and rows containing nulls
df_clean = df.dropDuplicates().na.drop()

# Write cleaned, typed data as Parquet to the bronze zone
df_clean.write.parquet("s3://your-bucket/bronze/customers/", mode="overwrite")
```

3. Silver ETL — Enrich, Normalize (Glue or EMR)

This step can run either as a Glue job or as PySpark on EMR:

```python
from pyspark.sql.functions import to_timestamp, col

# Read bronze Parquet and cast signup_date to a proper timestamp
df = spark.read.parquet("s3://your-bucket/bronze/customers/")
df_silver = df.withColumn("signup_date", to_timestamp(col("signup_date")))

# Write the enriched, normalized dataset to the silver zone
df_silver.write.parquet("s3://your-bucket/silver/customers/", mode="overwrite")
```

4. Gold ETL — Hudi on EMR (Heavy Lifting)

Tool: Amazon EMR + PySpark + Apache Hudi

Features: record-level upserts keyed on transaction_id, partitioning by transaction_date, Hive/Glue catalog sync

Code (EMR_job.py):

```python
hudi_options = {
    'hoodie.table.name': 'gold_transactions',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.partitionpath.field': 'transaction_date',
    # Upserts need a precombine field; transaction_date is assumed here
    'hoodie.datasource.write.precombine.field': 'transaction_date',
    'hoodie.datasource.write.table.name': 'gold_transactions',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.hive.sync.enable': 'true',
}

# mode("overwrite") bootstraps the table; switch to mode("append") for incremental upserts
df.write.format("hudi").options(**hudi_options).mode("overwrite").save("s3://your-bucket/gold/transactions/")
```
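
To sanity-check the write (or feed downstream Spark jobs), the gold table can be read back as a snapshot; a minimal sketch assuming the same Spark session and S3 path:

```python
# Snapshot read of the Hudi gold table as a regular Spark DataFrame
gold_df = spark.read.format("hudi").load("s3://your-bucket/gold/transactions/")
gold_df.select("transaction_id", "transaction_date").show(10)
```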

5. Load to Redshift

Tool: Amazon Redshift (COPY from S3 in Parquet format)

File: Redshift_job.sql

```sql
COPY reporting.transactions
FROM 's3://your-bucket/gold/transactions/'
IAM_ROLE 'arn:aws:iam::<account>:role/MyRedshiftRole'
FORMAT AS PARQUET;
```
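
To trigger this COPY from the orchestration layer instead of a SQL client, one option is the Redshift Data API via boto3; a minimal sketch, where the cluster identifier, database, and DB user are placeholders rather than project values:

```python
import boto3

# Submit the COPY statement asynchronously via the Redshift Data API
redshift_data = boto3.client('redshift-data')
response = redshift_data.execute_statement(
    ClusterIdentifier='my-redshift-cluster',  # placeholder
    Database='dev',                           # placeholder
    DbUser='awsuser',                         # placeholder
    Sql=open('Redshift_job.sql').read(),
)
print(response['Id'])  # poll describe_statement() with this ID to track completion
```

Wrapped in a PythonOperator, this becomes a natural third task after the EMR step in the DAG below.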

6. Orchestration with Airflow (MWAA)

Tool: Amazon Managed Workflows for Apache Airflow (MWAA)

Code: mwaa_dag_demo.py

```python
import boto3
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def run_glue():
    # Start the bronze Glue job defined earlier
    boto3.client('glue').start_job_run(JobName='bronze_cleanser')

def run_emr():
    # Trigger the EMR step; the cluster ID and script path are placeholders
    boto3.client('emr').add_job_flow_steps(
        JobFlowId='j-XXXXXXXXXXXXX',  # placeholder EMR cluster ID
        Steps=[{'Name': 'gold_hudi_transform',
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {'Jar': 'command-runner.jar',
                                  'Args': ['spark-submit', 's3://your-bucket/scripts/EMR_job.py']}}],
    )

with DAG("end_to_end_etl", start_date=datetime(2023, 1, 1), schedule_interval=None, catchup=False) as dag:
    glue_task = PythonOperator(task_id="glue_clean", python_callable=run_glue)
    emr_task = PythonOperator(task_id="emr_transform", python_callable=run_emr)
    glue_task >> emr_task
```

Data Lake Zones Summary

| Layer | Purpose | Format | Storage (S3 Path) |
| --- | --- | --- | --- |
| Raw | Raw ingested CSVs | CSV | /raw/*.csv |
| Bronze | Cleaned, typed data | Parquet | /bronze/ |
| Silver | Enriched, normalized datasets | Parquet | /silver/ |
| Gold | Final Hudi upserted datasets | Hudi/Parquet | /gold/ |
| Reporting | Redshift query-ready tables | Redshift | reporting.transactions |
</aside>