#!/usr/bin/env python3
"""sweep-remote-stale.py: 远端 stale 分支清理协调器。

判定一条远端分支"可清理"必须**同时**满足两类条件：

  1. 年龄：last commit ≥ N 天前（默认 14）。
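  2. 内容已被吸收：任一命中即可——
     L1 `git merge-base --is-ancestor <tip> origin/<default>`（tip 已能从 default 分支到达：
        fast-forward 或普通 merge commit 合入都算）
     L2 `origin/<default>..<branch>` 范围内每个非 merge commit 的 patch-id 都能在 default 分支
        最近 --patch-id-lookback 条 commit 里找到等价（squash merge 后残留的 zombie 分支，最常见的
        ubuntu CI bot 残留模式）。注意 patch-id 按单个 commit 计算：多 commit 分支被 squash 成
        一个 commit 后通常对不上，L2 主要命中单 commit 分支及 cherry-pick / rebase 合入的情况。

未被吸收、但年龄 ≥ --long-stale-days（默认 30）的分支不删，只在 issue 里列出、提醒处理。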

安全网：
  - 保护分支白名单（develop/staging/production/main/master）+ 前缀（chore/notes-rolling*、chore/rolling-*）
  - 有 open PR 指向的分支跳过
  - agent-pool slot 当前持有的分支跳过（扫 .agent-pool/slot-*.lock）
  - 默认 dry-run；代删 orphan 要 --sweep-orphan --execute
  - 默认一次最多代删 30 条 orphan，用 --limit 调整

模式：
  python3 sweep-remote-stale.py                          # dry-run，按作者分组列待删
  python3 sweep-remote-stale.py --post-issue             # 创建/更新 Gitea issue，@ 出本人自删
  python3 sweep-remote-stale.py --sweep-orphan --execute # 代删无主分支（CI bot / noreply）
  python3 sweep-remote-stale.py --post-issue --sweep-orphan --execute  # 一把跑（推荐）

Auth: GITEA_API_TOKEN / GITEA_TOKEN，需要 write:repository（删 branch）+ write:issue（发 issue）权限。
"""

from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
import urllib.parse
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent))
from _gitea_api import (  # noqa: E402
    BASE, Api, detect_default_branch, detect_repo, ensure_label,
    find_issue_by_title, get_token, normalize_email_local, paginate,
)

# Repository root: parents[2] of the resolved script path, i.e. two levels
# above the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Directory for the timestamped audit logs written by write_audit().
AUDIT_DIR = REPO_ROOT / "testing" / "reports" / "stale-cleanup"

# Branches that are never swept: these exact names ...
PROTECTED_EXACT = ("develop", "staging", "production", "main", "master")
# ... plus any name starting with one of these prefixes. Matching is a plain
# str.startswith, so "chore/notes-rolling" (no trailing dash) is broader than
# "chore/rolling-".
PROTECTED_PREFIX = ("chore/notes-rolling", "chore/rolling-")

# Label for the cleanup issue: passed (with color and description) to
# ensure_label(), and handed to find_issue_by_title() to locate an existing issue.
ISSUE_LABEL = "stale-branch-cleanup"
ISSUE_LABEL_COLOR = "d4c5f9"
ISSUE_LABEL_DESC = "远端 stale 分支清理（sweep-remote-stale.py 自动维护）"

PATCH_ID_LOOKBACK_DEFAULT = 2000  # commits on default branch scanned for patch-id equivalence


# ---------- Identity / pool ----------

def gitea_user_exists(api: Api, username: str, cache: dict[str, bool]) -> bool:
    """Whether `/users/<username>` answers HTTP 200, memoized in `cache`.

    Any status other than 200 (404, but also auth or server errors) is
    recorded as "no such account". The verdict is stored in `cache` so each
    name costs at most one API round-trip per run.
    """
    if username not in cache:
        status, _body = api.call_root("GET", "/users/" + urllib.parse.quote(username))
        cache[username] = status == 200
    return cache[username]


def resolve_branch_owner(commit: dict, api: Api, user_cache: dict[str, bool]) -> str | None:
    """Return the verified Gitea username behind a branch's tip commit, or None.

    The candidate name is `commit.author.username` when Gitea already resolved
    it at push time. Otherwise it is the normalized local part of the author
    email. The candidate is then confirmed through `/users/<name>`, which is
    cached in `user_cache`. None means no Gitea account could be tied to the
    author (CI bot, noreply address, or an unknown email).
    """
    author = commit.get("author") or {}
    candidate = (author.get("username") or "").strip()
    if not candidate:
        candidate = normalize_email_local(author.get("email") or "")
    if not candidate:
        return None
    return candidate if gitea_user_exists(api, candidate, user_cache) else None


def read_pool_branches() -> set[str]:
    """Branches currently claimed by agent-pool slots.

    Every candidate pool root below is scanned and the results are unioned.
    The env var is checked first, but it is not a fallback:

      1. ``$FFOA_AGENT_POOL_ROOT`` if set;
      2. ``<repo-parent>/<repo-name>-wt/.agent-pool``;
      3. ``<repo-parent>/ffworkspace-wt/.agent-pool``.

    Roots that don't exist are skipped. A directory reached through two
    candidates is scanned only once, because roots are compared after
    ``resolve()``. In each root, every ``slot-*.lock`` file contributes the
    value of its first ``task_branch=<name>`` line.

    This list is a safety net: a pool-held branch must never be deleted. A
    lock file that can't be read is therefore reported on stderr rather than
    silently ignored. The sweep continues, but the operator can see that the
    branch of that slot may be unprotected. Undecodable bytes are replaced
    rather than raising, so one odd byte elsewhere in a lock file doesn't hide
    an ASCII ``task_branch`` line or crash the run.
    """
    candidates: list[Path] = []
    pool_env = os.environ.get("FFOA_AGENT_POOL_ROOT")
    if pool_env:
        candidates.append(Path(pool_env))
    candidates.append(REPO_ROOT.parent / f"{REPO_ROOT.name}-wt" / ".agent-pool")
    candidates.append(REPO_ROOT.parent / "ffworkspace-wt" / ".agent-pool")

    branches: set[str] = set()
    seen: set[Path] = set()
    for pool_root in candidates:
        if not pool_root.exists():
            continue
        # Compare resolved paths so a symlinked/relative env root that points
        # at one of the layout-derived roots isn't scanned twice.
        root_key = pool_root.resolve()
        if root_key in seen:
            continue
        seen.add(root_key)
        # Note: "slot-*.lock" can never match a "*.flock" file, so no extra
        # filtering is needed here.
        for lock in pool_root.glob("slot-*.lock"):
            try:
                text = lock.read_text(encoding="utf-8", errors="replace")
            except OSError as err:
                print(f"  WARN: cannot read {lock}: {err} — its task_branch is NOT protected",
                      file=sys.stderr)
                continue
            for line in text.splitlines():
                if line.startswith("task_branch="):
                    branch = line.split("=", 1)[1].strip()
                    if branch:
                        branches.add(branch)
                    break
    return branches


# ---------- Judgement: is the content already absorbed? ----------

def run_git(args: list[str]) -> str:
    """Run ``git -C REPO_ROOT <args>`` and return its stdout decoded as UTF-8.

    Raises subprocess.CalledProcessError on a non-zero exit. stderr is not
    captured and goes straight to this process's stderr.
    """
    argv = ["git", "-C", str(REPO_ROOT)]
    argv.extend(args)
    raw_stdout = subprocess.check_output(argv)
    return raw_stdout.decode()


def run_git_check(args: list[str]) -> bool:
    """Run ``git -C REPO_ROOT <args>`` with all output discarded; True iff it exits 0."""
    completed = subprocess.run(
        ["git", "-C", str(REPO_ROOT)] + list(args),
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return not completed.returncode


def build_develop_patch_id_set(default_branch: str, lookback: int) -> set[str]:
    """Patch-ids of the last ``lookback`` non-merge commits on origin/<default>.

    This is the baseline for the L2 check, which detects zombies: branches
    whose commits landed in develop (e.g. a single-commit squash, cherry-pick
    or rebase merge) while the original branch was never deleted.

    Pipeline: ``git rev-list`` → ``git diff-tree -p --stdin`` →
    ``git patch-id --stable``. The last step prints ``<patch-id> <commit-sha>``
    per commit, and only the first column is kept.

    The diff is passed between the processes as raw bytes. Patches carry file
    content in whatever encoding the repo uses, and decoding them as text
    would raise ``UnicodeDecodeError`` on the first non-UTF-8 line, aborting
    the whole sweep. Only patch-id's output, which is hex, is decoded.

    Returns an empty set on any git failure. The caller then gets no L2
    matches, which is conservative: nothing is judged absorbed via L2.

    NOTE(review): older git versions reportedly ignore binary diffs when
    computing patch-ids, so two different binary edits of one file could
    collide. Verify against the git version in use.
    """
    try:
        rev_list = run_git([
            "rev-list", f"origin/{default_branch}",
            f"--max-count={lookback}", "--no-merges",
        ]).strip()
    except subprocess.CalledProcessError:
        return set()
    if not rev_list:
        return set()
    # `git diff-tree --stdin` silently drops the last line if it lacks a trailing
    # newline. `.strip()` above removed it, so re-add it; otherwise the oldest
    # commit's patch-id would be missing from the baseline set.
    diff_proc = subprocess.run(
        ["git", "-C", str(REPO_ROOT), "diff-tree", "-p", "--stdin"],
        input=(rev_list + "\n").encode(), capture_output=True,
    )
    if diff_proc.returncode != 0:
        return set()
    patch_proc = subprocess.run(
        ["git", "patch-id", "--stable"],
        input=diff_proc.stdout, capture_output=True,
    )
    if patch_proc.returncode != 0:
        return set()
    return {
        line.split()[0]
        for line in patch_proc.stdout.decode("ascii", errors="replace").splitlines()
        if line.strip()
    }


def branch_patch_ids(branch_ref: str, default_branch: str) -> list[str] | None:
    """Patch-ids of non-merge commits in ``origin/<default>..<branch_ref>``.

    Returns None on git failure, meaning "can't judge". Returns an empty list
    when the branch has no unique non-merge commits. Normally such a branch
    is already an ancestor of develop and L1 catches it first, but a branch
    whose unique commits are all merges also lands here. The caller must
    treat an empty list as "not absorbed".

    As in build_develop_patch_id_set, diff output is piped between git
    processes as bytes. Decoding arbitrary file content as text would crash
    on non-UTF-8 data. Only patch-id's hex output is decoded.

    Note: ``git patch-id`` emits nothing for commits with an empty diff, so
    the result can be shorter than the commit list.
    """
    try:
        log_out = run_git([
            "log", f"origin/{default_branch}..{branch_ref}",
            "--format=%H", "--no-merges",
        ])
    except subprocess.CalledProcessError:
        return None
    shas = [sha for sha in log_out.strip().splitlines() if sha]
    if not shas:
        return []
    # `git diff-tree --stdin` silently drops the last line if it lacks a trailing
    # newline; `"\n".join(...)` omits it, so append one. Without this the LAST
    # diverging commit gets no patch-id and L2 could conclude "all matched"
    # from a partial set.
    diff_proc = subprocess.run(
        ["git", "-C", str(REPO_ROOT), "diff-tree", "-p", "--stdin"],
        input=("\n".join(shas) + "\n").encode(), capture_output=True,
    )
    if diff_proc.returncode != 0:
        return None
    patch_proc = subprocess.run(
        ["git", "patch-id", "--stable"],
        input=diff_proc.stdout, capture_output=True,
    )
    if patch_proc.returncode != 0:
        return None
    return [
        line.split()[0]
        for line in patch_proc.stdout.decode("ascii", errors="replace").splitlines()
        if line.strip()
    ]


def judge_safe_to_delete(branch: str, default_branch: str,
                         develop_patch_ids: set[str]) -> str | None:
    """Explain why ``origin/<branch>`` is already absorbed, or return None.

    Checks run in order:
      L1: the branch tip is an ancestor of origin/<default>.
      L2: every patch-id from branch_patch_ids() is in ``develop_patch_ids``.
    L2 never fires when the patch-id list is missing (git failure) or empty.
    """
    tip = f"origin/{branch}"
    base = f"origin/{default_branch}"
    if run_git_check(["merge-base", "--is-ancestor", tip, base]):
        return "L1: merged (fast-forward)"

    patch_ids = branch_patch_ids(tip, default_branch)
    if not patch_ids:
        return None
    if not all(pid in develop_patch_ids for pid in patch_ids):
        return None
    suffix = "s" if len(patch_ids) > 1 else ""
    return f"L2: patch-id equivalent ({len(patch_ids)} commit{suffix})"


# ---------- Collection ----------

def list_open_pr_heads(api: Api) -> set[str]:
    """Head branch names of every open pull request.

    A PR without a ``head.ref`` contributes the empty string.
    """
    heads: set[str] = set()
    for pr in paginate(api, "/pulls", {"state": "open", "sort": "newest"}):
        head = pr.get("head") or {}
        heads.add(head.get("ref", ""))
    return heads


def collect_branches(api: Api) -> list[dict]:
    """All repository branches, fetched page by page from ``/branches`` with no filters."""
    no_filters: dict = {}
    return paginate(api, "/branches", no_filters)


def _is_protected(name: str) -> bool:
    """True for whitelisted long-lived branches and reserved name prefixes."""
    return name in PROTECTED_EXACT or name.startswith(PROTECTED_PREFIX)


def _age_days(commit_iso: str) -> int | None:
    try:
        ts = datetime.fromisoformat(commit_iso.replace("Z", "+00:00"))
    except (ValueError, TypeError):
        return None
    return (datetime.now(timezone.utc) - ts.astimezone(timezone.utc)).days


# ---------- Markdown rendering ----------

def render_markdown(assigned: dict[str, list[dict]], orphan: list[dict],
                    long_stale_assigned: dict[str, list[dict]],
                    long_stale_orphan: list[dict],
                    *, default_branch: str, kept: int, scanned: int,
                    age_days: int, long_stale_days: int,
                    executed_orphans: list[str] | None) -> str:
    L: list[str] = []
    L.append("# 远端 stale 分支待清理")
    L.append("")
    now = datetime.now().astimezone()
    total_safe = sum(len(v) for v in assigned.values()) + len(orphan)
    total_long_stale = sum(len(v) for v in long_stale_assigned.values()) + len(long_stale_orphan)
    L.append(f"**扫描时间**: {now.strftime('%Y-%m-%d %H:%M %z')}")
    L.append(f"**阈值**: 年龄 ≥ {age_days}d 触发判定；≥ {long_stale_days}d 且未被吸收触发 long-stale 提醒")
    L.append("**判定**:")
    L.append(f"  - L1 `git merge-base --is-ancestor <tip> origin/{default_branch}`（fast-forward 合并）")
    L.append("  - L2 patch-id 等价（squash merge 后残留的 zombie）")
    L.append(f"**扫描总数**: {scanned} 个远端分支；可清 {total_safe}；长期未处理 {total_long_stale}；保留 {kept}")
    L.append("")
    L.append("> 删除自己的分支：复制下面 ` ``` ` 框里的命令到本地执行。做完打勾或评论 `done`；本 issue 全部确认后关闭即闭环。")
    L.append("")
    L.append("---")
    L.append("")

    total_assigned = sum(len(v) for v in assigned.values())
    if assigned:
        L.append(f"## 有主分支（{total_assigned}，请本人自删——内容已被吸收）")
        L.append("")
        for user, items in sorted(assigned.items(), key=lambda x: (-len(x[1]), x[0])):
            L.append(f"### @{user} ({len(items)} 条)")
            L.append("")
            for item in items:
                L.append(f"- [ ] `{item['name']}` — {item['age']}d，{item['reason']}")
                L.append("  ```")
                L.append(f"  git push origin --delete {item['name']}")
                L.append("  ```")
            L.append("")

    if orphan:
        if executed_orphans is None:
            L.append(f"## 无主分支（{len(orphan)}，CI bot / noreply，待 admin 代删）")
        else:
            L.append(f"## 无主分支（{len(orphan)}，已代删）")
        L.append("")
        for item in orphan:
            mark = "✅" if executed_orphans and item["name"] in executed_orphans else "⏳"
            L.append(f"- {mark} `{item['name']}` — {item['age']}d，{item['reason']}，作者 `{item.get('email', '?')}`")
        L.append("")

    if long_stale_assigned:
        L.append(f"## 长期未处理（{sum(len(v) for v in long_stale_assigned.values())}，"
                 f"≥ {long_stale_days}d 有未合并工作，请决定）")
        L.append("")
        L.append("> 这些分支有真实未合并的内容（L1/L2 都未命中）。请选 ① 完成并提 PR 合掉 / "
                 "② 放弃，复制下方命令删除 / ③ 把工作迁到新分支再删旧的。")
        L.append("")
        for user, items in sorted(long_stale_assigned.items(), key=lambda x: (-len(x[1]), x[0])):
            L.append(f"### @{user} ({len(items)} 条)")
            L.append("")
            for item in items:
                L.append(f"- [ ] `{item['name']}` — {item['age']}d")
                L.append("  ```")
                L.append(f"  # 放弃用：git push origin --delete {item['name']}")
                L.append("  ```")
            L.append("")

    if long_stale_orphan:
        L.append(f"## 长期未处理 / 无主（{len(long_stale_orphan)}，有未合并工作但无 Gitea 账号，人工 review）")
        L.append("")
        L.append("> 无法 @ 通知。请人工 review 确认是否还需要这些 changes。")
        L.append("")
        for item in long_stale_orphan:
            L.append(f"- `{item['name']}` — {item['age']}d，作者 `{item.get('email', '?')}`")
        L.append("")

    if not assigned and not orphan and not long_stale_assigned and not long_stale_orphan:
        L.append("_（无 stale 分支符合清理或提醒条件）_")
        L.append("")

    L.append("---")
    L.append("")
    L.append("_Generated by `scripts/ops/sweep-remote-stale.py`._")
    return "\n".join(L)


# ---------- Issue posting ----------

def post_or_update_issue(api: Api, title: str, body: str) -> str:
    """Create the cleanup issue, or overwrite the body of an existing one.

    The label is ensured first. An existing issue is looked up by label and
    title. If one is found, only its body is PATCHed. Otherwise a new issue is
    POSTed, carrying the label when ensure_label returned an id.

    Returns the issue's HTML URL. Exits the process with status 2 on any
    response other than 200/201.
    """
    label_id = ensure_label(api, ISSUE_LABEL, ISSUE_LABEL_COLOR, ISSUE_LABEL_DESC)
    existing = find_issue_by_title(api, ISSUE_LABEL, title)

    if not existing:
        payload = {"title": title, "body": body}
        if label_id is not None:
            payload["labels"] = [label_id]
        status, resp_text = api.post("/issues", payload)
        if status not in (200, 201):
            print(f"ERROR POST /issues → HTTP {status}: {resp_text[:300]}", file=sys.stderr)
            sys.exit(2)
        return json.loads(resp_text).get("html_url", "")

    number = existing["number"]
    status, resp_text = api.patch(f"/issues/{number}", {"body": body})
    if status not in (200, 201):
        print(f"ERROR PATCH /issues/{number} → HTTP {status}: {resp_text[:300]}", file=sys.stderr)
        sys.exit(2)
    return existing.get("html_url") or f"{BASE}/{api.repo}/issues/{number}"


def issue_title() -> str:
    """Title of the cleanup issue for the current local month, so there is one issue per month."""
    month = datetime.now().strftime("%Y-%m")
    return "stale-branches 待清理 " + month


# ---------- Audit log ----------

def write_audit(lines: list[str]) -> Path:
    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    path = AUDIT_DIR / f"{datetime.now().strftime('%Y-%m-%d-%H%M')}.log"
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return path


# ---------- Main ----------

def _local_tip_matches(name: str, api_sha: str) -> bool:
    """Whether local ``refs/remotes/origin/<name>`` points at ``api_sha``.

    The L1/L2 checks read the local remote-tracking ref, while the branch
    list (and any deletion) comes from the Gitea API. If ``git fetch`` failed,
    or someone pushed between the fetch and the API listing, the local ref
    lags the real tip. An "already absorbed" verdict would then describe
    commits that are no longer the branch head. A missing local ref counts as
    a mismatch. An empty ``api_sha`` (field absent from the payload) can't be
    compared and is treated as a match.
    """
    if not api_sha:
        return True
    try:
        local_sha = run_git([
            "rev-parse", "--verify", "--quiet", f"refs/remotes/origin/{name}^{{commit}}",
        ]).strip()
    except subprocess.CalledProcessError:
        return False
    return local_sha == api_sha


def main() -> int:
    """CLI entry point: scan remote branches, classify them, and optionally act.

    Flow:
      1. Parse args.
      2. ``git fetch --prune``.
      3. Build the develop patch-id baseline.
      4. List branches, open PRs and pool-held branches.
      5. Classify every branch as kept, absorbed (assigned/orphan) or
         long-stale (assigned/orphan).
      6. With ``--sweep-orphan --execute``, delete up to ``--limit`` absorbed
         orphans.
      7. Render Markdown.
      8. With ``--post-issue``, create/update the issue (skipped when nothing
         is actionable).
      9. Write the audit log and print the Markdown to stdout.

    A branch is only judged when its local tracking ref matches the tip
    reported by the API (see _local_tip_matches). Otherwise it is kept, so a
    stale local view can never drive a deletion.

    Returns 2 for inconsistent thresholds, else 0. post_or_update_issue may
    itself exit with 2 on an API error.
    """
    p = argparse.ArgumentParser(description="远端 stale 分支清理协调器")
    p.add_argument("--age-days", type=int, default=14, help="分支 last commit 至少多少天前才纳入清理（默认 14）")
    p.add_argument("--long-stale-days", type=int, default=30,
                   help="未被吸收但 ≥ N 天的分支视为 long-stale，@ 本人提醒处理（默认 30，必须 ≥ --age-days）")
    p.add_argument("--limit", type=int, default=30, help="本次最多代删多少个 orphan 分支（默认 30）")
    p.add_argument("--patch-id-lookback", type=int, default=PATCH_ID_LOOKBACK_DEFAULT,
                   help=f"L2 patch-id 比对扫描 develop 多少条历史 commit（默认 {PATCH_ID_LOOKBACK_DEFAULT}）")
    p.add_argument("--post-issue", action="store_true", help="生成 / 更新 Gitea issue（@ 出有主分支作者）")
    p.add_argument("--sweep-orphan", action="store_true", help="代删 orphan 分支（必须配 --execute）")
    p.add_argument("--execute", action="store_true", help="真删（默认 dry-run）")
    args = p.parse_args()

    if args.long_stale_days < args.age_days:
        print(f"ERROR: --long-stale-days ({args.long_stale_days}) must be ≥ "
              f"--age-days ({args.age_days})", file=sys.stderr)
        return 2

    token = get_token()
    repo = detect_repo()
    default_branch = detect_default_branch()
    api = Api(token, repo)

    print(f"Repo: {repo}  default={default_branch}  age≥{args.age_days}d", file=sys.stderr)

    print("  git fetch origin --prune ...", file=sys.stderr)
    fetch_rc = subprocess.run(
        ["git", "-C", str(REPO_ROOT), "fetch", "origin", "--prune", "--quiet"],
        check=False,
    ).returncode
    if fetch_rc != 0:
        # Not fatal: branches whose local ref disagrees with the API tip are
        # kept by the _local_tip_matches() guard below.
        print(f"  WARN: git fetch exited {fetch_rc} — patch-id baseline may be stale, "
              "L2 may underdetect recent merges", file=sys.stderr)

    print(f"  computing develop patch-id baseline (lookback={args.patch_id_lookback}) ...", file=sys.stderr)
    develop_ids = build_develop_patch_id_set(default_branch, args.patch_id_lookback)
    print(f"  baseline: {len(develop_ids)} patch-ids", file=sys.stderr)

    print("  listing remote branches ...", file=sys.stderr)
    branches = collect_branches(api)
    print(f"  found {len(branches)} branches", file=sys.stderr)

    print("  listing open PRs ...", file=sys.stderr)
    open_pr_heads = list_open_pr_heads(api)
    pool_branches = read_pool_branches()
    print(f"  open PR heads: {len(open_pr_heads)}; pool-active: {len(pool_branches)}", file=sys.stderr)

    assigned: dict[str, list[dict]] = defaultdict(list)
    orphan: list[dict] = []
    long_stale_assigned: dict[str, list[dict]] = defaultdict(list)
    long_stale_orphan: list[dict] = []
    kept = 0
    user_cache: dict[str, bool] = {}
    audit_lines: list[str] = [
        f"# stale sweep {datetime.now().isoformat()}",
        f"# repo={repo} default={default_branch} age>={args.age_days}d "
        f"long-stale>={args.long_stale_days}d",
    ]

    for b in branches:
        name = b.get("name") or ""
        commit = b.get("commit") or {}
        committed_at = commit.get("timestamp") or (commit.get("author") or {}).get("date") or ""
        author_email = (commit.get("author") or {}).get("email") or ""

        if _is_protected(name):
            kept += 1
            continue
        if name in open_pr_heads:
            kept += 1
            audit_lines.append(f"keep {name} reason=open-pr")
            continue
        if name in pool_branches:
            kept += 1
            audit_lines.append(f"keep {name} reason=agent-pool-active")
            continue

        age = _age_days(committed_at)
        if age is None or age < args.age_days:
            kept += 1
            continue

        # L1/L2 inspect the local tracking ref. Only trust them when it is the
        # same commit the API reports as the branch tip.
        if not _local_tip_matches(name, commit.get("id") or ""):
            kept += 1
            audit_lines.append(f"keep {name} age={age}d reason=local-ref-stale")
            continue

        reason = judge_safe_to_delete(name, default_branch, develop_ids)
        username = resolve_branch_owner(commit, api, user_cache)
        item = {"name": name, "age": age, "reason": reason or "content-not-absorbed",
                "email": author_email}

        if reason:
            if username:
                assigned[username].append(item)
                audit_lines.append(f"assign {name} age={age}d to=@{username} reason={reason}")
            else:
                orphan.append(item)
                audit_lines.append(f"orphan {name} age={age}d email={author_email or '?'} reason={reason}")
        elif age >= args.long_stale_days:
            if username:
                long_stale_assigned[username].append(item)
                audit_lines.append(f"long-stale {name} age={age}d to=@{username}")
            else:
                long_stale_orphan.append(item)
                audit_lines.append(f"long-stale-orphan {name} age={age}d email={author_email or '?'}")
        else:
            kept += 1
            audit_lines.append(f"keep {name} age={age}d reason=content-not-absorbed")

    print(
        f"\nResults: assigned={sum(len(v) for v in assigned.values())} "
        f"orphan={len(orphan)} "
        f"long-stale={sum(len(v) for v in long_stale_assigned.values())}+"
        f"{len(long_stale_orphan)} "
        f"kept={kept}",
        file=sys.stderr,
    )

    executed_orphans: list[str] | None = None
    if args.sweep_orphan and args.execute:
        # Race window: the open-PR check ran once at scan time, and branches
        # are deleted later in this loop. If a PR opens for an orphan in
        # between, we'd delete a branch that just got attached to an open PR.
        # Cron cadence is weekly and orphans are CI bot zombies (rarely
        # re-attached), so we accept the window; flag it if it ever shows up
        # in audit log review.
        executed_orphans = []
        for item in orphan:
            if len(executed_orphans) >= args.limit:
                # Keep going so every branch left over is recorded in the audit.
                audit_lines.append(f"skip {item['name']} reason=limit-reached")
                continue
            code, txt = api.delete(f"/branches/{urllib.parse.quote(item['name'], safe='')}")
            if code in (200, 204):
                executed_orphans.append(item["name"])
                audit_lines.append(f"deleted {item['name']}")
            else:
                audit_lines.append(f"delete-failed {item['name']} HTTP {code}: {txt[:120]}")
        print(f"  代删 orphan: {len(executed_orphans)}/{len(orphan)}", file=sys.stderr)
    elif args.sweep_orphan and not args.execute:
        print("  --sweep-orphan 需要配 --execute 才真删（当前为 dry-run）", file=sys.stderr)

    md = render_markdown(
        assigned, orphan, long_stale_assigned, long_stale_orphan,
        default_branch=default_branch, kept=kept,
        scanned=len(branches), age_days=args.age_days,
        long_stale_days=args.long_stale_days,
        executed_orphans=executed_orphans,
    )

    if args.post_issue:
        total_actionable = (
            sum(len(v) for v in assigned.values()) + len(orphan)
            + sum(len(v) for v in long_stale_assigned.values()) + len(long_stale_orphan)
        )
        if total_actionable == 0:
            # An empty periodic cron run stays silent, so no content-free issue
            # gets posted every week.
            print("Nothing to report — skipping issue post.", file=sys.stderr)
        else:
            url = post_or_update_issue(api, issue_title(), md)
            print(f"Issue: {url}", file=sys.stderr)

    audit_path = write_audit(audit_lines)
    print(f"Audit log: {audit_path}", file=sys.stderr)

    print()
    print(md)
    return 0


if __name__ == "__main__":
    # main()'s return value becomes the process exit status.
    raise SystemExit(main())
