<aside>
Scalable Data Lakes and Data Warehouses on AWS
Project by: Khaireddine Arbouch
</aside>
<aside>
| Layer | Tool/Service | Role |
|---|---|---|
| Orchestration | Airflow (MWAA) | Centralized DAG control for jobs across Glue, EMR, Redshift |
| Ingestion | AWS S3 | Raw object storage for input data |
| Schema Discovery | Glue Crawler | Automatically catalogs CSV/JSON/Parquet into the Glue Data Catalog |
| Transform (1) | Glue + PySpark | Bronze → Silver: light cleanup, normalization |
| Transform (2) | EMR + PySpark + Hudi | Silver → Gold: heavy transformation, upserts, partitioning |
| Warehouse | Redshift | Structured tables for reporting, BI dashboards |
| Monitoring | CloudWatch + Airflow Logs | Track job status, failures |
| IAM + Roles | IAM | Secure access to S3, Glue, EMR, Redshift |
Raw files land in `s3://your-bucket/raw/`: `raw_customers.csv`, `raw_products.csv`, and `raw_transactions.csv`. The Glue Crawler catalogs them into the Glue Data Catalog as tables such as `raw.customers` and `raw.products`.
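A minimal sketch of the ingestion and discovery steps with `boto3`; the bucket name and the crawler name (`raw-data-crawler`) are placeholders, and the crawler is assumed to already exist with an IAM role that can read the `raw/` prefix:

```python
import boto3

# Upload the raw CSVs to the landing prefix
s3 = boto3.client("s3")
for name in ["raw_customers.csv", "raw_products.csv", "raw_transactions.csv"]:
    s3.upload_file(Filename=name, Bucket="your-bucket", Key=f"raw/{name}")

# Run the Glue Crawler that populates the raw.* tables in the Data Catalog
glue = boto3.client("glue")
glue.start_crawler(Name="raw-data-crawler")
```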
Bronze layer Glue job (`Glue_job.py`):

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Glue-managed Spark session
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Read the raw CSV, drop duplicate and null rows, write Parquet to the Bronze layer
df = spark.read.csv("s3://your-bucket/raw/raw_customers.csv", header=True)
df_clean = df.dropDuplicates().na.drop()
df_clean.write.parquet("s3://your-bucket/bronze/customers/", mode="overwrite")
```
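The script is registered as a Glue job so the orchestration layer can start it by name (`bronze_cleanser`, as referenced in the DAG below); a sketch using `boto3`, where the role name and script location are assumptions:

```python
import boto3

glue = boto3.client("glue")
# Register Glue_job.py as the job that Airflow starts by name
glue.create_job(
    Name="bronze_cleanser",
    Role="MyGlueServiceRole",  # assumption: IAM role with S3 + Glue access
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://your-bucket/scripts/Glue_job.py",  # assumption
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
)
```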
This step can run either in Glue or as PySpark on EMR:

```python
from pyspark.sql.functions import to_timestamp, col

# Bronze → Silver: parse signup_date into a proper timestamp
df = spark.read.parquet("s3://your-bucket/bronze/customers/")
df_silver = df.withColumn("signup_date", to_timestamp(col("signup_date")))
df_silver.write.parquet("s3://your-bucket/silver/customers/", mode="overwrite")
```
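The Silver layer is also where light enrichment can happen; a hypothetical sketch joining cleaned transactions to customers (the `customer_id` join key and the transactions paths are illustrative assumptions, not part of the project code):

```python
# Enrich transactions with customer attributes before the Gold-layer upserts
transactions = spark.read.parquet("s3://your-bucket/bronze/transactions/")
customers = spark.read.parquet("s3://your-bucket/silver/customers/")

enriched = transactions.join(customers, on="customer_id", how="left")
enriched.write.parquet("s3://your-bucket/silver/transactions/", mode="overwrite")
```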
EMR job with Hudi (`EMR_job.py`):

```python
# Silver → Gold: upsert transactions into a partitioned Hudi table
hudi_options = {
    'hoodie.table.name': 'gold_transactions',
    'hoodie.datasource.write.recordkey.field': 'transaction_id',
    'hoodie.datasource.write.partitionpath.field': 'transaction_date',
    'hoodie.datasource.write.table.name': 'gold_transactions',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.hive.sync.enable': 'true',
}

# df is the Silver-layer transactions DataFrame; append mode lets Hudi apply
# upserts on subsequent runs instead of recreating the table
df.write.format("hudi").options(**hudi_options).mode("append").save("s3://your-bucket/gold/transactions/")
```
Redshift load (`Redshift_job.sql`):

```sql
COPY reporting.transactions
FROM 's3://your-bucket/gold/transactions/'
IAM_ROLE 'arn:aws:iam::<account>:role/MyRedshiftRole'
FORMAT AS PARQUET;
```
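The COPY can also be issued programmatically (e.g., from an Airflow task) through the Redshift Data API; a sketch with `boto3`, where the cluster identifier, database, and secret ARN are placeholder assumptions:

```python
import boto3

# Submit the COPY statement above without managing a JDBC connection
redshift = boto3.client("redshift-data")
redshift.execute_statement(
    ClusterIdentifier="my-redshift-cluster",  # assumption
    Database="analytics",                     # assumption
    SecretArn="arn:aws:secretsmanager:<region>:<account>:secret:redshift-creds",  # assumption
    Sql=open("Redshift_job.sql").read(),      # the COPY statement shown above
)
```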
Airflow DAG (`mwaa_dag_demo.py`):

```python
from datetime import datetime

import boto3
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_glue():
    # Kick off the Bronze-layer Glue job
    boto3.client('glue').start_job_run(JobName='bronze_cleanser')

def run_emr():
    # Trigger EMR Step (e.g., using AddJobFlowSteps); see the sketch below
    pass

with DAG("end_to_end_etl", start_date=datetime(2023, 1, 1), catchup=False) as dag:
    glue_task = PythonOperator(task_id="glue_clean", python_callable=run_glue)
    emr_task = PythonOperator(task_id="emr_transform", python_callable=run_emr)

    glue_task >> emr_task
```
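The `run_emr` stub can submit `EMR_job.py` as a Spark step on a running EMR cluster; a minimal sketch with `boto3`, where the cluster ID and the S3 script location are placeholder assumptions:

```python
import boto3

def run_emr():
    # Submit EMR_job.py as a spark-submit step on an existing EMR cluster
    emr = boto3.client('emr')
    emr.add_job_flow_steps(
        JobFlowId='j-XXXXXXXXXXXXX',  # assumption: ID of a running cluster
        Steps=[{
            'Name': 'silver_to_gold_hudi',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', 's3://your-bucket/scripts/EMR_job.py'],  # assumption
            },
        }],
    )
```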
| Layer | Purpose | Format | Storage (S3 Path) |
|---|---|---|---|
| Raw | Raw ingested CSVs | CSV | /raw/*.csv |
| Bronze | Cleaned, typed data | Parquet | /bronze/ |
| Silver | Enriched, normalized datasets | Parquet | /silver/ |
| Gold | Final Hudi upserted datasets | Hudi/Parquet | /gold/ |
| Reporting | Redshift query-ready tables | Redshift | reporting.transactions |
</aside>