#!/usr/bin/env python3
"""Generate a Promotion PR body (develop → staging / staging → production).

Why this script exists: a Promotion PR rolls up 10-30 commits across many
topics. The Feature PR template is the wrong shape for that audience (QA /
business / deployer). This script reads git log between `base..head` and
produces a markdown body matching `.gitea/PULL_REQUEST_TEMPLATE/promotion.md`.

Coverage: ~80%. The remaining 20% (UAT must-test focus, risk-level human
judgment, previous-promotion tag, follow-ups) must be filled by a human
before opening the PR.

Spec: docs/standards/13-pr-description-spec.md「晋级 PR（Promotion）规范」
"""

from __future__ import annotations

import argparse
import re
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path

# Section headings of the "by type" changelog. Each heading is spelled once and
# shared by the type → category mapping and by the rendering order below.
_CAT_FEATURES = "Features（新功能，要重点测）"
_CAT_FIXES = "Bug Fixes（修复，验证已修复）"
_CAT_REFACTOR = "Refactor / Perf（重构 / 性能，验证无回归）"
_CAT_LOW_RISK = "Chore / Docs / Style / Test（无运行时影响 / 低风险）"
_CAT_OTHER = "其他（未识别 conventional commit 类型）"

# Conventional commit type → human-readable category
TYPE_CATEGORY = {
    "feat": _CAT_FEATURES,
    "fix": _CAT_FIXES,
    "perf": _CAT_REFACTOR,
    "refactor": _CAT_REFACTOR,
    "chore": _CAT_LOW_RISK,
    "docs": _CAT_LOW_RISK,
    "style": _CAT_LOW_RISK,
    "test": _CAT_LOW_RISK,
    "build": _CAT_LOW_RISK,
    "ci": _CAT_LOW_RISK,
}

# Order in which the categories are rendered; the last one collects commits
# whose subject is not a recognized conventional-commit line.
CATEGORY_ORDER = [
    _CAT_FEATURES,
    _CAT_FIXES,
    _CAT_REFACTOR,
    _CAT_LOW_RISK,
    _CAT_OTHER,
]

# Paths whose touch should hint at higher risk in the human reviewer's mind.
# Patterns are applied with ``search`` against repo-relative paths, so the
# explicit ``^`` / ``$`` anchors decide whether a prefix or a suffix is meant.
HIGH_RISK_PATTERNS = [
    re.compile(source)
    for source in (
        r"^prisma/schema/",
        r"^prisma/migrations/",
        r"^docs/standards/",
        r"^CLAUDE\.md$",
        r"^AGENTS\.md$",
        r"^\.gitea/workflows/",
        r"^scripts/deploy/",
        r"\.env\.example$",
    )
]
# Paths that only warrant a medium (🟡) hint.
MEDIUM_RISK_PATTERNS = [
    re.compile(source)
    for source in (
        r"^backend/src/core/",
        r"^backend/src/modules/iam/",
        r"^engines/",
        r"^\.agents/skills/",
    )
]

# Subject of a regular merge commit: group 1 is the quoted PR title, group 2
# the PR number.
MERGE_PR_RE = re.compile(
    r"^Merge pull request '(.+)'"
    r" \(#(\d+)\)"
    r" from .+ into "
)
# Squash-merged commits end with "(#NNN)" — Gitea's squash style appends PR ref to the subject
TRAILING_PR_RE = re.compile(r"\s*\(#(\d+)\)\s*$")
# Conventional-commit subject line: type, optional (scope), optional "!"
# breaking marker, then ": subject".
CONVENTIONAL_RE = re.compile(
    r"^(?P<type>feat|fix|perf|refactor|chore|docs|style|test|build|ci)"
    r"(?:\((?P<scope>[^)]+)\))?"
    r"(?P<bang>!)?"
    r":\s*(?P<subject>.+)$"
)


