Juan Nadal

YouTube Tracker: An Hourly Time-Series Data Pipeline

Python
SQL
PostgreSQL
Data Engineering
Serverless
A serverless data pipeline that crawls YouTube channel and video stats every hour into a PostgreSQL time series, then answers questions a single scrape never could, like which creator’s back catalog is still being algorithmically revived. SQL is the deliverable; pandas just runs the queries.
Published

May 1, 2026

Overview

YouTube Tracker collects view, like, comment, and subscriber counts for a set of YouTube channels on an hourly schedule and persists them as a time series in PostgreSQL. The same crawler runs two ways: as a local Python script for iteration, and as a DigitalOcean serverless Function on an hourly cron trigger (0 * * * *) in production.

The point of the hourly cadence is the part a one-off scrape can’t reach: velocity. With repeated observations of every video over time, you can ask not just “how many views does this video have?” but “how fast is it gaining them right now?”, and that question turns out to surface genuinely surprising behavior in YouTube’s recommendation system.

Note: SQL is the artifact, not pandas

This is a SQL-first analysis project. Every finding below begins as a single PostgreSQL query that does the real analytical work (DISTINCT ON, NTILE, FILTER (WHERE …), PERCENTILE_CONT, window aggregates). Pandas and Plotly are just the harness that executes the query and draws the result. The interactive charts on this page are rendered in the browser from the actual aggregated query outputs.

Pipeline architecture

YouTube Data API v3
        │  channels.list → uploads playlist IDs
        │  playlistItems.list (paged, stops at first known video)
        │  videos.list (batches of 50) → fresh stats
        ▼
  DigitalOcean Function  ── hourly cron (0 * * * *) ──┐
        │  one transaction per run                    │
        ▼                                             │
  PostgreSQL (managed)                                │
   ├─ metadata tables  (insert-once, ON CONFLICT DO NOTHING)
   └─ snapshot tables  (append-only, one row per entity per run)
        ▼
  SQL-first analysis (Jupyter + pandas + matplotlib)

The crawler is deliberately frugal with API quota. Uploads playlists come back newest-first, so paging stops at the first already-known video; on a steady-state run that is usually a single API call per channel. The whole write phase is wrapped in one transaction: any failure rolls back with no partial state, and a crawl_log row records every run as RUNNING → SUCCESS/PARTIAL/FAILED.
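The stop-at-first-known-video rule is easy to check in isolation. Below is a minimal sketch, not the crawler itself: `discover_new_ids` and `fetch_page` are hypothetical names, and the fake pages stand in for playlistItems.list responses, assuming (as the crawler does) that each page lists video IDs newest-first.

```python
from typing import Callable, Optional

# A page is (video_ids_newest_first, next_page_token); a stand-in for playlistItems.list
Page = tuple[list[str], Optional[str]]

def discover_new_ids(fetch_page: Callable[[Optional[str]], Page],
                     known: set[str]) -> tuple[list[str], int]:
    """Collect unseen video IDs, stopping at the first ID already in `known`.

    Returns (new_ids, pages_fetched). Relies on newest-first ordering:
    everything after a known ID is assumed to be known as well.
    """
    new_ids: list[str] = []
    token: Optional[str] = None
    pages = 0
    while True:
        ids, next_token = fetch_page(token)
        pages += 1
        for vid in ids:
            if vid in known:
                return new_ids, pages  # older pages are never requested
            new_ids.append(vid)
        if next_token is None:
            return new_ids, pages      # reached the oldest upload
        token = next_token

# Fake three-page playlist keyed by page token (None = first page)
FAKE_PAGES: dict[Optional[str], Page] = {
    None: (["v9", "v8"], "p2"),
    "p2": (["v7", "v6"], "p3"),
    "p3": (["v5"], None),
}

new, pages = discover_new_ids(lambda tok: FAKE_PAGES[tok], known={"v7", "v6", "v5"})
print(new, pages)  # prints ['v9', 'v8'] 2
```

On a steady-state run the newest upload is already known, so the loop returns after one page, which is where the "usually a single API call per channel" figure comes from.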

Data model

The schema’s core design decision is the split between immutable-ish metadata and append-only time-series:

  • channel, video, category, tag, video_tag: written once with ON CONFLICT DO NOTHING, capturing what was seen on first crawl. (The one exception: video.last_seen_at is bumped on every run, so a stale value distinguishes “removed from YouTube” from “crawler didn’t run.”)
  • video_snapshot, channel_snapshot: append-only, one row per entity per crawl run, each tied to a crawl_log.log_id. These are the trend tables that make every velocity query possible.
Figure 1: Entity-relationship diagram. Snapshot tables (channel_snapshot, video_snapshot) are append-only and reference crawl_log; metadata tables are written once.
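The last_seen_at exception is what makes the “removed vs. crawler didn’t run” distinction possible. A minimal sketch of that check, assuming a video is “probably removed” when its last_seen_at predates the start of the newest crawl_log run with status SUCCESS; the function name, labels, and the two-interval grace period are hypothetical stand-ins for a query over video and crawl_log.

```python
from datetime import datetime, timedelta, timezone

def classify_staleness(last_seen: dict[str, datetime],
                       last_success_started_at: datetime,
                       now: datetime,
                       expected_interval: timedelta = timedelta(hours=1)) -> dict[str, str]:
    """Label each video from its video.last_seen_at value.

    last_success_started_at is assumed to be crawl_log.started_at of the newest
    SUCCESS run (a PARTIAL run may have skipped whole batches of videos).
    """
    # If even the newest successful crawl is old, individual videos tell us
    # nothing: the crawler didn't run. The 2x-interval grace is an assumption.
    if now - last_success_started_at > 2 * expected_interval:
        return {vid: "crawler_stalled" for vid in last_seen}
    # Otherwise a video that run did not touch was not returned by videos.list,
    # so it was probably deleted, made private, or otherwise removed.
    return {
        vid: "ok" if seen >= last_success_started_at else "probably_removed"
        for vid, seen in last_seen.items()
    }

utc = timezone.utc
crawl_started = datetime(2026, 5, 1, 12, 0, tzinfo=utc)
seen = {
    "abc": datetime(2026, 5, 1, 12, 1, tzinfo=utc),  # touched by the latest run
    "xyz": datetime(2026, 4, 28, 9, 1, tzinfo=utc),  # untouched for days
}
print(classify_staleness(seen, crawl_started, now=datetime(2026, 5, 1, 12, 5, tzinfo=utc)))
# prints {'abc': 'ok', 'xyz': 'probably_removed'}
```

Comparing against the crawl’s own start time rather than NOW() is the design choice that matters: if the cron stops firing, every video ages equally and none is wrongly flagged as removed.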

Findings

The charts below are interactive: hover for exact values and use the legend to toggle series. They’re drawn client-side from the real aggregated query results.

Plotly = require("https://cdn.plot.ly/plotly-2.35.2.min.js")
data = FileAttachment("findings.json").json()

1 Catalog concentration: how hit-driven is each channel?

Question: What share of a channel’s lifetime views comes from its top 10% of videos? A high number means the channel lives and dies by a few hits; a low number means attention is spread across a deep back catalog.

The query uses DISTINCT ON to grab the latest snapshot per video, NTILE(10) OVER (PARTITION BY channel_id ORDER BY view_count DESC) to cut each channel’s catalog into deciles, then SUM(...) FILTER (WHERE decile = 1) to total just the top decile in one pass.
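The SQL itself lives in the notebook. To make the three steps concrete, here is a plain-Python sketch of the same computation (an illustration, not the project’s code): keep the newest snapshot per video as DISTINCT ON would, size the top bucket the way NTILE(10) does (buckets differ by at most one row, and the first count % 10 buckets get the extra row), and sum that bucket. Field names mirror the schema; the sample rows are made up.

```python
from collections import defaultdict
from datetime import datetime

def latest_per_video(snapshots):
    """Like DISTINCT ON (video_id) ... ORDER BY video_id, captured_at DESC."""
    latest = {}
    for snap in snapshots:
        current = latest.get(snap["video_id"])
        if current is None or snap["captured_at"] > current["captured_at"]:
            latest[snap["video_id"]] = snap
    return list(latest.values())

def top_decile_size(n):
    """Number of rows NTILE(10) places in bucket 1 for n rows."""
    return n // 10 + (1 if n % 10 else 0)

def pct_from_top_decile(snapshots, channel_of):
    """Share (%) of each channel's latest views held by its top 10% of videos."""
    views_by_channel = defaultdict(list)
    for snap in latest_per_video(snapshots):
        views_by_channel[channel_of[snap["video_id"]]].append(snap["view_count"])
    result = {}
    for channel, views in views_by_channel.items():
        views.sort(reverse=True)                          # ORDER BY view_count DESC
        total = sum(views)
        top = sum(views[:top_decile_size(len(views))])   # SUM(...) FILTER (WHERE decile = 1)
        result[channel] = round(100 * top / total, 1) if total else 0.0
    return result

# Toy data: "hits" has one 900-view video and nine 100-view videos; "flat" is uniform
t0, t1 = datetime(2026, 4, 1, 0), datetime(2026, 4, 1, 1)
snaps = [{"video_id": f"h{i}", "view_count": 900 if i == 0 else 100, "captured_at": t1} for i in range(10)]
snaps += [{"video_id": f"f{i}", "view_count": 100, "captured_at": t1} for i in range(10)]
snaps.append({"video_id": "h0", "view_count": 5, "captured_at": t0})  # older snapshot, ignored
channel_of = {s["video_id"]: ("hits" if s["video_id"].startswith("h") else "flat") for s in snaps}
print(pct_from_top_decile(snaps, channel_of))  # prints {'hits': 50.0, 'flat': 10.0}
```

A perfectly flat catalog bottoms out at roughly 10%, so the 17% to 74.5% spread in the chart below is measured against that floor.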

concChart = {
  const rows = [...data.concentration].sort((a,b) => a.pct_from_top_10pct - b.pct_from_top_10pct)
  const wrapper = html`<div class="plotly-wrapper"></div>`
  const div = html`<div style="height:460px;"></div>`
  wrapper.appendChild(div)
  const colors = rows.map(r => r.pct_from_top_10pct >= 60 ? "#D55E00" : "#0072B2")
  const traces = [{
    type: "bar", orientation: "h",
    x: rows.map(r => r.pct_from_top_10pct),
    y: rows.map(r => r.channel),
    marker: {color: colors},
    customdata: rows.map(r => r.videos),
    hovertemplate: "<b>%{y}</b><br>%{x}% of views from top 10%%<br>%{customdata:,} videos<extra></extra>",
  }]
  const layout = {
    title: {text: "Share of lifetime views from each channel's top 10% of videos", font:{size:15}},
    xaxis: {title: "% of views from top decile", ticksuffix: "%", range:[0,80]},
    yaxis: {automargin: true},
    margin: {l: 20, r: 30, t: 50, b: 50},
  }
  Plotly.newPlot(div, traces, layout, {responsive:true, displayModeBar:false})
  return wrapper
}

Finding: Drake is extraordinarily hit-driven, with 74.5% of his lifetime views coming from just his top 10% of videos. Kendrick is similar at 64%. Tech and explainer channels (MKBHD 37%, LTT 40%, TechLinked just 17%) have much flatter distributions: their back catalogs are evergreen and keep accumulating views. Music lives on a handful of bangers; tutorials compound.

2 Content strategy: what lengths does each channel publish?

Question: Does each creator lean Shorts, mid-length, or long-form? A single aggregate pass with COUNT(*) FILTER (WHERE duration_seconds …) buckets every channel’s catalog into YouTube’s conventional length bands without repeated subqueries.
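A plain-Python equivalent of that single pass, as a sketch: the band edges below (under 60 s, 60 s to under 10 min, 10 to under 30 min, 30 min and up) are read off the chart legend, and whether each edge is inclusive in the real query is an assumption. Each counter key plays the role of one COUNT(*) FILTER (WHERE …) column, and the output keys match the ones the chart code reads.

```python
from collections import Counter

# Output order matches the chart's stacked buckets
BUCKETS = ("shorts", "short_form", "mid_form", "long_form")

def length_band(duration_seconds: int) -> str:
    # Assumed edges; each upper bound is exclusive
    if duration_seconds < 60:
        return "shorts"
    if duration_seconds < 600:
        return "short_form"
    if duration_seconds < 1800:
        return "mid_form"
    return "long_form"

def catalog_composition(durations: list[int]) -> dict[str, int]:
    """One pass over a channel's video.duration_seconds values."""
    counts = Counter(length_band(d) for d in durations)
    row = {name: counts.get(name, 0) for name in BUCKETS}
    row["total"] = len(durations)
    return row

print(catalog_composition([30, 59, 60, 599, 600, 1799, 1800, 7200]))
# prints {'shorts': 2, 'short_form': 2, 'mid_form': 2, 'long_form': 2, 'total': 8}
```

The "Share of catalog (%)" view is then just 100 * row[bucket] / row["total"] per bucket.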

viewof durMode = Inputs.radio(["Counts", "Share of catalog (%)"], {label: "Show as", value: "Counts"})
durChart = {
  const rows = data.durations
  const buckets = [
    {key:"shorts", name:"Shorts (<1m)", color:"#0072B2"},
    {key:"short_form", name:"Short (1–10m)", color:"#009E73"},
    {key:"mid_form", name:"Mid (10–30m)", color:"#E69F00"},
    {key:"long_form", name:"Long (>30m)", color:"#CC79A7"},
  ]
  const pct = durMode !== "Counts"
  const wrapper = html`<div class="plotly-wrapper"></div>`
  const div = html`<div style="height:480px;"></div>`
  wrapper.appendChild(div)
  const traces = buckets.map(b => ({
    type: "bar", orientation: "h", name: b.name,
    y: rows.map(r => r.channel),
    x: rows.map(r => pct ? 100 * r[b.key] / r.total : r[b.key]),
    marker: {color: b.color},
    customdata: rows.map(r => r[b.key]),
    hovertemplate: `<b>%{y}</b><br>${b.name}: %{customdata:,} videos<extra></extra>`,
  }))
  const layout = {
    title: {text: "Catalog composition by video length", font:{size:15}},
    barmode: "stack",
    xaxis: {title: pct ? "% of catalog" : "Number of videos", ticksuffix: pct ? "%" : ""},
    yaxis: {automargin: true, autorange: "reversed"},
    legend: {orientation: "h", y: -0.12},
    margin: {l: 20, r: 30, t: 50, b: 70},
  }
  Plotly.newPlot(div, traces, layout, {responsive:true, displayModeBar:false})
  return wrapper
}

Finding: Each creator has a visibly distinct production strategy. LTT grinds out a deep mix of short- and mid-form videos (3,435 + 2,595), a back-catalog-first machine. MrBeast is mostly short/mid-form with almost no long content: high-effort, digestible episodes. Kendrick’s catalog is 98 short-form pieces and nothing else (music videos only). Switch to “Share of catalog (%)” to compare strategies independent of catalog size.

3 Views-per-video by category (and the MrBeast effect)

Question: How much attention does the average video earn in each category? Per-video figures level the field between channels with huge catalogs and those with small, focused ones. The query collapses snapshots to the latest per video, joins to category, and computes both AVG and a true median via PERCENTILE_CONT(0.5).

This is also where the project’s signature outlier shows up. MrBeast contributes ~966 videos to the Entertainment bucket and dominates it, so the query was run twice, with and without him. Toggle below:

viewof beastMode = Inputs.radio(["With MrBeast", "Excluding MrBeast"], {label: "Dataset", value: "With MrBeast"})
viewof metric = Inputs.radio(["Average", "Median"], {label: "Statistic", value: "Average"})
catChart = {
  const src = beastMode === "With MrBeast" ? data.category_with_mrbeast : data.category_without_mrbeast
  const useAvg = metric === "Average"
  const rows = [...src].sort((a,b) => (useAvg ? a.avg_views - b.avg_views : a.median_views - b.median_views))
  const wrapper = html`<div class="plotly-wrapper"></div>`
  const div = html`<div style="height:460px;"></div>`
  wrapper.appendChild(div)
  const traces = [{
    type: "bar", orientation: "h",
    x: rows.map(r => useAvg ? r.avg_views : r.median_views),
    y: rows.map(r => r.category),
    marker: {color: rows.map(r => r.category === "Entertainment" ? "#D55E00"
                              : r.category === "Music" ? "#009E73" : "#0072B2")},
    customdata: rows.map(r => r.videos),
    hovertemplate: "<b>%{y}</b><br>%{x:,.0f} views/video<br>%{customdata:,} videos<extra></extra>",
  }]
  const layout = {
    title: {text: `${useAvg ? "Average" : "Median"} views per video by category (${beastMode})`, font:{size:15}},
    xaxis: {title: `${useAvg ? "Average" : "Median"} views per video (log scale)`, type: "log"},
    yaxis: {automargin: true},
    margin: {l: 20, r: 30, t: 50, b: 55},
  }
  Plotly.newPlot(div, traces, layout, {responsive:true, displayModeBar:false})
  return wrapper
}

Finding: On the full dataset, Entertainment appears to earn ~57× as many views per video as tech on average, but switch to “Excluding MrBeast” and the Entertainment average collapses from 85.7M to 525K. That “Entertainment dominates” result was almost entirely one channel. Music, by contrast, holds its lead either way (58M average, 7.4M median): a real medium-level effect, not an artifact. This is exactly why the analysis presents parallel “excluding the outlier” queries rather than silently dropping him: the contrast is the finding. (A log scale is used because values span four orders of magnitude.)
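The with/without comparison boils down to recomputing two statistics over a filtered set of rows. A sketch in plain Python, where `percentile_cont` reimplements PostgreSQL’s PERCENTILE_CONT (linear interpolation at position fraction × (n − 1) of the sorted values) and `category_stats` is a hypothetical helper; the view counts are toy numbers chosen only to show the mechanism, not the project’s data.

```python
import math
from statistics import mean

def percentile_cont(values, fraction):
    """PERCENTILE_CONT(fraction) WITHIN GROUP (ORDER BY value)."""
    ordered = sorted(values)
    if not ordered:
        return None
    pos = fraction * (len(ordered) - 1)
    lower, upper = math.floor(pos), math.ceil(pos)
    # Interpolate between the two nearest ranks
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (pos - lower)

def category_stats(rows, exclude_channel=None):
    """AVG and median views per video per category, optionally
    dropping one channel (the 'excluding the outlier' variant)."""
    by_category = {}
    for row in rows:
        if row["channel"] == exclude_channel:
            continue
        by_category.setdefault(row["category"], []).append(row["views"])
    return {
        cat: {"avg": mean(v), "median": percentile_cont(v, 0.5), "videos": len(v)}
        for cat, v in by_category.items()
    }

# Toy category: two videos from one giant channel plus five modest ones
rows = (
    [{"category": "Entertainment", "channel": "Giant", "views": 100_000_000}] * 2
    + [{"category": "Entertainment", "channel": f"small{i}", "views": v}
       for i, v in enumerate([100_000, 200_000, 300_000, 400_000, 500_000])]
)
with_giant = category_stats(rows)["Entertainment"]
without_giant = category_stats(rows, exclude_channel="Giant")["Entertainment"]
print(with_giant)     # avg is pulled to ~28.8M while the median stays at 400K
print(without_giant)  # avg and median both drop to 300K
```

Two outlier rows out of seven are enough to move the mean by two orders of magnitude while the median barely shifts, which is why reporting both statistics side by side exposes the effect.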

Tip: The findings only the time series can produce

Two further findings in the analysis, live velocity (which videos are gaining views per hour right now) and back-catalog revival (videos over a year old still pulling hundreds of thousands of views in a tracking window), are impossible from a single scrape. They require MIN/MAX deltas across repeated video_snapshot rows. The standout result: every top revival candidate is a MrBeast video, some over two years old, which is evidence of unusually long-lived algorithmic surfacing. See the full notebook for those queries and charts.
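Both time-series findings reduce to MIN/MAX arithmetic over one video’s snapshots. The sketch below shows that arithmetic under stated assumptions: velocity is (MAX(view_count) − MIN(view_count)) divided by the hours between the first and last capture, and a revival candidate is a video published more than 365 days before the tracking window opened that gained at least `min_gain` views inside it. The function names, the 365-day default, and `min_gain` are illustrative, not the notebook’s exact query.

```python
from datetime import datetime, timedelta, timezone

def views_gained(snapshots):
    """MAX(view_count) - MIN(view_count) across one video's snapshots."""
    counts = [s["view_count"] for s in snapshots]
    return max(counts) - min(counts)

def views_per_hour(snapshots):
    """Velocity: views gained divided by hours between first and last capture."""
    times = [s["captured_at"] for s in snapshots]
    hours = (max(times) - min(times)).total_seconds() / 3600
    return views_gained(snapshots) / hours if hours > 0 else 0.0

def is_revival_candidate(published_at, snapshots, min_gain, min_age=timedelta(days=365)):
    """An old video (older than min_age when the window opened) still gaining views."""
    window_start = min(s["captured_at"] for s in snapshots)
    return window_start - published_at > min_age and views_gained(snapshots) >= min_gain

utc = timezone.utc
start = datetime(2026, 4, 1, tzinfo=utc)
# 49 hourly snapshots of a video gaining 5,000 views per hour
snaps = [{"captured_at": start + timedelta(hours=h), "view_count": 1_000_000 + 5_000 * h}
         for h in range(49)]
print(views_per_hour(snaps))  # prints 5000.0
print(is_revival_candidate(datetime(2024, 1, 1, tzinfo=utc), snaps, min_gain=200_000))  # prints True
```

Using MIN/MAX rather than first/last rows keeps the sketch aligned with the description above; the two differ only if YouTube revises a count downward mid-window.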

Engineering notes worth surfacing

A few non-obvious lessons baked into the code:

  • The API reports subscriber counts to only three significant figures, so a channel with 1M–10M subscribers moves in steps of 10,000 and a 100M+ channel in steps of 1M. Flat lines in channel_snapshot.subscriber_count are the API, not a bug.
  • channel.view_count is a slowly refreshed cached aggregate. For channel-level trends, sum deltas from video_snapshot instead of trusting the channel field.
  • Channel handles vs IDs: channels.list?id= only accepts 24-char UC… IDs; @handles must be resolved via forHandle= first.
  • Windows deploys need --remote-build so DigitalOcean’s Linux container vendors psycopg2-binary; the DB must also list the Functions namespace as a trusted source or connections hang.
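Three of those notes can be illustrated in a few lines. The sketch below is not code from the repo: `round_down_3sf` assumes the three-significant-figure rounding truncates downward and shows why large channels plateau, `channel_views_gained` builds a channel-level trend by summing per-video deltas between two crawl runs instead of reading channel.view_count, and the hypothetical `channel_lookup_params` chooses between the `id` and `forHandle` parameters of channels.list, passing the handle through unchanged.

```python
import re

def round_down_3sf(n: int) -> int:
    """Truncate a non-negative count to three significant figures
    (assumed to mirror how the API reports subscriberCount)."""
    if n < 1000:
        return n
    step = 10 ** (len(str(n)) - 3)
    return n // step * step

def channel_views_gained(before: dict[str, int], after: dict[str, int]) -> int:
    """Sum of per-video view deltas between two runs (video_id -> view_count).
    Videos missing from either run are skipped rather than guessed."""
    return sum(after[v] - before[v] for v in after.keys() & before.keys())

# Channel IDs are "UC" followed by 22 URL-safe base64 characters
CHANNEL_ID_RE = re.compile(r"UC[0-9A-Za-z_-]{22}")

def channel_lookup_params(ref: str) -> dict[str, str]:
    """Query params for channels.list: `id` for UC IDs, `forHandle` otherwise."""
    params = {"part": "snippet,statistics,contentDetails"}
    if CHANNEL_ID_RE.fullmatch(ref):
        params["id"] = ref
    else:
        params["forHandle"] = ref
    return params

print(round_down_3sf(1_234_567))  # prints 1230000
print(channel_views_gained({"a": 100, "b": 50}, {"a": 160, "b": 70, "c": 5}))  # prints 80
```

With truncation, a channel sitting at 1,230,000 reported subscribers has to gain up to 10,000 more before the stored value changes, which is exactly the flat-line pattern in channel_snapshot.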

Source & sample data

The full crawler (local + serverless), schema, migrations, and the SQL-first analysis notebooks are in the repo. A small sample export (youtube_export/) is included so the data shape is browsable. Note that it’s a single-channel, single-snapshot capture for illustration, not the multi-channel time series the findings above were computed on.

View the source on GitHub →

main.py: the crawler (local entrypoint)
import os
import re
import time
from datetime import datetime, timezone

import requests
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

# ─── CONFIG ───────────────────────────────────────────────────────────────────
load_dotenv()

API_KEY     = os.environ["YT_API_KEY"]
CHANNEL_IDS = [c.strip() for c in os.environ["YT_CHANNEL_IDS"].split(",") if c.strip()]
BASE_URL    = "https://www.googleapis.com/youtube/v3"

DB_CONFIG = {
    "host":     os.environ["DB_HOST"],
    "port":     int(os.environ.get("DB_PORT", "25060")),
    "dbname":   os.environ.get("DB_NAME", "defaultdb"),
    "user":     os.environ.get("DB_USER", "doadmin"),
    "password": os.environ["DB_PASSWORD"],
}

# ─── HELPERS ──────────────────────────────────────────────────────────────────
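def get(endpoint, params):
    """GET a YouTube Data API v3 endpoint and return the parsed JSON body."""
    # Copy params instead of mutating the caller's dict; the timeout keeps a
    # stalled request from hanging the whole run.
    r = requests.get(f"{BASE_URL}/{endpoint}", params={**params, "key": API_KEY}, timeout=30)
    r.raise_for_status()
    return r.json()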

def iso_to_dt(iso_str):
    return datetime.fromisoformat(iso_str.replace("Z", "+00:00"))

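def duration_to_seconds(duration):
    # ISO 8601 durations: usually "PT4M13S", but streams longer than a day
    # carry a day part ("P1DT2H3M") and some live/upcoming items report "P0D"
    match = re.match(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", duration or "P0D")
    if not match:
        return 0
    d, h, m, s = (int(x or 0) for x in match.groups())
    return d * 86400 + h * 3600 + m * 60 + s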

def chunk(lst, size):
    for i in range(0, len(lst), size):
        yield lst[i:i + size]

# ─── STEP 1: FETCH CHANNEL INFO + UPLOADS PLAYLIST ID ────────────────────────
print("\n── Step 1: Fetching channel info ──────────────────────────────────────")

channel_response = get("channels", {
    "part": "snippet,statistics,contentDetails",
    "id":   ",".join(CHANNEL_IDS)
})

channels          = []
uploads_playlists = {}   # channel_id → uploads playlist id
category_ids      = set()

for item in channel_response.get("items", []):
    channel_id      = item["id"]
    snippet         = item["snippet"]
    stats           = item.get("statistics", {})
    uploads_pl_id   = item["contentDetails"]["relatedPlaylists"]["uploads"]

    uploads_playlists[channel_id] = uploads_pl_id

    channels.append({
        "channel_id":       channel_id,
        "title":            snippet["title"],
        "description":      snippet.get("description", ""),
        "subscriber_count": int(stats.get("subscriberCount", 0)),
        "video_count":      int(stats.get("videoCount", 0)),
        "view_count":       int(stats.get("viewCount", 0)),
        "uploads_playlist": uploads_pl_id,
        "first_crawled_at": datetime.now(timezone.utc),
    })
    print(f"  Found: {snippet['title']}  ({stats.get('videoCount', '?')} videos)")

# ─── STEP 1.5: LOAD VIDEO IDs WE ALREADY HAVE ───────────────────────────────
# Uploads playlists are ordered newest-first, so once we hit a known ID
# every older ID on subsequent pages is also known — we can stop paging.
print("\n── Step 1.5: Loading known video IDs from DB ──────────────────────────")

existing_by_channel = {cid: set() for cid in uploads_playlists.keys()}
try:
    conn_ro = psycopg2.connect(**DB_CONFIG)
    try:
        # psycopg2's `with conn` only ends the transaction; it does not close
        # the connection, so close it explicitly below
        with conn_ro, conn_ro.cursor() as cur:
            cur.execute(
                "SELECT channel_id, video_id FROM video WHERE channel_id = ANY(%s)",
                (list(uploads_playlists.keys()),),
            )
            for ch_id, vid_id in cur.fetchall():
                existing_by_channel[ch_id].add(vid_id)
    finally:
        conn_ro.close()
    for ch_id, vids in existing_by_channel.items():
        print(f"  channel={ch_id}  already tracking {len(vids)} videos")
except Exception as e:
    print(f"  [warn] DB lookup failed, treating all videos as new: {e}")

# ─── STEP 2: DISCOVER NEW VIDEO IDs, STOP WHEN WE HIT A KNOWN ONE ────────────
print("\n── Step 2: Discovering new video IDs from uploads playlists ───────────")

all_video_ids = []
video_channel_map = {}

for channel_id, playlist_id in uploads_playlists.items():
    known       = existing_by_channel.get(channel_id, set())
    page_token  = None
    page_count  = 0
    new_ids     = []
    hit_known   = False

    while not hit_known:
        params = {
            "part":       "contentDetails",
            "playlistId": playlist_id,
            "maxResults": 50,
        }
        if page_token:
            params["pageToken"] = page_token

        data       = get("playlistItems", params)
        page_count += 1

        for item in data.get("items", []):
            vid_id = item["contentDetails"]["videoId"]
            if vid_id in known:
                hit_known = True
                break
            new_ids.append(vid_id)
            video_channel_map[vid_id] = channel_id

        print(f"  channel={channel_id}  page={page_count}  new={len(new_ids)}  hit_known={hit_known}")

        if hit_known:
            break
        page_token = data.get("nextPageToken")
        if not page_token:
            break
        time.sleep(0.2)

    # We still want a fresh snapshot for every known video, so map their channel too
    for vid_id in known:
        video_channel_map[vid_id] = channel_id

    channel_ids_total = new_ids + list(known)
    all_video_ids.extend(channel_ids_total)
    print(f"  channel={channel_id}  new: {len(new_ids)}  existing: {len(known)}  total: {len(channel_ids_total)}")

print(f"\n  Total video IDs to snapshot: {len(all_video_ids)}")

# ─── STEP 3: FETCH VIDEO DETAILS IN BATCHES OF 50 ────────────────────────────
print("\n── Step 3: Fetching video details ─────────────────────────────────────")

videos         = []
snapshots      = []
all_tags       = {}
video_tag_rows = []

crawl_start = datetime.now(timezone.utc)
errors      = 0

for batch in chunk(all_video_ids, 50):
    try:
        response = get("videos", {
            "part": "snippet,statistics,contentDetails",
            "id":   ",".join(batch)
        })

        for item in response.get("items", []):
            snippet    = item["snippet"]
            stats      = item.get("statistics", {})
            content    = item.get("contentDetails", {})
            video_id   = item["id"]
            channel_id = video_channel_map.get(video_id, snippet["channelId"])
            cat_id     = snippet.get("categoryId")

            if cat_id:
                category_ids.add(cat_id)

            videos.append({
                "video_id":         video_id,
                "channel_id":       channel_id,
                "title":            snippet["title"],
                "description":      snippet.get("description", ""),
                "category_id":      cat_id,
                "duration_seconds": duration_to_seconds(content.get("duration")),
                "published_at":     iso_to_dt(snippet["publishedAt"]),
                "first_crawled_at": datetime.now(timezone.utc),
            })

            snapshots.append({
                "video_id":      video_id,
                "view_count":    int(stats.get("viewCount", 0)),
                "like_count":    int(stats.get("likeCount", 0)),
                "comment_count": int(stats.get("commentCount", 0)),
                "captured_at":   datetime.now(timezone.utc),
            })

            for tag in snippet.get("tags", []):
                tag_lower = tag.lower()
                if tag_lower not in all_tags:
                    all_tags[tag_lower] = None  # tag_id assigned after DB insert
                video_tag_rows.append({
                    "video_id": video_id,
                    "tag_name": tag_lower,
                })

        print(f"  Processed batch of {len(batch)}  |  total videos: {len(videos)}")
        time.sleep(0.1)

    except Exception as e:
        print(f"  [error] batch failed: {e}")
        errors += 1

# ─── STEP 4: FETCH CATEGORIES ─────────────────────────────────────────────────
print("\n── Step 4: Fetching categories ────────────────────────────────────────")

categories = []
if category_ids:
    cat_response = get("videoCategories", {
        "part": "snippet",
        "id":   ",".join(category_ids),
        "hl":   "en_US"
    })
    categories = [
        {"category_id": i["id"], "name": i["snippet"]["title"]}
        for i in cat_response.get("items", [])
    ]

# ─── STEP 5: WRITE TO POSTGRES ────────────────────────────────────────────────
print("\n── Step 5: Writing to PostgreSQL ──────────────────────────────────────")

conn = psycopg2.connect(**DB_CONFIG)
conn.autocommit = False
status = "SUCCESS" if errors == 0 else "PARTIAL"

try:
    with conn.cursor() as cur:
        # crawl_log first — we need log_id for snapshots
        cur.execute(
            """
            INSERT INTO crawl_log (started_at, finished_at, videos_crawled, errors, status)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING log_id
            """,
            # finished_at stays NULL until the closing UPDATE sets it
            (crawl_start, None, len(videos), errors, "RUNNING"),
        )
        log_id = cur.fetchone()[0]
        print(f"  crawl_log.log_id = {log_id}")

        # categories — insert only if new; never overwrite
        if categories:
            execute_values(
                cur,
                """
                INSERT INTO category (category_id, name) VALUES %s
                ON CONFLICT (category_id) DO NOTHING
                """,
                [(c["category_id"], c["name"]) for c in categories],
            )
            print(f"  categories inserted (new only): {cur.rowcount}")

        # channels — insert only if new; stats live in channel_snapshot
        if channels:
            execute_values(
                cur,
                """
                INSERT INTO channel (
                    channel_id, title, description, subscriber_count,
                    video_count, view_count, uploads_playlist, first_crawled_at
                ) VALUES %s
                ON CONFLICT (channel_id) DO NOTHING
                """,
                [
                    (
                        c["channel_id"], c["title"], c["description"],
                        c["subscriber_count"], c["video_count"], c["view_count"],
                        c["uploads_playlist"], c["first_crawled_at"],
                    )
                    for c in channels
                ],
            )
            print(f"  channels inserted (new only): {cur.rowcount}")

        # channel_snapshot — channel-level trend, one row per channel per run
        if channels:
            execute_values(
                cur,
                """
                INSERT INTO channel_snapshot (
                    channel_id, log_id, subscriber_count, video_count, view_count, captured_at
                ) VALUES %s
                """,
                [
                    (
                        c["channel_id"], log_id, c["subscriber_count"],
                        c["video_count"], c["view_count"], c["first_crawled_at"],
                    )
                    for c in channels
                ],
            )
            print(f"  channel_snapshot rows: {len(channels)}")

        # videos — insert if new; on conflict, bump last_seen_at so we can
        # detect deleted/unavailable videos as a widening gap from NOW()
        if videos:
            execute_values(
                cur,
                """
                INSERT INTO video (
                    video_id, channel_id, title, description, category_id,
                    duration_seconds, published_at, first_crawled_at, last_seen_at
                ) VALUES %s
                ON CONFLICT (video_id) DO UPDATE SET last_seen_at = EXCLUDED.last_seen_at
                """,
                [
                    (
                        v["video_id"], v["channel_id"], v["title"], v["description"],
                        v["category_id"], v["duration_seconds"], v["published_at"],
                        v["first_crawled_at"], v["first_crawled_at"],
                    )
                    for v in videos
                ],
            )
            print(f"  videos upserted (insert or touch last_seen_at): {len(videos)}")

        # tags — insert and read back ids (handles ON CONFLICT DO NOTHING returning no row)
        if all_tags:
            execute_values(
                cur,
                "INSERT INTO tag (name) VALUES %s ON CONFLICT (name) DO NOTHING",
                [(name,) for name in all_tags.keys()],
            )
            cur.execute(
                "SELECT tag_id, name FROM tag WHERE name = ANY(%s)",
                (list(all_tags.keys()),),
            )
            for tag_id, name in cur.fetchall():
                all_tags[name] = tag_id
            print(f"  tags in DB: {len(all_tags)}")

        # video_tag bridge
        if video_tag_rows:
            execute_values(
                cur,
                """
                INSERT INTO video_tag (video_id, tag_id) VALUES %s
                ON CONFLICT DO NOTHING
                """,
                [(r["video_id"], all_tags[r["tag_name"]]) for r in video_tag_rows],
            )
            print(f"  video_tag rows: {len(video_tag_rows)}")

        # video_snapshot — tied to this crawl run
        if snapshots:
            execute_values(
                cur,
                """
                INSERT INTO video_snapshot (
                    video_id, log_id, view_count, like_count, comment_count, captured_at
                ) VALUES %s
                """,
                [
                    (
                        s["video_id"], log_id, s["view_count"],
                        s["like_count"], s["comment_count"], s["captured_at"],
                    )
                    for s in snapshots
                ],
            )
            print(f"  video_snapshot rows: {len(snapshots)}")

        # finalize crawl_log
        cur.execute(
            """
            UPDATE crawl_log
               SET finished_at    = %s,
                   videos_crawled = %s,
                   errors         = %s,
                   status         = %s
             WHERE log_id = %s
            """,
            (datetime.now(timezone.utc), len(videos), errors, status, log_id),
        )

    conn.commit()
    print(f"\n✓ Done. Committed crawl log_id={log_id}, status={status}\n")

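except Exception as e:
    conn.rollback()
    print(f"\n✗ Transaction rolled back: {e}\n")
    # The rollback also discarded this run's crawl_log row, so record the
    # failure in a separate, minimal transaction before re-raising.
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO crawl_log (started_at, finished_at, videos_crawled, errors, status)
                VALUES (%s, %s, 0, %s, 'FAILED')
                """,
                (crawl_start, datetime.now(timezone.utc), errors + 1),
            )
        conn.commit()
    except Exception as log_err:
        conn.rollback()
        print(f"  [warn] could not record FAILED crawl_log row: {log_err}")
    raise
finally:
    conn.close()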
Datbase.sql: authoritative schema
-- ============================================================
-- YouTube Crawler Schema
-- Database: PostgreSQL
-- ============================================================

-- ── DROP EXISTING TABLES ─────────────────────────────────────
DROP TABLE IF EXISTS video_snapshot   CASCADE;
DROP TABLE IF EXISTS channel_snapshot CASCADE;
DROP TABLE IF EXISTS crawl_log        CASCADE;
DROP TABLE IF EXISTS video_tag        CASCADE;
DROP TABLE IF EXISTS tag              CASCADE;
DROP TABLE IF EXISTS video            CASCADE;
DROP TABLE IF EXISTS channel          CASCADE;
DROP TABLE IF EXISTS category         CASCADE;

-- ── CATEGORY ─────────────────────────────────────────────────
CREATE TABLE category (
    category_id     VARCHAR(10)     NOT NULL,
    name            VARCHAR(100)    NOT NULL,
    CONSTRAINT pk_category PRIMARY KEY (category_id)
);

-- ── CHANNEL ──────────────────────────────────────────────────
CREATE TABLE channel (
    channel_id          VARCHAR(50)     NOT NULL,
    title               VARCHAR(255)    NOT NULL,
    description         TEXT,
    subscriber_count    BIGINT          DEFAULT 0,
    video_count         INT             DEFAULT 0,
    view_count          BIGINT          DEFAULT 0,
    uploads_playlist    VARCHAR(50),
    first_crawled_at    TIMESTAMPTZ     NOT NULL DEFAULT NOW(),
    CONSTRAINT pk_channel PRIMARY KEY (channel_id)
);

-- ── VIDEO ─────────────────────────────────────────────────────
CREATE TABLE video (
    video_id            VARCHAR(20)     NOT NULL,
    channel_id          VARCHAR(50)     NOT NULL,
    title               VARCHAR(500)    NOT NULL,
    description         TEXT,
    category_id         VARCHAR(10),
    duration_seconds    INT             DEFAULT 0,
    published_at        TIMESTAMPTZ,
    first_crawled_at    TIMESTAMPTZ     NOT NULL DEFAULT NOW(),
    last_seen_at        TIMESTAMPTZ     NOT NULL DEFAULT NOW(),
    CONSTRAINT pk_video         PRIMARY KEY (video_id),
    CONSTRAINT fk_video_channel FOREIGN KEY (channel_id)
        REFERENCES channel (channel_id) ON DELETE CASCADE,
    CONSTRAINT fk_video_category FOREIGN KEY (category_id)
        REFERENCES category (category_id) ON DELETE SET NULL
);

-- ── TAG ───────────────────────────────────────────────────────
CREATE TABLE tag (
    tag_id      SERIAL          NOT NULL,
    name        VARCHAR(255)    NOT NULL,
    CONSTRAINT pk_tag   PRIMARY KEY (tag_id),
    CONSTRAINT uq_tag   UNIQUE (name)
);

-- ── VIDEO_TAG (bridge) ────────────────────────────────────────
CREATE TABLE video_tag (
    video_id    VARCHAR(20)     NOT NULL,
    tag_id      INT             NOT NULL,
    CONSTRAINT pk_video_tag         PRIMARY KEY (video_id, tag_id),
    CONSTRAINT fk_video_tag_video   FOREIGN KEY (video_id)
        REFERENCES video (video_id) ON DELETE CASCADE,
    CONSTRAINT fk_video_tag_tag     FOREIGN KEY (tag_id)
        REFERENCES tag (tag_id) ON DELETE CASCADE
);

-- ── CRAWL_LOG ─────────────────────────────────────────────────
CREATE TABLE crawl_log (
    log_id          SERIAL          NOT NULL,
    started_at      TIMESTAMPTZ     NOT NULL,
    finished_at     TIMESTAMPTZ,
    videos_crawled  INT             DEFAULT 0,
    errors          INT             DEFAULT 0,
    status          VARCHAR(20)     NOT NULL DEFAULT 'RUNNING',
    CONSTRAINT pk_crawl_log         PRIMARY KEY (log_id),
    CONSTRAINT chk_crawl_status     CHECK (status IN ('RUNNING', 'SUCCESS', 'PARTIAL', 'FAILED'))
);

-- ── VIDEO_SNAPSHOT ────────────────────────────────────────────
CREATE TABLE video_snapshot (
    snapshot_id     SERIAL          NOT NULL,
    video_id        VARCHAR(20)     NOT NULL,
    log_id          INT,
    view_count      BIGINT          DEFAULT 0,
    like_count      INT             DEFAULT 0,
    comment_count   INT             DEFAULT 0,
    captured_at     TIMESTAMPTZ     NOT NULL DEFAULT NOW(),
    CONSTRAINT pk_video_snapshot        PRIMARY KEY (snapshot_id),
    CONSTRAINT fk_snapshot_video        FOREIGN KEY (video_id)
        REFERENCES video (video_id) ON DELETE CASCADE,
    CONSTRAINT fk_snapshot_crawl_log    FOREIGN KEY (log_id)
        REFERENCES crawl_log (log_id) ON DELETE SET NULL
);

-- ── CHANNEL_SNAPSHOT ──────────────────────────────────────────
CREATE TABLE channel_snapshot (
    snapshot_id         SERIAL          NOT NULL,
    channel_id          VARCHAR(50)     NOT NULL,
    log_id              INT,
    subscriber_count    BIGINT          DEFAULT 0,
    video_count         INT             DEFAULT 0,
    view_count          BIGINT          DEFAULT 0,
    captured_at         TIMESTAMPTZ     NOT NULL DEFAULT NOW(),
    CONSTRAINT pk_channel_snapshot          PRIMARY KEY (snapshot_id),
    CONSTRAINT fk_ch_snapshot_channel       FOREIGN KEY (channel_id)
        REFERENCES channel (channel_id) ON DELETE CASCADE,
    CONSTRAINT fk_ch_snapshot_crawl_log     FOREIGN KEY (log_id)
        REFERENCES crawl_log (log_id) ON DELETE SET NULL
);

-- ============================================================
-- INDEXES
-- ============================================================

-- Look up all videos for a channel
CREATE INDEX idx_video_channel_id      ON video (channel_id);

-- Look up all snapshots for a video, ordered by time
CREATE INDEX idx_snapshot_video_time   ON video_snapshot (video_id, captured_at DESC);

-- Look up all snapshots from a crawl run
CREATE INDEX idx_snapshot_log_id       ON video_snapshot (log_id);

-- Look up all tags for a video
CREATE INDEX idx_video_tag_video_id    ON video_tag (video_id);

-- Full-text search on video titles
CREATE INDEX idx_video_title           ON video USING GIN (to_tsvector('english', title));

-- Detect videos that have gone dark (not seen recently = likely deleted)
CREATE INDEX idx_video_last_seen_at    ON video (last_seen_at);

-- Look up all channel snapshots for a channel, ordered by time
CREATE INDEX idx_ch_snapshot_channel_time ON channel_snapshot (channel_id, captured_at DESC);

-- Look up all channel snapshots from a crawl run
CREATE INDEX idx_ch_snapshot_log_id       ON channel_snapshot (log_id);

Copyright 2026 Juan Nadal