import { Injectable, Logger, ForbiddenException, BadRequestException } from '@nestjs/common';
import { PrismaService } from '@core/database/prisma/prisma.service';
import { AiUsageOsPlatform, AiUsageDlqReason, Prisma } from '@prisma/client';
import * as path from 'path';
import { IngestionBatchDto } from '../dto';

const MAX_BATCH = 500;
const TS_FUTURE_TOLERANCE_MS = 5 * 60 * 1000;
const TS_PAST_TOLERANCE_MS = 30 * 24 * 60 * 60 * 1000;

@Injectable()
export class AiUsageIngestionService {
  private readonly logger = new Logger(AiUsageIngestionService.name);

  constructor(private readonly prisma: PrismaService) {}

  async ingest(params: {
    tokenId: string;
    userId: string;
    organizationId: string;
    deviceMeta: { deviceId: string; hostname: string; osUser?: string; osPlatform: AiUsageOsPlatform; agentVersion?: string };
    ip?: string;
    body: IngestionBatchDto;
  }) {
    if (!params.body.events || params.body.events.length === 0) {
      throw new BadRequestException('AI_USAGE_INVALID_PAYLOAD');
    }
    if (params.body.events.length > MAX_BATCH) {
      throw new BadRequestException('AI_USAGE_BATCH_TOO_LARGE');
    }

    const device = await this.prisma.aiUsageDevice.upsert({
      where: { deviceId: params.deviceMeta.deviceId },
      create: {
        deviceId: params.deviceMeta.deviceId,
        userId: params.userId,
        organizationId: params.organizationId,
        createdById: params.userId,
        hostname: params.deviceMeta.hostname,
        osUser: params.deviceMeta.osUser ?? null,
        osPlatform: params.deviceMeta.osPlatform,
        agentVersion: params.deviceMeta.agentVersion ?? null,
        firstSeenAt: new Date(),
        firstSeenIp: params.ip ?? null,
        lastSeenAt: new Date(),
      },
      update: {
        lastSeenAt: new Date(),
        hostname: params.deviceMeta.hostname,
        agentVersion: params.deviceMeta.agentVersion ?? undefined,
      },
    });

    // device 所有者锁定首次注册者；若当前 token 持有人与 owner 不同 →
    // 可能是合规的机器交接、也可能是 token 泄漏被盗用，admin 需要可观测信号
    if (device.userId !== params.userId) {
      this.logger.warn(
        `ai-usage device owner mismatch: deviceId=${params.deviceMeta.deviceId} owner=${device.userId} current_token_user=${params.userId} ip=${params.ip ?? '-'}`,
      );
    }

    if (device.blockedAt) {
      await this.dlqAll(
        params.body.events,
        AiUsageDlqReason.BLOCKED_DEVICE,
        device.id,
        params.organizationId,
        params.userId,
      );
      throw new ForbiddenException('AI_USAGE_DEVICE_BLOCKED');
    }

    const now = Date.now();
    const valid: Prisma.AiUsageEventCreateManyInput[] = [];
    const dlq: { rawPayload: any; reason: AiUsageDlqReason }[] = [];

    for (const e of params.body.events) {
      const ts = new Date(e.ts).getTime();
      if (Number.isNaN(ts)) {
        dlq.push({ rawPayload: e, reason: AiUsageDlqReason.BAD_FORMAT });
        continue;
      }
      if (ts > now + TS_FUTURE_TOLERANCE_MS || ts < now - TS_PAST_TOLERANCE_MS) {
        dlq.push({ rawPayload: e, reason: AiUsageDlqReason.TS_OUT_OF_WINDOW });
        continue;
      }
      valid.push({
        rawMessageId: e.rawMessageId,
        deviceId: device.id,
        userId: params.userId,
        // 走 raw SQL `$::"AiUsageTool"` cast，需保留 kebab string，TS 类型上用 as any 屏蔽
        tool: e.tool as any,
        sessionId: e.sessionId,
        projectPath: e.projectPath,
        projectBasename: path.basename(e.projectPath) || e.projectPath,
        model: e.model,
        ts: new Date(e.ts),
        receivedAt: new Date(),
        inputTokens: e.inputTokens,
        outputTokens: e.outputTokens,
        cacheCreationTokens: e.cacheCreationTokens,
        cacheReadTokens: e.cacheReadTokens,
        // totalTokens 是 GENERATED ALWAYS 列；下方 insertEventsRaw 不写它，这里 0 仅为满足 TS 类型
        totalTokens: 0,
        estimatedCostUsd: new Prisma.Decimal(e.estimatedCostUsd),
        // v1.1 富 metadata（全部可空）
        gitBranch: e.gitBranch ?? null,
        agentVersionEvent: e.agentVersionEvent ?? null,
        worktreeLabel: e.worktreeLabel ?? null,
        cwdBasename: e.cwdBasename ?? null,
        turnIndex: e.turnIndex ?? null,
        toolUseCount: e.toolUseCount ?? null,
        toolNames: (e.toolNames as any) ?? null,
        stopReason: e.stopReason ?? null,
        serviceTier: e.serviceTier ?? null,
        organizationId: params.organizationId,
        createdById: params.userId,
      });
    }

    let accepted = 0;
    let deduped = 0;
    if (valid.length > 0) {
      accepted = await this.insertEventsRaw(valid);
      deduped = valid.length - accepted;
    }

    if (dlq.length > 0) {
      await this.prisma.aiUsageEventDlq.createMany({
        data: dlq.map((d) => ({
          deviceId: device.id,
          reason: d.reason,
          rawPayload: d.rawPayload,
          organizationId: params.organizationId,
          createdById: params.userId,
        })),
      });
    }

    return {
      accepted,
      deduped,
      rejected: 0,
      dlq: dlq.length,
      deviceId: device.id,
    };
  }