@dataclass
class CommitEntry:
    """One commit on the promoted range, plus what was parsed out of it."""

    # Full commit hash as printed by ``%H``.
    sha: str
    # Raw subject line as printed by ``%s``.
    subject: str
    # PR number taken from a merge-PR subject or a trailing "(#N)".
    pr_number: int | None = None
    # PR title (merge commit) or subject with the trailing "(#N)" removed.
    inner_subject: str | None = None
    # Conventional-commit type, scope and "!" marker, when the subject parses.
    type: str | None = None
    scope: str | None = None
    breaking: bool = False
    # Paths touched by the commit.
    files: list[str] = field(default_factory=list)

    @property
    def category(self) -> str:
        """Changelog heading for this commit; the catch-all when untyped."""
        fallback = "其他（未识别 conventional commit 类型）"
        if not self.type:
            return fallback
        return TYPE_CATEGORY.get(self.type, fallback)

    @property
    def display_subject(self) -> str:
        """Subject to render: the cleaned-up one when available."""
        if self.inner_subject:
            return self.inner_subject
        return self.subject

    @property
    def risk_hint(self) -> str:
        """Traffic-light emoji derived from the touched paths (🔴 wins over 🟡)."""

        def touches(patterns) -> bool:
            return any(rx.search(path) for rx in patterns for path in self.files)

        if touches(HIGH_RISK_PATTERNS):
            return "🔴"
        if touches(MEDIUM_RISK_PATTERNS):
            return "🟡"
        return "🟢"


def run(cmd: list[str]) -> str:
    """Run *cmd* and return its standard output as text.

    Output is decoded explicitly as UTF-8 instead of with the locale's
    preferred encoding, so non-ASCII commit subjects and paths survive on
    hosts whose locale is not UTF-8. Undecodable bytes are replaced rather
    than raising, because the result only feeds a human-edited draft.

    Args:
        cmd: Argument vector, executed without a shell.

    Returns:
        The captured stdout of the command.

    Raises:
        SystemExit: With code 1 (after writing a message to stderr) when the
            command cannot be started or exits with a non-zero status.
    """
    try:
        r = subprocess.run(
            cmd,
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            check=False,
        )
    except OSError as exc:
        # E.g. the executable is not installed / not on PATH: report it the
        # same way as a failing command instead of dumping a traceback.
        sys.stderr.write(f"ERROR running {cmd}: {exc}\n")
        sys.exit(1)
    if r.returncode != 0:
        sys.stderr.write(f"ERROR running {cmd}: {r.stderr}\n")
        sys.exit(1)
    return r.stdout


def get_commits(base: str, head: str) -> list[CommitEntry]:
    """Return commit list base..head, oldest-first, with file lists.

    Uses --first-parent so each PR shows up exactly once on the main-branch
    line (as either the merge commit or, if squash-merged, the squashed
    commit). Without --first-parent, regular-merged PRs would surface both
    the merge commit AND the individual feature-branch commits, causing
    duplicates that string-based dedup can't catch (PR title and original
    commit subject often differ slightly).

    For every commit the subject is inspected for a PR reference (merge-PR
    subject or trailing "(#N)"), the resulting display subject is parsed as a
    conventional commit, and the touched paths are collected with a second
    git call.

    Args:
        base: Revision that is excluded from the range.
        head: Revision whose first-parent history is listed.

    Returns:
        One ``CommitEntry`` per first-parent commit, oldest first.
    """
    # %H<TAB>%s for sha + subject
    out = run(["git", "log", "--first-parent", "--reverse", "--pretty=format:%H%x09%s", f"{base}..{head}"])
    entries: list[CommitEntry] = []
    for line in out.splitlines():
        if not line.strip():
            continue
        sha, _, subject = line.partition("\t")
        entry = CommitEntry(sha=sha, subject=subject)

        # Detect merge-PR commit or squash-merged commit with trailing (#N)
        m = MERGE_PR_RE.match(subject)
        if m:
            entry.pr_number = int(m.group(2))
            entry.inner_subject = m.group(1)
            inner = m.group(1)
        else:
            inner = subject
            tm = TRAILING_PR_RE.search(subject)
            if tm:
                entry.pr_number = int(tm.group(1))
                # Strip the trailing (#N) from display subject for cleaner rendering
                entry.inner_subject = TRAILING_PR_RE.sub("", subject).rstrip()
                inner = entry.inner_subject

        # Parse conventional commit on the displayed subject
        cm = CONVENTIONAL_RE.match(inner)
        if cm:
            entry.type = cm.group("type")
            entry.scope = cm.group("scope")
            entry.breaking = bool(cm.group("bang"))

        # File list for this commit (merge commits use --diff-merges to see net effect).
        # core.quotePath=false keeps non-ASCII paths verbatim; with git's
        # default they are emitted double-quoted and octal-escaped, which
        # would defeat the anchored risk patterns and the migration check.
        files_out = run(
            [
                "git",
                "-c",
                "core.quotePath=false",
                "log",
                "-1",
                "--name-only",
                "--pretty=format:",
                "--diff-merges=first-parent",
                sha,
            ]
        )
        entry.files = [f for f in files_out.splitlines() if f.strip()]

        entries.append(entry)
    return entries


