ETL Mini-Pipeline — Extract, Transform, Load with Python & SQL

ETL Mini-Pipeline — Extract, Transform, Load with Python & SQL

An ETL pipeline extracts data from a source, transforms it in Python + SQL, and loads it into a target table. We build a complete, idempotent, restartable pipeline that aggregates Twitter metrics into a daily summary table.

import psycopg2
from psycopg2.extras import execute_values, RealDictCursor
import datetime
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)

DSN = dict(host="127.0.0.1", dbname="shared_db",
           user="your_db_user",
           password="your_db_password")

# ── Step 0: Ensure target table exists ───────────────────────────
CREATE_TARGET = """
CREATE TABLE IF NOT EXISTS tw_daily_metrics (
    report_date     DATE PRIMARY KEY,
    tweet_count     INT,
    unique_authors  INT,
    total_likes     BIGINT,
    total_retweets  BIGINT,
    total_impressions BIGINT,
    engagement_rate NUMERIC(8,4),
    top_hashtag     TEXT,
    new_users       INT,
    processed_at    TIMESTAMPTZ DEFAULT NOW()
);
"""

# ── Step 1: Extract — pull raw data for a given date ─────────────
EXTRACT_SQL = """
SELECT
    t.created_at::DATE                                  AS day,
    COUNT(t.id)                                         AS tweet_count,
    COUNT(DISTINCT t.user_id)                           AS unique_authors,
    SUM(t.like_count)                                   AS total_likes,
    SUM(t.retweet_count)                                AS total_retweets,
    SUM(t.impression_count)                             AS total_impressions
FROM tw_tweets t
WHERE t.created_at::DATE = %s
  AND t.retweet_of_id IS NULL
GROUP BY t.created_at::DATE
"""

TOP_HASHTAG_SQL = """
SELECT h.tag
FROM tw_tweet_hashtags th
JOIN tw_hashtags h ON h.id = th.hashtag_id
JOIN tw_tweets t   ON t.id = th.tweet_id
WHERE t.created_at::DATE = %s
GROUP BY h.tag
ORDER BY COUNT(*) DESC
LIMIT 1
"""

NEW_USERS_SQL = """
SELECT COUNT(*) AS cnt
FROM tw_users
WHERE created_at::DATE = %s
"""

# ── Step 2: Transform — enrich and compute derived metrics ────────
def transform(raw: dict, top_hashtag: str | None, new_users: int) -> dict:
    if not raw:
        return None
    impressions = raw["total_impressions"] or 1
    engagement  = (raw["total_likes"] + raw["total_retweets"]) / impressions * 100

    return {
        "report_date":      raw["day"],
        "tweet_count":      raw["tweet_count"],
        "unique_authors":   raw["unique_authors"],
        "total_likes":      raw["total_likes"],
        "total_retweets":   raw["total_retweets"],
        "total_impressions":raw["total_impressions"],
        "engagement_rate":  round(engagement, 4),
        "top_hashtag":      top_hashtag,
        "new_users":        new_users,
    }

# ── Step 3: Load — upsert into target (idempotent) ───────────────
UPSERT_SQL = """
INSERT INTO tw_daily_metrics
    (report_date, tweet_count, unique_authors, total_likes, total_retweets,
     total_impressions, engagement_rate, top_hashtag, new_users, processed_at)
VALUES
    (%(report_date)s, %(tweet_count)s, %(unique_authors)s, %(total_likes)s,
     %(total_retweets)s, %(total_impressions)s, %(engagement_rate)s,
     %(top_hashtag)s, %(new_users)s, NOW())
ON CONFLICT (report_date) DO UPDATE SET
    tweet_count       = EXCLUDED.tweet_count,
    unique_authors    = EXCLUDED.unique_authors,
    total_likes       = EXCLUDED.total_likes,
    total_retweets    = EXCLUDED.total_retweets,
    total_impressions = EXCLUDED.total_impressions,
    engagement_rate   = EXCLUDED.engagement_rate,
    top_hashtag       = EXCLUDED.top_hashtag,
    new_users         = EXCLUDED.new_users,
    processed_at      = NOW()
"""

# ── Full pipeline ─────────────────────────────────────────────────
def run_pipeline(start_date: datetime.date, end_date: datetime.date) -> int:
    loaded = 0
    with psycopg2.connect(**DSN) as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(CREATE_TARGET)
            conn.commit()
            log.info("Target table ready")

            date = start_date
            while date <= end_date:
                log.info(f"Processing {date}")

                # Extract
                cur.execute(EXTRACT_SQL, (date,))
                raw = cur.fetchone()

                cur.execute(TOP_HASHTAG_SQL, (date,))
                tag_row = cur.fetchone()
                top_hashtag = tag_row["tag"] if tag_row else None

                cur.execute(NEW_USERS_SQL, (date,))
                new_users = cur.fetchone()["cnt"]

                # Transform
                record = transform(raw, top_hashtag, new_users)
                if record is None:
                    log.warning(f"No data for {date}, skipping")
                    date += datetime.timedelta(days=1)
                    continue

                # Load
                cur.execute(UPSERT_SQL, record)
                conn.commit()
                loaded += 1
                log.info(f"  Loaded {date}: {record['tweet_count']} tweets, "
                         f"engagement={record['engagement_rate']}%")

                date += datetime.timedelta(days=1)

    return loaded


if __name__ == "__main__":
    end   = datetime.date.today()
    start = end - datetime.timedelta(days=30)
    n = run_pipeline(start, end)
    print(f"Pipeline complete: {n} days loaded")

Purchase this course to unlock the full lesson.

Sign up