<aside>

Parallel ETL Pipeline with Apache Airflow

Project By: Khaireddine Arbouch

</aside>

<aside> Project Outline

Master parallel ETL orchestration with Airflow on AWS EC2, using data from FakeStoreAPI and storing it in AWS RDS Postgres—with a focus on parallel task execution, task groups, and AWS best practices.

MODULE 1: Project Setup on AWS


1.1. Launch AWS EC2 Instance (Ubuntu 20.04 preferred)

1.2. Create and Connect to AWS RDS Postgres

https://youtu.be/im2K-dME5gM
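Once the RDS instance is up, it is worth confirming that the EC2 box can actually reach it (security group rules and credentials) before wiring up Airflow. Below is a minimal connectivity check with psycopg2; the endpoint, database name, user, and password are placeholders for your own RDS values.

```python
# Quick RDS Postgres connectivity check, run from the EC2 instance.
# Host, dbname, user, and password are placeholders; use your own RDS values.
import psycopg2

conn = psycopg2.connect(
    host="your-rds-endpoint.rds.amazonaws.com",  # RDS endpoint from the AWS console
    port=5432,
    dbname="postgres",
    user="your_user",
    password="your_password",
    connect_timeout=10,
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()
```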


MODULE 2: Designing the Pipeline


2.1. Understand the FakeStoreAPI

API Docs: https://fakestoreapi.com
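A quick way to explore the two endpoints this project relies on (the category list, and products filtered by category), sketched with the requests library:

```python
# Explore the FakeStoreAPI endpoints used by the pipeline.
import requests

BASE_URL = "https://fakestoreapi.com"

# 1) All product categories
categories = requests.get(f"{BASE_URL}/products/categories", timeout=30).json()
print(categories)  # e.g. ["electronics", "jewelery", "men's clothing", "women's clothing"]

# 2) Products for a single category
products = requests.get(f"{BASE_URL}/products/category/electronics", timeout=30).json()
print(len(products), "products")
print(products[0]["title"], products[0]["price"], products[0]["rating"])  # rating is a nested object
```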

2.2. Design Your ETL Workflow

We'll orchestrate:

```
start_pipeline
    |
group_a: extract categories and upload to Postgres
    |
group_b: extract products by category (in parallel), transform, and load
    |
join_products_with_categories
    |
save_to_s3
    |
end_pipeline
```
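As a sanity check on the design, here is a minimal skeleton (placeholder tasks only) showing how this flow maps onto Airflow's TaskGroups and the `>>` dependency operator; the DAG id and the two-category subset are illustrative.

```python
# Minimal skeleton of the designed flow: placeholder DummyOperator tasks only,
# with group/task names mirroring the diagram above. The DAG id is arbitrary.
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG("etl_design_skeleton", start_date=datetime(2025, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:

    start_pipeline = DummyOperator(task_id="start_pipeline")

    with TaskGroup("group_a") as group_a:  # extract categories, upload to Postgres
        DummyOperator(task_id="extract_and_load_categories")

    with TaskGroup("group_b") as group_b:  # per-category ETL, running in parallel
        for name in ("electronics", "jewelery"):  # illustrative subset of categories
            DummyOperator(task_id=f"etl_{name}")

    join_products_with_categories = DummyOperator(task_id="join_products_with_categories")
    save_to_s3 = DummyOperator(task_id="save_to_s3")
    end_pipeline = DummyOperator(task_id="end_pipeline")

    start_pipeline >> group_a >> group_b >> join_products_with_categories >> save_to_s3 >> end_pipeline
```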

Airflow ETL Pipeline with Parallel Tasks


MODULE 3: Airflow DAG Breakdown


Create a DAG with the same overall structure as your original weather-API pipeline, adapted to FakeStoreAPI.

DAG Components

HttpSensor: confirms FakeStoreAPI is reachable before extraction starts
TaskGroup "extract_by_category": one SimpleHttpOperator per category, running in parallel
PythonOperator tasks: transform the combined payload, load it into Postgres, export it
PostgresHook: runs DDL and COPY against RDS Postgres
DummyOperator: start and end markers


MODULE 4: Parallel Task Execution


4.1. Dynamic Task Mapping (Airflow 2.3+)

```python
@dag(...)
def my_parallel_dag():
    # get_categories and extract_products_by_category are @task-decorated functions
    categories = get_categories()

    # .expand() creates one mapped task instance per category; they run in parallel
    extract_transform_load = extract_products_by_category.expand(
        category=categories
    )
```
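For reference, a self-contained version of the fragment above, assuming Airflow 2.3+ with the TaskFlow API and the requests library available on the workers; the DAG id and function bodies are illustrative.

```python
# Dynamic task mapping over FakeStoreAPI categories (Airflow 2.3+, TaskFlow API).
from datetime import datetime
import requests
from airflow.decorators import dag, task

@dag(schedule_interval="@daily", start_date=datetime(2025, 1, 1), catchup=False)
def fakestore_dynamic_mapping():

    @task
    def get_categories():
        # Returns e.g. ["electronics", "jewelery", "men's clothing", "women's clothing"]
        return requests.get("https://fakestoreapi.com/products/categories", timeout=30).json()

    @task
    def extract_products_by_category(category):
        # One mapped task instance per category; Airflow runs them in parallel
        url = f"https://fakestoreapi.com/products/category/{category}"
        return requests.get(url, timeout=30).json()

    extract_products_by_category.expand(category=get_categories())

fakestore_dynamic_mapping()
```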

4.2. Or Use TaskGroups for Manual Parallelization

Split product categories like:

```python
# Note: each category string must be sanitized into a valid task_id
# (alphanumerics, dashes, dots, and underscores only) before use in the f-strings below.
with TaskGroup("load_products_group") as load_products_group:
    for category in categories:
        t1 = SimpleHttpOperator(task_id=f'extract_{category}', ...)
        t2 = PythonOperator(task_id=f'transform_{category}', ...)
        t3 = PythonOperator(task_id=f'load_{category}', ...)
        t1 >> t2 >> t3
```

MODULE 5: Deploy on EC2 + Connect to RDS



MODULE 6: Advanced Enhancements



HANDS-ON PROJECT: "Use Airflow to Orchestrate a Parallel Processing ETL Pipeline on AWS EC2"

Project Objective:

Pull product categories from FakeStoreAPI, then pull product data for each category in parallel, transform it, store it in AWS RDS Postgres, and finally export it to S3.

Tech Stack:

Apache Airflow (on EC2), AWS EC2, AWS RDS Postgres, AWS S3, FakeStoreAPI, Python (pandas)


Complete Airflow DAG for Parallel ETL Pipeline on AWS EC2


```python
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
import json
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Constants
CATEGORIES = ["electronics", "jewelery", "men's clothing", "women's clothing"]
S3_BUCKET = "your-s3-bucket-name"  # change this
POSTGRES_CONN_ID = "postgres_conn"  # your Airflow connection to RDS
FAKESTORE_API_CONN_ID = "fakestore_api"  # define this HTTP connection in Airflow

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 1, 1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

def transform_products_data(**context):
    """Transform raw product data pulled from XCom and save it as a CSV"""
    raw_data = context['ti'].xcom_pull(task_ids=context['task_ids'])

    # raw_data is a list of lists (one per extract task), flatten it
    all_products = []
    for category_data in raw_data:
        if category_data:
            all_products.extend(category_data)

    df = pd.DataFrame(all_products)

    # Basic cleaning / transformation
    df['price'] = df['price'].astype(float)

    # The API returns rating as a nested object {"rate": ..., "count": ...};
    # flatten it into the two columns the Postgres table expects
    df['rating_rate'] = df['rating'].apply(lambda r: r.get('rate') if isinstance(r, dict) else None)
    df['rating_count'] = df['rating'].apply(lambda r: r.get('count') if isinstance(r, dict) else None)
    df = df[['id', 'title', 'price', 'description', 'category', 'image', 'rating_rate', 'rating_count']]

    # Save CSV locally so the load task can COPY it into Postgres
    csv_file = "/tmp/fakestore_products.csv"
    df.to_csv(csv_file, index=False)

    return csv_file

def load_products_to_postgres(**context):
    """Load the transformed CSV into Postgres using COPY"""
    csv_file = context['ti'].xcom_pull(task_ids='transform_products')
    hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)

    # Create table if it does not exist
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS fakestore_products (
        id INT PRIMARY KEY,
        title TEXT,
        price NUMERIC,
        description TEXT,
        category TEXT,
        image TEXT,
        rating_rate NUMERIC,
        rating_count INT
    );
    """
    hook.run(create_table_sql)

    # Empty the table first so repeated daily runs don't violate the primary key
    hook.run("TRUNCATE TABLE fakestore_products;")

    # Bulk-load the CSV; copy_expert reads the file itself, no need to open it here
    hook.copy_expert(
        sql="COPY fakestore_products (id, title, price, description, category, image, rating_rate, rating_count) FROM STDIN WITH CSV HEADER",
        filename=csv_file
    )

def export_to_s3(**context):
    """Export the products table from Postgres to S3 (simulated here by saving a CSV locally)"""
    hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)

    # Query all products (join with other tables here if you have them)
    sql = "SELECT * FROM fakestore_products;"
    df = hook.get_pandas_df(sql)

    # Save CSV locally
    now_str = datetime.now().strftime("%Y%m%d%H%M%S")
    filename = f"/tmp/fakestore_products_export_{now_str}.csv"
    df.to_csv(filename, index=False)

    # To actually upload to S3, use boto3 or the Amazon provider's S3Hook
    # (see the sketch after the DAG); for now we only save the file locally.

    print(f"Exported file saved at {filename}")
    return filename

with DAG(
    "fakestore_parallel_etl",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    max_active_runs=1,
    tags=["fakestore", "etl", "parallel"],
) as dag:

    start = DummyOperator(task_id="start")

    # Sensor to check if API is available
    check_api = HttpSensor(
        task_id="check_fakestore_api",
        http_conn_id=FAKESTORE_API_CONN_ID,
        endpoint="products",
        method="GET",
        response_check=lambda response: response.status_code == 200,
        poke_interval=10,
        timeout=60,
    )

    # Task group for parallel extraction by category
    with TaskGroup("extract_by_category") as extract_group:

        extract_tasks = {}
        for cat in CATEGORIES:

            # FakeStoreAPI exposes a per-category endpoint, so each task pulls only
            # the products for its own category. Category names containing spaces or
            # apostrophes (e.g. "men's clothing") are handled by the HTTP client's
            # URL encoding, but they must be sanitized before use in a task_id.
            endpoint = f"products/category/{cat}"
            safe_cat = cat.replace(" ", "_").replace("'", "")

            extract_task = SimpleHttpOperator(
                task_id=f"extract_{safe_cat}",
                http_conn_id=FAKESTORE_API_CONN_ID,
                endpoint=endpoint,
                method="GET",
                response_filter=lambda r: json.loads(r.text),
                log_response=True,
            )
            extract_tasks[cat] = extract_task

    transform = PythonOperator(
        task_id="transform_products",
        python_callable=transform_products_data,
        # pass the (group-prefixed) extract task ids so transform can pull their XComs
        op_kwargs={"task_ids": [t.task_id for t in extract_tasks.values()]},
    )

    load = PythonOperator(
        task_id="load_products",
        python_callable=load_products_to_postgres,
    )

    export = PythonOperator(
        task_id="export_to_s3",
        python_callable=export_to_s3,
    )

    end = DummyOperator(task_id="end")

    # Define dependencies
    start >> check_api >> extract_group >> transform >> load >> export >> end
```
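To turn the simulated export into a real S3 upload, one option is the Amazon provider's S3Hook. Below is a sketch, assuming the `apache-airflow-providers-amazon` package is installed and an `aws_default` connection (or an EC2 instance role) has write access to `S3_BUCKET`; the key prefix and function name are illustrative.

```python
# Sketch: upload the exported CSV to S3 with the Amazon provider's S3Hook.
# Assumes an "aws_default" connection or instance role with write access to S3_BUCKET.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_export_to_s3(**context):
    filename = context['ti'].xcom_pull(task_ids='export_to_s3')
    s3_hook = S3Hook(aws_conn_id="aws_default")
    s3_hook.load_file(
        filename=filename,
        key=f"fakestore/{filename.split('/')[-1]}",  # arbitrary key prefix
        bucket_name=S3_BUCKET,
        replace=True,
    )
```

Wire this in as an extra PythonOperator between `export` and `end`, or fold the upload directly into `export_to_s3`.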

</aside>