def get_period(base: str, head: str) -> tuple[str, str]:
    """Return (oldest-date, newest-date) of commits base..head as ISO yyyy-mm-dd.

    The dates are author dates. Log order is not date order (rebased or
    cherry-picked commits keep their original author date), so the bounds are
    the minimum and maximum over all commits rather than the first and last
    log entries. ISO dates compare chronologically as plain strings.

    Args:
        base: Revision that is excluded from the range.
        head: Revision whose history is inspected.

    Returns:
        ``(oldest, newest)``; both are empty strings when the range is empty.
    """
    dates = run(["git", "log", "--pretty=format:%ad", "--date=short", f"{base}..{head}"]).split()
    if not dates:
        return "", ""
    return min(dates), max(dates)


def categorize(entries: list[CommitEntry]) -> dict[str, list[CommitEntry]]:
    """Bucket commits by their ``category``, keeping one entry per PR.

    A commit carrying a PR number is kept only the first time that number is
    seen. A commit without a PR number is dropped when its raw subject equals
    the display subject of some PR-carrying commit in *entries*.
    """
    # Display subjects already represented by a PR entry.
    pr_subjects = {c.inner_subject for c in entries if c.pr_number and c.inner_subject}

    handled_prs: set[int] = set()
    buckets: dict[str, list[CommitEntry]] = defaultdict(list)
    for commit in entries:
        if commit.pr_number:
            duplicate = commit.pr_number in handled_prs
            handled_prs.add(commit.pr_number)
        else:
            duplicate = commit.subject in pr_subjects
        if not duplicate:
            buckets[commit.category].append(commit)
    return buckets


def detect_schema_changes(entries: list[CommitEntry]) -> list[str]:
    """Return the sorted, de-duplicated migration SQL paths touched by *entries*.

    A path counts when it lives under ``prisma/migrations/`` and ends with
    ``migration.sql``.
    """
    migrations = {
        path
        for commit in entries
        for path in commit.files
        if path.startswith("prisma/migrations/") and path.endswith("migration.sql")
    }
    return sorted(migrations)


def detect_env_changes(entries: list[CommitEntry]) -> bool:
    """Tell whether any commit touched a file whose path ends in ``.env.example``."""
    return any(
        path.endswith(".env.example")
        for commit in entries
        for path in commit.files
    )


def detect_high_risk(entries: list[CommitEntry]) -> list[CommitEntry]:
    """Return, in input order, the commits whose risk hint is 🔴."""
    flagged: list[CommitEntry] = []
    for commit in entries:
        if commit.risk_hint == "🔴":
            flagged.append(commit)
    return flagged


def format_commit_line(e: CommitEntry) -> str:
    """Render *e* as a markdown bullet, labelled by PR number or short sha.

    Pipe characters in the subject are backslash-escaped.
    """
    text = "\\|".join(e.display_subject.split("|"))
    label = f"PR #{e.pr_number}" if e.pr_number else f"`{e.sha[:8]}`"
    return f"- {label} {text}"


