Async Queries — Concurrent SQL with asyncpg & asyncio
Async Queries — Concurrent SQL with asyncpg & asyncio
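asyncpg is a high-performance async PostgreSQL driver. Combined with asyncio.gather,
it lets you run multiple independent queries in parallel — critical for APIs and dashboards
that aggregate the results of several queries at once. Each concurrent query needs its own
connection (for example, one taken from a connection pool), because a single asyncpg
connection executes only one query at a time.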
import asyncio
import asyncpg
# PostgreSQL connection string used by build_dashboard(); the user name and
# password in it are placeholders to be replaced with real credentials.
DSN = "postgresql://your_db_user:your_db_password@127.0.0.1/shared_db"
# ── Basic async query ─────────────────────────────────────────────
async def get_top_users(conn, limit: int = 10):
    """Fetch the users with the highest follower counts.

    Args:
        conn: Object exposing an awaitable ``fetch(query, *args)`` method
            (presumably an asyncpg connection -- confirm against callers).
        limit: Maximum number of rows to request; bound to ``$1``.

    Returns:
        A list with one plain ``dict`` per returned row, in query order.
    """
    # asyncpg uses numbered placeholders ($1, $2), not %s.
    query = (
        "\n"
        "SELECT username, follower_count, verified\n"
        "FROM tw_users\n"
        "ORDER BY follower_count DESC\n"
        "LIMIT $1\n"
    )
    records = await conn.fetch(query, limit)
    return list(map(dict, records))
async def get_trending_hashtags(conn, days: int = 7):
    """Fetch the ten most-used hashtags among recent tweets.

    Args:
        conn: Object exposing an awaitable ``fetch(query, *args)`` method
            (presumably an asyncpg connection -- confirm against callers).
        days: Size of the look-back window, in days.

    Returns:
        A list with one plain ``dict`` per returned row (``tag`` and
        ``tweet_count`` columns), in query order.
    """
    query = (
        "\n"
        "SELECT h.tag, COUNT(*) AS tweet_count\n"
        "FROM tw_hashtags h\n"
        "JOIN tw_tweet_hashtags th ON th.hashtag_id = h.id\n"
        "JOIN tw_tweets t ON t.id = th.tweet_id\n"
        "WHERE t.created_at >= NOW() - ($1 || ' days')::INTERVAL\n"
        "GROUP BY h.tag\n"
        "ORDER BY tweet_count DESC\n"
        "LIMIT 10\n"
    )
    # The day count is sent as text because the SQL concatenates it with
    # ' days' before casting the result to INTERVAL.
    window = str(days)
    records = await conn.fetch(query, window)
    return list(map(dict, records))
async def get_platform_stats(conn):
    """Fetch aggregate tweet statistics for the last seven days.

    Args:
        conn: Object exposing an awaitable ``fetchrow(query, *args)`` method
            (presumably an asyncpg connection -- confirm against callers).

    Returns:
        A ``dict`` with the keys ``total_tweets``, ``active_users``,
        ``total_likes`` and ``avg_likes``.
    """
    query = (
        "\n"
        "SELECT\n"
        "COUNT(*) AS total_tweets,\n"
        "COUNT(DISTINCT user_id) AS active_users,\n"
        "SUM(like_count) AS total_likes,\n"
        "AVG(like_count) AS avg_likes\n"
        "FROM tw_tweets\n"
        "WHERE created_at >= NOW() - INTERVAL '7 days'\n"
    )
    record = await conn.fetchrow(query)
    return dict(record)
# ── Run all three queries IN PARALLEL ────────────────────────────
async def build_dashboard():
    """Fetch the three dashboard datasets concurrently and print them.

    Runs get_platform_stats, get_top_users (limit 5) and
    get_trending_hashtags (7 days) at the same time, then prints the
    platform stats, the top users and the trending hashtags to stdout.

    Returns:
        None.
    """
    # A single asyncpg connection can execute only one query at a time;
    # awaiting several queries on it through asyncio.gather fails with
    # InterfaceError ("another operation is in progress").  A pool gives
    # each query its own connection, and Pool.fetch / Pool.fetchrow mirror
    # the Connection methods, so the pool can be passed wherever the query
    # helpers expect ``conn``.  Three connections are enough for three
    # concurrent queries.
    async with asyncpg.create_pool(DSN, min_size=3, max_size=3) as pool:
        top_users, trending_tags, stats = await asyncio.gather(
            get_top_users(pool, limit=5),
            get_trending_hashtags(pool, days=7),
            get_platform_stats(pool),
        )

    # The pool is already closed here; printing needs no database access.
    print("=== Platform Stats ===")
    print(stats)

    print("\n=== Top Users ===")
    for u in top_users:
        print(f" @{u['username']}: {u['follower_count']:,} followers")

    print("\n=== Trending Hashtags ===")
    for h in trending_tags:
        print(f" #{h['tag']}: {h['tweet_count']} tweets")
# Script entry point: start an event loop and run build_dashboard() to completion.
asyncio.run(build_dashboard())