  /**
   * Multi-VALUES INSERT，ON CONFLICT DO NOTHING 去重。
   * 一次 round-trip 写完整批 → 单批 500 行 25-50× 快于逐行写
   */
  private async insertEventsRaw(rows: Prisma.AiUsageEventCreateManyInput[]): Promise<number> {
    if (rows.length === 0) return 0;
    // v1.1：17 标准列 + 9 富 metadata 列 = 26 占位符/row
    const COLS_PER_ROW = 26;
    const valueClauses: string[] = [];
    const params: any[] = [];
    for (let i = 0; i < rows.length; i++) {
      const b = i * COLS_PER_ROW;
      valueClauses.push(
        `(gen_random_uuid(),` +
          `$${b + 1},$${b + 2}::uuid,$${b + 3}::uuid,$${b + 4}::"platform_ai_usage"."AiUsageTool",` +
          `$${b + 5},$${b + 6},$${b + 7},$${b + 8},$${b + 9}::timestamptz,$${b + 10}::timestamptz,` +
          `$${b + 11},$${b + 12},$${b + 13},$${b + 14},$${b + 15}::numeric,` +
          `$${b + 16},$${b + 17},$${b + 18},$${b + 19},$${b + 20},$${b + 21},$${b + 22}::jsonb,$${b + 23},$${b + 24},` +
          `$${b + 25}::uuid,$${b + 26}::uuid,NOW())`,
      );
      const r = rows[i] as any;
      params.push(
        r.rawMessageId,
        r.deviceId,
        r.userId,
        r.tool,
        r.sessionId,
        r.projectPath,
        r.projectBasename,
        r.model,
        r.ts,
        r.receivedAt,
        r.inputTokens ?? 0,
        r.outputTokens ?? 0,
        r.cacheCreationTokens ?? 0,
        r.cacheReadTokens ?? 0,
        r.estimatedCostUsd,
        // 富 metadata 9 列
        r.gitBranch ?? null,
        r.agentVersionEvent ?? null,
        r.worktreeLabel ?? null,
        r.cwdBasename ?? null,
        r.turnIndex ?? null,
        r.toolUseCount ?? null,
        r.toolNames != null ? JSON.stringify(r.toolNames) : null,
        r.stopReason ?? null,
        r.serviceTier ?? null,
        r.organizationId,
        r.createdById,
      );
    }
    const sql = `INSERT INTO platform_ai_usage.ai_usage_events
        (id, raw_message_id, device_id, user_id, tool, session_id, project_path, project_basename, model, ts, received_at,
         input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, estimated_cost_usd,
         git_branch, agent_version_event, worktree_label, cwd_basename, turn_index, tool_use_count, tool_names, stop_reason, service_tier,
         organization_id, created_by_id, updated_at)
      VALUES ${valueClauses.join(',')}
      ON CONFLICT (raw_message_id) DO NOTHING`;
    const inserted = await this.prisma.$executeRawUnsafe(sql, ...params);
    return Number(inserted);
  }

  private async dlqAll(
    events: any[],
    reason: AiUsageDlqReason,
    deviceId: string | null,
    organizationId: string,
    createdById: string,
  ) {
    if (events.length === 0) return;
    await this.prisma.aiUsageEventDlq.createMany({
      data: events.map((e) => ({
        deviceId: deviceId ?? undefined,
        reason,
        rawPayload: e,
        organizationId,
        createdById,
      })),
    });
  }
}