def format_risk_row(e: CommitEntry) -> str:
    """Render *e* as one row of the risk-hotspot markdown table.

    The row contains the commit's risk hint, its display subject (pipes
    escaped so they cannot break the table), the top-level path components of
    the files that triggered the hint, and the source (PR number or short
    sha).

    Args:
        e: The commit to render.

    Returns:
        A single markdown table row.
    """
    subj = e.display_subject.replace("|", "\\|")

    def matching(patterns) -> list[str]:
        return [f for f in e.files if any(p.search(f) for p in patterns)]

    # High-risk hits take precedence; fall back to medium-risk hits so that
    # 🟡 rows also name the touched area instead of always showing "—".
    risk_paths = matching(HIGH_RISK_PATTERNS) or matching(MEDIUM_RISK_PATTERNS)
    paths_hint = ", ".join(sorted({Path(p).parts[0] for p in risk_paths})) if risk_paths else "—"
    source = f"PR #{e.pr_number}" if e.pr_number else f"`{e.sha[:8]}`"
    return f"| {e.risk_hint} | {subj}（触碰：{paths_hint}）| {source} |"


def build_body(base: str, head: str) -> str:
    """Assemble the markdown draft of the promotion PR for ``base..head``.

    Collects the commits of the range, derives the per-section data
    (categories, breaking changes, migrations, env changes, risk hints) and
    renders the sections in a fixed order. Parts that need human judgment are
    emitted as HTML-comment placeholders.

    Args:
        base: Revision that is excluded from the range.
        head: Revision being promoted.

    Returns:
        The markdown body, or a short notice when the range has no commits.
    """
    commits = get_commits(base, head)
    if not commits:
        return f"# 晋级 PR 草稿\n\n`{base}..{head}` 无可晋级 commits。"

    oldest, newest = get_period(base, head)
    pr_total = len({c.pr_number for c in commits if c.pr_number})

    by_category = categorize(commits)
    migrations = detect_schema_changes(commits)
    env_touched = detect_env_changes(commits)
    red = detect_high_risk(commits)
    yellow = [c for c in commits if c.risk_hint == "🟡"]
    breaking = [c for c in commits if c.breaking]

    # --- Scope header and per-category changelog ---------------------------
    out: list[str] = [
        "## 本次晋级范围",
        "",
        f"{base} ← {head} / {oldest} ~ {newest} / {len(commits)} commits / {pr_total} PRs",
        "",
        "## 按类型分类",
        "",
    ]
    for heading in CATEGORY_ORDER:
        members = by_category.get(heading)
        if members:
            out.append(f"**{heading}**")
            out.extend(format_commit_line(c) for c in members)
            out.append("")

    # --- UAT checklist (static skeleton, completed by a human) -------------
    out += [
        "## UAT 必测清单",
        "",
        "> ⚠️ 以下是脚本能自动生成的部分。**回归重点**和**外部集成**必须人工补全。",
        "",
        "- [ ] **新功能逐项过**：每条链回上面 Features 列表",
        "- [ ] **i18n 双语**：所有新增 UI zh-CN ↔ en-US 切换无 missing key warning，日期/数字按 locale 格式化",
        "- [ ] **回归重点**（本次大改动的模块）：",
        "  - <!-- 人工补：根据本批改动模块列出 -->",
        "- [ ] **外部集成仍可用**（按本次涉及）：",
        "  - <!-- 人工补：ADP / Outlook / SAP / Entra / Temporal 等 -->",
        "",
    ]

    # --- Breaking changes (commits flagged with "!") -----------------------
    out += ["## 破坏性变更汇总", ""]
    if breaking:
        out += ["是（Y）", ""]
        out += [format_commit_line(c) + "  ← 含 `!` 标记" for c in breaking]
        out += ["", "<!-- 人工补：每项的依赖方清单 + 迁移路径 + 兼容窗口 -->"]
    else:
        out.append("否（N）")
    out.append("")

    # --- Schema / env / config ---------------------------------------------
    out += ["## Schema / Env / Config 变更汇总", ""]
    if migrations:
        out.append("**Prisma 迁移**：")
        out += [f"- `{path}`" for path in migrations]
    else:
        out.append("- Schema 迁移：无")
    if env_touched:
        out.append("- **新 env**：检测到 `.env.example` 有变更——人工补具体 keys + 4 环境（dev/test/uat/pro）.env 由谁配 + 何时配")
    else:
        out.append("- 新 env：无")
    out += [
        "- 部署侧需变更：<!-- 人工判断：本批是否需要改 certs / pm2 / 容器 / nginx -->",
        "",
    ]

    # --- Risk hotspots: every 🔴 row, at most five 🟡 rows -------------------
    out += [
        "## 风险热点",
        "",
        "> ⚠️ 脚本只根据触碰路径给**初步建议**。**最终风险等级必须人工判断**——某 refactor 触碰热路径就算文件路径不在高风险清单也要标 🔴。",
        "",
        "| 风险等级 | 内容 | 来源 PR |",
        "|---|---|---|",
    ]
    out += [format_risk_row(c) for c in red]
    out += [format_risk_row(c) for c in yellow[:5]]  # cap to top 5 medium to avoid noise
    hidden = len(yellow) - 5
    if hidden > 0:
        out.append(f"| ⋯ | 另有 {hidden} 条 🟡 中风险条目省略；完整列表见 `git log --first-parent {base}..{head}` 或重跑本脚本 | — |")
    if not (red or yellow):
        out.append("| 🟢 低 | 本批无触碰高/中风险路径 | — |")
    out.append("")

    # --- Static tail: rollback, follow-ups, links, footer -------------------
    out += [
        "## 回滚策略",
        "",
        "- **UAT 单功能问题**：revert 对应 feature PR 并重新发起 promotion PR",
        f"- **UAT 整体崩溃**：{base} 分支 reset 到上一个 promotion 标签 `<人工补：上一批 staging-passed 标签>`",
        "- **生产部署后发现**：走 hotfix 流程（本地改 → hotfix 分支 → PR → UAT 验证 → 合 production），不回滚 staging",
        "",
        "## 已知遗留 / 后续跟进",
        "",
        "- <!-- 人工补：本批未解决的已知问题 / 不在本批的相关工单 / 下一批要带的事；如无写「无」 -->",
        "",
        "## 关联",
        "",
        "- <!-- 人工补：本批 close 的 issue 用 Closes #N，一行一个 -->",
        "",
        "---",
        "",
        "> 🤖 本草稿由 `scripts/ops/promotion-pr-body.py` 自动生成，覆盖约 80%。",
        "> 仍需人工补全：UAT 回归重点 / UAT 外部集成 / 风险等级人工判断 / 部署侧变更 / 上一批 promotion 标签 / 已知遗留 / Closes #N。",
    ]

    return "\n".join(out)


def main() -> int:
    """Command-line entry point: build the PR body and emit it.

    Parses ``--base``, ``--head`` and ``--output``, builds the body for the
    range, then prints it to stdout (``-o -``, the default) or writes it to
    the given file as UTF-8 and reports the file size on stderr.

    Returns:
        Process exit code; always 0 (failing git calls exit earlier).
    """
    # The module docstring is absent under ``python -OO``; fall back to no
    # description instead of crashing on ``None``.
    summary = (__doc__ or "").strip().partition("\n")[0] or None
    ap = argparse.ArgumentParser(description=summary)
    ap.add_argument("--base", default="staging", help="base branch (default: staging)")
    ap.add_argument("--head", default="develop", help="head branch (default: develop)")
    ap.add_argument("--output", "-o", default="-", help="output file (default: stdout)")
    args = ap.parse_args()

    body = build_body(args.base, args.head)

    if args.output == "-":
        print(body)
    else:
        target = Path(args.output)
        target.write_text(body, encoding="utf-8")
        # len(body) counts characters; the body is mostly multi-byte text, so
        # report the real on-disk size to match the "bytes" wording.
        print(f"Wrote {args.output} ({target.stat().st_size} bytes)", file=sys.stderr)
    return 0


# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
