<aside>
Project By: Khaireddine Arbouch
Parallel ETL Pipeline with Apache Airflow
</aside>
<aside> Project Outline
Master parallel ETL orchestration with Airflow on AWS EC2, using data from FakeStoreAPI and storing it in AWS RDS Postgres—with a focus on parallel task execution, task groups, and AWS best practices.
EC2 instance type: t2.medium or better.
Open ports (via the instance's security group): typically 8080 for the Airflow webserver, 22 for SSH, and 5432 if you connect to RDS Postgres from outside the VPC.
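If you prefer to script the security group instead of clicking through the console, here is a hedged boto3 sketch; the group ID, region, and CIDR below are placeholders for your own values:
**import boto3

SG_ID = "sg-0123456789abcdef0"   # placeholder: your EC2 instance's security group
MY_IP = "203.0.113.10/32"        # placeholder: your IP in CIDR form

ec2 = boto3.client("ec2", region_name="us-east-1")  # placeholder region

# Open SSH (22) and the Airflow webserver (8080) to a single trusted IP
ec2.authorize_security_group_ingress(
    GroupId=SG_ID,
    IpPermissions=[
        {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "IpRanges": [{"CidrIp": MY_IP}]},
        {"IpProtocol": "tcp", "FromPort": 8080, "ToPort": 8080, "IpRanges": [{"CidrIp": MY_IP}]},
    ],
)**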
To install Airflow with pip inside a virtual environment (quote the extras so your shell does not expand the brackets; Airflow's docs also recommend pinning with the matching constraints file):
**python3 -m venv airflow_env
source airflow_env/bin/activate
pip install "apache-airflow[postgres,amazon]"  # 'amazon' is the current name of the AWS provider extra**
FakeStoreAPI serves fake e-commerce data (products, categories, etc.). API docs: https://fakestoreapi.com
Example endpoints:
/products
/products/categories
/products/category/electronics
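As a quick sanity check before wiring anything into Airflow, you can hit these endpoints directly with requests; this is a minimal sketch against the public base URL above:
**import requests

BASE_URL = "https://fakestoreapi.com"

# List category names, e.g. ["electronics", "jewelery", "men's clothing", "women's clothing"]
categories = requests.get(f"{BASE_URL}/products/categories", timeout=10).json()

# Fetch the products for each category (this is what the DAG will parallelise)
for category in categories:
    products = requests.get(f"{BASE_URL}/products/category/{category}", timeout=10).json()
    print(f"{category}: {len(products)} products")**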
We'll orchestrate:
**start_pipeline
|
group_a: extract categories and upload to Postgres
|
group_b: extract products by category (in parallel), transform, and load
|
join_products_with_categories
|
save_to_s3
|
end_pipeline**
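A minimal skeleton of that shape, using EmptyOperator placeholders and TaskGroups so you can see the flow in the Airflow UI before writing any real logic (EmptyOperator needs Airflow 2.3+; on older 2.x use DummyOperator instead):
**from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG("pipeline_skeleton", start_date=datetime(2025, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    start_pipeline = EmptyOperator(task_id="start_pipeline")

    with TaskGroup("group_a") as group_a:
        EmptyOperator(task_id="extract_and_load_categories")

    with TaskGroup("group_b") as group_b:
        # one placeholder per category; these run in parallel
        for cat in ["electronics", "jewelery"]:
            EmptyOperator(task_id=f"extract_transform_load_{cat}")

    join_products_with_categories = EmptyOperator(task_id="join_products_with_categories")
    save_to_s3 = EmptyOperator(task_id="save_to_s3")
    end_pipeline = EmptyOperator(task_id="end_pipeline")

    start_pipeline >> group_a >> group_b >> join_products_with_categories >> save_to_s3 >> end_pipeline**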
Airflow ETL Pipeline with Parallel Tasks
Create a DAG similar to your original weather DAG; with the TaskFlow API you can fan the categories out using dynamic task mapping:
**@dag(...)
def my_parallel_dag():
    categories = get_categories()
    extract_transform_load = extract_products_by_category.expand(
        category=categories
    )**
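Filled out, a minimal runnable version of that idea might look like the sketch below. Dynamic task mapping with .expand needs Airflow 2.3+; get_categories and extract_products_by_category are @task callables assumed here for illustration, not something FakeStoreAPI or Airflow provides:
**from airflow.decorators import dag, task
from datetime import datetime
import requests

@dag(schedule_interval="@daily", start_date=datetime(2025, 1, 1), catchup=False)
def my_parallel_dag():

    @task
    def get_categories():
        # e.g. ["electronics", "jewelery", "men's clothing", "women's clothing"]
        return requests.get("https://fakestoreapi.com/products/categories", timeout=10).json()

    @task
    def extract_products_by_category(category):
        # One mapped task instance per category, run in parallel by the executor
        return requests.get(
            f"https://fakestoreapi.com/products/category/{category}", timeout=10
        ).json()

    categories = get_categories()
    extract_transform_load = extract_products_by_category.expand(category=categories)

my_parallel_dag()**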
Or split the work per product category inside a TaskGroup:
**with TaskGroup("load_products_group") as load_products_group:
    for category in categories:
        t1 = SimpleHttpOperator(task_id=f'extract_{category}', ...)
        t2 = PythonOperator(task_id=f'transform_{category}', ...)
        t3 = PythonOperator(task_id=f'load_{category}', ...)
        t1 >> t2 >> t3**
Configure airflow.cfg to use RDS as the metadata DB (point sql_alchemy_conn at your RDS Postgres instance).
Create an Airflow connection to RDS (postgres_conn).
Trigger the pipeline manually with airflow dags trigger <dag_id>.
Upload the exported file to S3 (boto3).
Test the Python callables with pytest (see the sketch after this list).
Goal: pull product categories from FakeStoreAPI, then in parallel pull product data by category, transform it, store it in AWS RDS Postgres, and finally export it to S3.
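For the pytest item, here is a minimal sketch of a unit test for the transform callable defined in the full DAG below. It assumes the DAG file is saved as fakestore_parallel_etl.py on the test's PYTHONPATH and that Airflow is installed in the test environment (importing the module parses the DAG); the FakeTI stub stands in for Airflow's runtime task instance.
**# test_transform.py -- run with: pytest test_transform.py
import pandas as pd

from fakestore_parallel_etl import transform_products_data  # assumed DAG file name

class FakeTI:
    """Stub task instance returning canned XCom data instead of talking to Airflow."""
    def __init__(self, payload):
        self._payload = payload

    def xcom_pull(self, task_ids=None):
        return self._payload

def test_transform_flattens_rating_and_writes_csv():
    raw = [[{"id": 1, "title": "Cable", "price": "9.99", "description": "x",
             "category": "electronics", "image": "http://example.com/img.png",
             "rating": {"rate": 4.5, "count": 120}}]]
    csv_file = transform_products_data(
        ti=FakeTI(raw),
        task_ids=["extract_by_category.extract_electronics"],
    )
    df = pd.read_csv(csv_file)
    assert df.loc[0, "price"] == 9.99
    assert df.loc[0, "rating_rate"] == 4.5
    assert df.loc[0, "rating_count"] == 120**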
**from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
import json
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook
# Constants
CATEGORIES = ["electronics", "jewelery", "men's clothing", "women's clothing"]
S3_BUCKET = "your-s3-bucket-name" # change this
POSTGRES_CONN_ID = "postgres_conn" # your Airflow connection to RDS
FAKESTORE_API_CONN_ID = "fakestore_api" # define this HTTP connection in Airflow
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 1, 1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}
def transform_products_data(**context):
    """Flatten the raw product data pulled from XCom and save it as a CSV."""
    raw_data = context['ti'].xcom_pull(task_ids=context['task_ids'])
    # raw_data is a list of lists (one per category), flatten it
    all_products = []
    for category_data in raw_data:
        if category_data:
            all_products.extend(category_data)
    # Transform to DataFrame
    df = pd.DataFrame(all_products)
    # FakeStoreAPI nests rating as {"rate": ..., "count": ...}; flatten it so the
    # columns match the fakestore_products table
    if 'rating' in df.columns:
        df['rating_rate'] = df['rating'].apply(lambda r: r.get('rate') if isinstance(r, dict) else None)
        df['rating_count'] = df['rating'].apply(lambda r: r.get('count') if isinstance(r, dict) else None)
        df = df.drop(columns=['rating'])
    # Basic cleaning / transformation
    df['price'] = df['price'].astype(float)
    # Keep the column order identical to the COPY statement used at load time
    df = df.reindex(columns=['id', 'title', 'price', 'description', 'category',
                             'image', 'rating_rate', 'rating_count'])
    # Save CSV locally
    csv_file = "/tmp/fakestore_products.csv"
    df.to_csv(csv_file, index=False)
    return csv_file
def load_products_to_postgres(**context):
    """Load the transformed CSV into Postgres using COPY."""
    csv_file = context['ti'].xcom_pull(task_ids='transform_products')
    hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    # Create table if it does not exist
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS fakestore_products (
        id INT PRIMARY KEY,
        title TEXT,
        price NUMERIC,
        description TEXT,
        category TEXT,
        image TEXT,
        rating_rate NUMERIC,
        rating_count INT
    );
    """
    hook.run(create_table_sql)
    # Clear the previous load so daily re-runs do not violate the primary key
    hook.run("DELETE FROM fakestore_products;")
    # Copy the CSV into the table, listing the columns explicitly
    hook.copy_expert(
        sql="COPY fakestore_products (id, title, price, description, category, image, rating_rate, rating_count) FROM STDIN WITH CSV HEADER",
        filename=csv_file,
    )
def export_to_s3(**context):
    """Export data from Postgres to S3 (simulated here by saving a CSV locally)."""
    hook = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    # Query all products (or join with other tables if you have them)
    sql = "SELECT * FROM fakestore_products;"
    df = hook.get_pandas_df(sql)
    # Save CSV locally
    now_str = datetime.now().strftime("%Y%m%d%H%M%S")
    filename = f"/tmp/fakestore_products_export_{now_str}.csv"
    df.to_csv(filename, index=False)
    # To actually upload to S3, use boto3 or the AWS CLI; for now we just save the file
    print(f"Exported file saved at {filename}")
    return filename
with DAG(
    "fakestore_parallel_etl",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    max_active_runs=1,
    tags=["fakestore", "etl", "parallel"],
) as dag:

    start = DummyOperator(task_id="start")

    # Sensor to check that the API is reachable
    check_api = HttpSensor(
        task_id="check_fakestore_api",
        http_conn_id=FAKESTORE_API_CONN_ID,
        endpoint="products",
        method="GET",
        response_check=lambda response: response.status_code == 200,
        poke_interval=10,
        timeout=60,
    )

    # Task group for parallel extraction by category
    with TaskGroup("extract_by_category") as extract_group:
        extract_tasks = {}
        for cat in CATEGORIES:
            # FakeStoreAPI exposes /products/category/{category}. Some category names
            # contain spaces and an apostrophe, so sanitize them before using them in a
            # task_id (only letters, digits, dashes, dots and underscores are allowed).
            safe_cat = cat.replace(' ', '_').replace("'", "")
            endpoint = f"products/category/{cat}"
            extract_task = SimpleHttpOperator(
                task_id=f"extract_{safe_cat}",
                http_conn_id=FAKESTORE_API_CONN_ID,
                endpoint=endpoint,
                method="GET",
                response_filter=lambda r: json.loads(r.text),
                log_response=True,
            )
            extract_tasks[cat] = extract_task

    transform = PythonOperator(
        task_id="transform_products",
        python_callable=transform_products_data,
        op_kwargs={"task_ids": [t.task_id for t in extract_tasks.values()]},
    )

    load = PythonOperator(
        task_id="load_products",
        python_callable=load_products_to_postgres,
    )

    export = PythonOperator(
        task_id="export_to_s3",
        python_callable=export_to_s3,
    )

    end = DummyOperator(task_id="end")

    # Define dependencies
    start >> check_api >> extract_group >> transform >> load >> export >> end**
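To make export_to_s3 actually ship the file instead of only simulating the upload, here is a hedged boto3 sketch meant to live in the same DAG file; the key prefix is an assumption, and the bucket comes from S3_BUCKET above:
**import boto3

def upload_export_to_s3(filename: str, bucket: str = S3_BUCKET) -> str:
    """Upload the exported CSV to S3 and return the object key."""
    key = f"fakestore/exports/{filename.split('/')[-1]}"  # assumed key prefix
    s3 = boto3.client("s3")  # uses the EC2 instance role or your configured credentials
    s3.upload_file(Filename=filename, Bucket=bucket, Key=key)
    return key**
Call it at the end of export_to_s3 in place of the print, and make sure the EC2 instance role (or your credentials) allows s3:PutObject on the bucket.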
</aside>