Alerting Pipeline — Scheduled Query → Threshold → Webhook

Alerting Pipeline — Scheduled Query → Threshold → Webhook

A production alerting pipeline: query the database on a schedule, compare each result against a threshold, and fire a webhook (to Slack, PagerDuty, or an email gateway) when a threshold is breached. This is the same pattern that underlies Grafana alerts, Datadog monitors, and custom watchdogs.

import datetime
import json
import operator
import urllib.request   # stdlib only — no requests dependency
from dataclasses import dataclass, field
from typing import Callable

import psycopg2
from psycopg2.extras import RealDictCursor

# Keyword arguments handed to psycopg2.connect(). The user and password
# values are placeholders and must be replaced with real credentials.
DSN = {
    "host": "127.0.0.1",
    "dbname": "shared_db",
    "user": "your_db_user",
    "password": "your_db_password",
}


@dataclass
class Alert:
    """A threshold rule checked against one row of a SQL query result.

    ``sql`` and ``params`` describe the query to run; ``evaluate`` then
    compares the ``metric_col`` column of a result row against
    ``threshold`` using ``comparator``.
    """

    name:       str
    sql:        str
    params:     tuple = field(default_factory=tuple)
    threshold:  float = 0.0
    comparator: str = ">"         # >, <, >=, <=, ==
    metric_col: str = "value"     # which column to check
    severity:   str = "warning"   # info / warning / critical

    # Comparator symbol -> binary predicate. Deliberately un-annotated so
    # @dataclass does not turn it into an instance field; built once
    # instead of on every evaluate() call.
    _COMPARATORS = {
        ">": operator.gt,
        "<": operator.lt,
        ">=": operator.ge,
        "<=": operator.le,
        "==": operator.eq,
    }

    def evaluate(self, row: dict) -> bool:
        """Return True when the row's metric breaches the threshold.

        Args:
            row: Mapping that contains the ``metric_col`` key.

        Returns:
            Result of ``metric <comparator> threshold`` with both sides
            converted to ``float``. A metric of ``None`` never breaches
            and yields ``False``.

        Raises:
            KeyError: If ``metric_col`` is missing from ``row`` or
                ``comparator`` is not one of the supported symbols.
            ValueError, TypeError: If the metric or threshold cannot be
                converted to ``float``.
        """
        value = row[self.metric_col]
        if value is None:
            # No measurement to compare; do not crash the caller and do
            # not fire on missing data.
            return False
        compare = self._COMPARATORS[self.comparator]
        # Coerce BOTH operands: comparing a float against e.g. a Decimal
        # threshold via float.__gt__ yields NotImplemented (truthy)
        # rather than a real boolean.
        return compare(float(value), float(self.threshold))


def send_slack_webhook(webhook_url: str, message: str, timeout: float = 5.0) -> None:
    """POST ``message`` to ``webhook_url`` as a JSON ``{"text": ...}`` body.

    Args:
        webhook_url: Destination URL for the POST request.
        message: Text placed under the ``"text"`` key of the payload.
        timeout: Seconds to wait on the request before giving up.

    Raises:
        urllib.error.URLError: If the request cannot be completed
            (``urllib.error.HTTPError`` for HTTP error statuses).
    """
    payload = json.dumps({"text": message}).encode()
    req = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # The response body is not needed, but the response object must be
    # closed so the underlying socket is released.
    with urllib.request.urlopen(req, timeout=timeout):
        pass


def run_alerts(alerts: list[Alert], webhook_url: str | None = None) -> list[dict]:
    """Run every alert query and report the ones whose threshold is breached.

    For each alert the SQL is executed with its params and only the first
    result row is inspected. A breached alert is printed, appended to the
    returned list, and, when ``webhook_url`` is given, posted through
    ``send_slack_webhook``.

    Args:
        alerts: Alert definitions to evaluate, in order.
        webhook_url: Optional webhook endpoint; nothing is posted when
            it is ``None`` or empty.

    Returns:
        One event dict per fired alert with the keys ``alert``,
        ``severity``, ``value``, ``threshold``, ``fired_at`` (naive UTC
        ISO-8601 string) and ``row``.

    Raises:
        Database errors and evaluation errors propagate to the caller;
        webhook delivery failures are printed and otherwise ignored.
    """
    fired = []
    conn = psycopg2.connect(**DSN)
    try:
        # `with conn` only commits/rolls back the transaction in psycopg2;
        # it does NOT close the connection, hence the explicit finally.
        with conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
            for alert in alerts:
                cur.execute(alert.sql, alert.params)
                row = cur.fetchone()
                if not (row and alert.evaluate(row)):
                    continue

                value = row[alert.metric_col]
                # utcnow() is deprecated; dropping tzinfo keeps the same
                # naive-UTC string format it used to produce.
                now_utc = datetime.datetime.now(datetime.timezone.utc)
                event = {
                    "alert":    alert.name,
                    "severity": alert.severity,
                    "value":    value,
                    "threshold": alert.threshold,
                    "fired_at": now_utc.replace(tzinfo=None).isoformat(),
                    "row":      dict(row),
                }
                fired.append(event)
                msg = (f":red_circle: *{alert.severity.upper()}* — {alert.name}\n"
                       f"Value: `{value}` "
                       f"(threshold: {alert.comparator} {alert.threshold})")
                print(msg)
                if webhook_url:
                    # Best effort: a failed notification must not stop the
                    # remaining alerts from being evaluated.
                    try:
                        send_slack_webhook(webhook_url, msg)
                    except Exception as e:
                        print(f"Webhook failed: {e}")
    finally:
        conn.close()
    return fired

Purchase this course to unlock the full lesson.

Sign up