ETL Mini-Pipeline — Extract, Transform, Load with Python & SQL
ETL Mini-Pipeline — Extract, Transform, Load with Python & SQL
An ETL pipeline extracts data from a source, transforms it (here with a mix of Python and SQL), and loads it into a target table. In this example we build a complete, idempotent, restartable pipeline that aggregates Twitter metrics into a daily summary table.
import datetime
import logging
import os

import psycopg2
from psycopg2.extras import execute_values, RealDictCursor
# Configure logging once for the script: INFO level, one timestamped line per
# record.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)

# Connection settings, passed as keyword arguments to psycopg2.connect().
# Each value can be overridden through the matching libpq-style environment
# variable so that real credentials never have to be written into this file;
# the fallbacks are the placeholder values the script has always shipped with.
DSN = {
    "host": os.environ.get("PGHOST", "127.0.0.1"),
    "dbname": os.environ.get("PGDATABASE", "shared_db"),
    "user": os.environ.get("PGUSER", "your_db_user"),
    "password": os.environ.get("PGPASSWORD", "your_db_password"),
}
# ── Step 0: Ensure target table exists ───────────────────────────
# DDL for the summary table written by the load step. There is one row per
# report_date (the primary key). IF NOT EXISTS makes the statement safe to
# execute on every run. processed_at defaults to the insertion time.
CREATE_TARGET = """
CREATE TABLE IF NOT EXISTS tw_daily_metrics (
report_date DATE PRIMARY KEY,
tweet_count INT,
unique_authors INT,
total_likes BIGINT,
total_retweets BIGINT,
total_impressions BIGINT,
engagement_rate NUMERIC(8,4),
top_hashtag TEXT,
new_users INT,
processed_at TIMESTAMPTZ DEFAULT NOW()
);
"""
# ── Step 1: Extract — pull raw data for a given date ─────────────
# Per-day aggregate over tw_tweets for the single date bound to %s: tweet
# count, distinct authors, and the like / retweet / impression sums.
# Because of the GROUP BY, the query returns no row at all when no tweet
# matches the date. Rows with a non-NULL retweet_of_id are left out --
# presumably so that only original tweets are counted; confirm against the
# tw_tweets schema.
EXTRACT_SQL = """
SELECT
t.created_at::DATE AS day,
COUNT(t.id) AS tweet_count,
COUNT(DISTINCT t.user_id) AS unique_authors,
SUM(t.like_count) AS total_likes,
SUM(t.retweet_count) AS total_retweets,
SUM(t.impression_count) AS total_impressions
FROM tw_tweets t
WHERE t.created_at::DATE = %s
AND t.retweet_of_id IS NULL
GROUP BY t.created_at::DATE
"""
# Most frequently used hashtag among tweets created on the date bound to %s.
# Returns a single row with column "tag", or no row when no tweet from that
# date carries a hashtag.
# The secondary sort key (h.tag) breaks ties between equally frequent
# hashtags. Without it the database may pick any of them, so re-running the
# pipeline for the same date could store a different value.
# NOTE(review): unlike EXTRACT_SQL, this query does not filter on
# retweet_of_id -- confirm whether that difference is intended.
TOP_HASHTAG_SQL = """
SELECT h.tag
FROM tw_tweet_hashtags th
JOIN tw_hashtags h ON h.id = th.hashtag_id
JOIN tw_tweets t ON t.id = th.tweet_id
WHERE t.created_at::DATE = %s
GROUP BY h.tag
ORDER BY COUNT(*) DESC, h.tag ASC
LIMIT 1
"""
# Number of tw_users rows whose created_at falls on the date bound to %s.
# Always returns exactly one row, with the count in column "cnt".
NEW_USERS_SQL = """
SELECT COUNT(*) AS cnt
FROM tw_users
WHERE created_at::DATE = %s
"""
# ── Step 2: Transform — enrich and compute derived metrics ────────
def transform(raw: dict, top_hashtag: str | None, new_users: int) -> dict:
if not raw:
return None
impressions = raw["total_impressions"] or 1
engagement = (raw["total_likes"] + raw["total_retweets"]) / impressions * 100
return {
"report_date": raw["day"],
"tweet_count": raw["tweet_count"],
"unique_authors": raw["unique_authors"],
"total_likes": raw["total_likes"],
"total_retweets": raw["total_retweets"],
"total_impressions":raw["total_impressions"],
"engagement_rate": round(engagement, 4),
"top_hashtag": top_hashtag,
"new_users": new_users,
}
# ── Step 3: Load — upsert into target (idempotent) ───────────────
# Insert-or-overwrite for one tw_daily_metrics row. The named placeholders
# match the keys of the dict returned by transform(). On a report_date
# conflict every metric column is replaced with the new value and
# processed_at is refreshed, so loading the same date again overwrites the
# row instead of failing on the primary key.
UPSERT_SQL = """
INSERT INTO tw_daily_metrics
(report_date, tweet_count, unique_authors, total_likes, total_retweets,
total_impressions, engagement_rate, top_hashtag, new_users, processed_at)
VALUES
(%(report_date)s, %(tweet_count)s, %(unique_authors)s, %(total_likes)s,
%(total_retweets)s, %(total_impressions)s, %(engagement_rate)s,
%(top_hashtag)s, %(new_users)s, NOW())
ON CONFLICT (report_date) DO UPDATE SET
tweet_count = EXCLUDED.tweet_count,
unique_authors = EXCLUDED.unique_authors,
total_likes = EXCLUDED.total_likes,
total_retweets = EXCLUDED.total_retweets,
total_impressions = EXCLUDED.total_impressions,
engagement_rate = EXCLUDED.engagement_rate,
top_hashtag = EXCLUDED.top_hashtag,
new_users = EXCLUDED.new_users,
processed_at = NOW()
"""
# ── Full pipeline ─────────────────────────────────────────────────
def run_pipeline(start_date: datetime.date, end_date: datetime.date) -> int:
    """Extract, transform and load daily metrics for a closed date range.

    For every date from ``start_date`` through ``end_date`` (both inclusive)
    the three extract queries are run, the results are passed to
    ``transform``, and the resulting record is upserted and committed on its
    own. Days that were already committed therefore survive a later failure,
    and re-running a range overwrites the existing rows.

    Args:
        start_date: First date to process.
        end_date: Last date to process. If it precedes ``start_date`` no
            date is processed.

    Returns:
        The number of dates for which a row was written. Dates for which
        ``transform`` returns ``None`` (no data) are skipped and not counted.

    Raises:
        psycopg2.Error: If connecting or any statement fails. The connection
            is closed before the exception propagates.
    """
    loaded = 0
    # psycopg2's ``with connection`` only commits or rolls back the
    # transaction; it does not close the connection. Close it explicitly so
    # the session is released even when a query raises.
    conn = psycopg2.connect(**DSN)
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(CREATE_TARGET)
            conn.commit()
            log.info("Target table ready")

            # range() is empty for a non-positive count, which covers
            # end_date < start_date.
            day_count = (end_date - start_date).days + 1
            for offset in range(day_count):
                day = start_date + datetime.timedelta(days=offset)
                log.info("Processing %s", day)

                # Extract
                cur.execute(EXTRACT_SQL, (day,))
                raw = cur.fetchone()
                cur.execute(TOP_HASHTAG_SQL, (day,))
                tag_row = cur.fetchone()
                top_hashtag = tag_row["tag"] if tag_row else None
                cur.execute(NEW_USERS_SQL, (day,))
                new_users = cur.fetchone()["cnt"]

                # Transform
                record = transform(raw, top_hashtag, new_users)
                if record is None:
                    log.warning("No data for %s, skipping", day)
                    continue

                # Load -- one commit per day keeps the run restartable.
                cur.execute(UPSERT_SQL, record)
                conn.commit()
                loaded += 1
                log.info(
                    " Loaded %s: %s tweets, engagement=%s%%",
                    day,
                    record["tweet_count"],
                    record["engagement_rate"],
                )
    finally:
        # Closing also discards any transaction that is still open.
        conn.close()
    return loaded
if __name__ == "__main__":
    # Default run when executed as a script: today plus the 30 preceding
    # dates (31 dates in total, both ends inclusive).
    today = datetime.date.today()
    window_start = today - datetime.timedelta(days=30)
    days_loaded = run_pipeline(window_start, today)
    print(f"Pipeline complete: {days_loaded} days loaded")