Async Queries — Concurrent SQL with asyncpg & asyncio

Async Queries — Concurrent SQL with asyncpg & asyncio

asyncpg is a high-performance async PostgreSQL driver. Combined with asyncio.gather, you can run multiple independent queries in parallel — critical for APIs and dashboards that aggregate data from several queries simultaneously.

import asyncio
import asyncpg

DSN = "postgresql://your_db_user:your_db_password@127.0.0.1/shared_db"

# ── Basic async query ─────────────────────────────────────────────
async def get_top_users(conn, limit: int = 10):
    rows = await conn.fetch("""
        SELECT username, follower_count, verified
        FROM tw_users
        ORDER BY follower_count DESC
        LIMIT $1
    """, limit)   # asyncpg uses $1, $2 (not %s)
    return [dict(r) for r in rows]

async def get_trending_hashtags(conn, days: int = 7):
    rows = await conn.fetch("""
        SELECT h.tag, COUNT(*) AS tweet_count
        FROM tw_hashtags h
        JOIN tw_tweet_hashtags th ON th.hashtag_id = h.id
        JOIN tw_tweets t          ON t.id = th.tweet_id
        WHERE t.created_at >= NOW() - ($1 || ' days')::INTERVAL
        GROUP BY h.tag
        ORDER BY tweet_count DESC
        LIMIT 10
    """, str(days))
    return [dict(r) for r in rows]

async def get_platform_stats(conn):
    row = await conn.fetchrow("""
        SELECT
            COUNT(*)                   AS total_tweets,
            COUNT(DISTINCT user_id)    AS active_users,
            SUM(like_count)            AS total_likes,
            AVG(like_count)            AS avg_likes
        FROM tw_tweets
        WHERE created_at >= NOW() - INTERVAL '7 days'
    """)
    return dict(row)

# ── Run all three queries IN PARALLEL ────────────────────────────
async def build_dashboard():
    conn = await asyncpg.connect(DSN)
    try:
        # All three queries execute simultaneously
        users_task    = get_top_users(conn, limit=5)
        hashtags_task = get_trending_hashtags(conn, days=7)
        stats_task    = get_platform_stats(conn)

        top_users, trending_tags, stats = await asyncio.gather(
            users_task, hashtags_task, stats_task
        )

        print("=== Platform Stats ===")
        print(stats)
        print("\n=== Top Users ===")
        for u in top_users:
            print(f"  @{u['username']}: {u['follower_count']:,} followers")
        print("\n=== Trending Hashtags ===")
        for h in trending_tags:
            print(f"  #{h['tag']}: {h['tweet_count']} tweets")

    finally:
        await conn.close()

asyncio.run(build_dashboard())

Purchase this course to unlock the full lesson.

Sign up