import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { randomUUID } from 'crypto';
import * as fs from 'fs';
import { Readable } from 'stream';

type RagflowResponse<T> = {
  code: number;
  data?: T;
  message?: string;
};

@Injectable()
export class RagflowService {
  private readonly logger = new Logger(RagflowService.name);
  private readonly baseUrl: string;
  private readonly apiKey: string;
  private readonly embeddingModel: string | null;
  private readonly embeddingResetOnChange: boolean;
  private datasetIdCache: string | null = null;
  private chatIdCache: string | null = null;

  constructor(private readonly configService: ConfigService) {
    const baseUrl = this.configService.get<string>('RAGFLOW_BASE_URL') || '';
    this.baseUrl = baseUrl.replace(/\/$/, '');
    this.apiKey = this.configService.get<string>('RAGFLOW_API_KEY') || '';
    const embeddingModel = this.configService.get<string>('RAGFLOW_EMBEDDING_MODEL') || '';
    this.embeddingModel = embeddingModel.trim() ? embeddingModel.trim() : null;
    this.embeddingResetOnChange =
      (this.configService.get<string>('RAGFLOW_EMBEDDING_RESET_ON_CHANGE') || '').toLowerCase() === 'true';
  }

  async resolveDatasetId(): Promise<string> {
    if (this.datasetIdCache) {
      return this.datasetIdCache;
    }

    const configuredId = this.configService.get<string>('RAGFLOW_DATASET_ID');
    if (configuredId) {
      this.datasetIdCache = configuredId;
      return configuredId;
    }

    const datasetName = this.configService.get<string>('RAGFLOW_DATASET_NAME') || 'ffoa-knowledge-base';
    const existing = await this.listDatasets({ name: datasetName });
    if (existing.length > 0) {
      this.datasetIdCache = existing[0].id;
      return existing[0].id;
    }

    const description = this.configService.get<string>('RAGFLOW_DATASET_DESCRIPTION') || 'FF AI Workspace Knowledge Base';
    const created = await this.createDataset(datasetName, description, this.embeddingModel || undefined);
    this.datasetIdCache = created.id;
    return created.id;
  }

  async resolveChatId(datasetId: string): Promise<string> {
    if (this.chatIdCache) {
      return this.chatIdCache;
    }

    const configuredId = this.configService.get<string>('RAGFLOW_CHAT_ID');
    if (configuredId) {
      this.chatIdCache = configuredId;
      return configuredId;
    }

    const chatName = this.configService.get<string>('RAGFLOW_CHAT_NAME') || 'ffoa-knowledge-assistant';
    const existing = await this.listChatsByName(chatName);
    if (existing.length > 0) {
      const chatId = existing[0].id;
      await this.ensureChatDatasets(chatId, datasetId);
      this.chatIdCache = chatId;
      return chatId;
    }

    const created = await this.createChatAssistant(chatName, datasetId);
    this.chatIdCache = created.id;
    return created.id;
  }

  async uploadDocument(
    datasetId: string,
    filePath: string,
    displayName: string,
  ): Promise<{ id: string; name: string }> {
    const boundary = `----ffoa-ragflow-${randomUUID()}`;
    const header = Buffer.from(
      `--${boundary}\r\n` +
        `Content-Disposition: form-data; name="file"; filename="${displayName}"\r\n` +
        'Content-Type: application/octet-stream\r\n\r\n',
    );
    const footer = Buffer.from(`\r\n--${boundary}--\r\n`);
    const fileHandle = await fs.promises.open(filePath, 'r');
    const fileStat = await fileHandle.stat();
    const fileStream = fileHandle.createReadStream();
    const body = Readable.from(
      (async function* () {
        yield header;
        for await (const chunk of fileStream) {
          yield chunk;
        }
        yield footer;
      })(),
    );

    let response: RagflowResponse<Array<{ id: string; name: string }>>;
    try {
      response = await this.request<RagflowResponse<Array<{ id: string; name: string }>>>(
        'POST',
        `/api/v1/datasets/${datasetId}/documents`,
        body,
        {
          'Content-Type': `multipart/form-data; boundary=${boundary}`,
          'Content-Length': `${header.length + fileStat.size + footer.length}`,
        },
      );
    } finally {
      try {
        await fileHandle.close();
      } catch {
        // ignore close errors
      }
    }

    if (response.code !== 0 || !response.data || response.data.length === 0) {
      throw new Error(response.message || 'RAGFlow upload failed');
    }

    return response.data[0];
  }

  async ensureDatasetEmbeddingModel(
    datasetId: string,
  ): Promise<{ changed: boolean; reset: boolean; target?: string }> {
    if (!this.embeddingModel) {
      return { changed: false, reset: false };
    }

    const dataset = await this.getDatasetById(datasetId);
    if (!dataset) {
      return { changed: false, reset: false };
    }

    if (dataset.embedding_model === this.embeddingModel) {
      return { changed: false, reset: false };
    }

    const hasContent = (dataset.chunk_count || 0) > 0 || (dataset.document_count || 0) > 0;
    if (hasContent && !this.embeddingResetOnChange) {
      throw new Error(
        `RAGFlow dataset embedding_model mismatch: ${dataset.embedding_model} -> ${this.embeddingModel}. ` +
          'Set RAGFLOW_EMBEDDING_RESET_ON_CHANGE=true to purge dataset content and re-sync.',
      );
    }

    if (hasContent) {
      await this.deleteAllDocuments(datasetId);
    }

    await this.updateDatasetEmbeddingModel(datasetId, this.embeddingModel);
    return { changed: true, reset: hasContent, target: this.embeddingModel };
  }

  async deleteDocuments(datasetId: string, ids: string[]): Promise<void> {
    if (ids.length === 0) {
      return;
    }

    const response = await this.request<RagflowResponse<unknown>>(
      'DELETE',
      `/api/v1/datasets/${datasetId}/documents`,
      JSON.stringify({ document_ids: ids }),
      { 'Content-Type': 'application/json' },
    );

    if (response.code !== 0) {
      const message = response.message || 'RAGFlow delete failed';
      if (message.includes('Documents not found')) {
        this.logger.warn(`RAGFlow delete ignored missing documents: ${message}`);
        return;
      }
      throw new Error(message);
    }
  }

  async getDocumentStatus(datasetId: string, documentId: string): Promise<any | null> {
    const response = await this.request<RagflowResponse<{ docs: any[] }>>(
      'GET',
      `/api/v1/datasets/${datasetId}/documents?page=1&page_size=1&id=${encodeURIComponent(documentId)}`,
    );

    if (response.code !== 0 || !response.data) {
      return null;
    }

    return response.data.docs?.[0] ?? null;
  }

  isRecoverableDocumentFailure(message?: string | null): boolean {
    if (!message) {
      return false;
    }
    const lower = message.toLowerCase();
    return (
      lower.includes('文档不存在') ||
      lower.includes('document not found') ||
      lower.includes('documents not found') ||
      lower.includes('not found') ||
      lower.includes("don't own the document") ||
      lower.includes('do not own the document') ||
      lower.includes('not owner of the document') ||
      lower.includes('file is empty') ||
      lower.includes('can not find file') ||
      lower.includes('no such file') ||
      lower.includes('package not found') ||
      lower.includes('embedding extraction from file path is not supported') ||
      lower.includes('model(@none) not authorized')
    );
  }

  async updateDocumentMetadata(
    datasetId: string,
    documentId: string,
    metadata: Record<string, string>,
  ): Promise<void> {
    const updates = Object.entries(metadata).map(([key, value]) => ({
      key,
      value,
    }));

    if (updates.length === 0) {
      return;
    }

    const response = await this.request<RagflowResponse<{ updated: number }>>(
      'POST',
      `/api/v1/datasets/${datasetId}/metadata/update`,
      JSON.stringify({
        selector: { document_ids: [documentId] },
        updates,
      }),
      { 'Content-Type': 'application/json' },
    );

    if (response.code !== 0) {
      throw new Error(response.message || 'RAGFlow metadata update failed');
    }
  }

  async parseDocuments(datasetId: string, documentIds: string[]): Promise<void> {
    if (documentIds.length === 0) {
      return;
    }

    const response = await this.request<RagflowResponse<unknown>>(
      'POST',
      `/api/v1/datasets/${datasetId}/chunks`,
      JSON.stringify({ document_ids: documentIds }),
      { 'Content-Type': 'application/json' },
    );

    if (response.code !== 0) {
      throw new Error(response.message || 'RAGFlow parse failed');
    }
  }

  async retrieve(datasetId: string, question: string, pageSize: number): Promise<any> {
    const response = await this.request<RagflowResponse<unknown>>(
      'POST',
      '/api/v1/retrieval',
      JSON.stringify({
        question,
        dataset_ids: [datasetId],
        page: 1,
        page_size: pageSize,
      }),
      { 'Content-Type': 'application/json' },
    );

    if (response.code !== 0) {
      throw new Error(response.message || 'RAGFlow retrieval failed');
    }

    return response.data;
  }

  async createChatCompletion(chatId: string, question: string): Promise<any> {
    const response = await this.request<any>(
      'POST',
      `/api/v1/chats_openai/${chatId}/chat/completions`,
      JSON.stringify({
        model: 'ragflow',
        messages: [{ role: 'user', content: question }],
        stream: false,
        extra_body: { reference: true },
      }),
      { 'Content-Type': 'application/json' },
    );

    return response;
  }

  private async listDatasets(params: { name?: string; id?: string }): Promise<any[]> {
    const query: string[] = ['page=1', 'page_size=30'];
    if (params.name) {
      query.push(`name=${encodeURIComponent(params.name)}`);
    }
    if (params.id) {
      query.push(`id=${encodeURIComponent(params.id)}`);
    }

    const response = await this.request<RagflowResponse<any[]>>(
      'GET',
      `/api/v1/datasets?${query.join('&')}`,
    );

    if (response.code !== 0) {
      return [];
    }

    return response.data ?? [];
  }

  private async getDatasetById(datasetId: string): Promise<any | null> {
    const datasets = await this.listDatasets({ id: datasetId });
    return datasets[0] ?? null;
  }

  private async createDataset(
    name: string,
    description: string,
    embeddingModel?: string,
  ): Promise<{ id: string; name: string }> {
    const payload: Record<string, unknown> = {
      name,
      description,
      permission: 'team',
    };
    if (embeddingModel) {
      payload.embedding_model = embeddingModel;
    }

    const response = await this.request<RagflowResponse<{ id: string; name: string }>>(
      'POST',
      '/api/v1/datasets',
      JSON.stringify(payload),
      { 'Content-Type': 'application/json' },
    );

    if (response.code !== 0 || !response.data) {
      throw new Error(response.message || 'RAGFlow create dataset failed');
    }

    return response.data;
  }

  private async updateDatasetEmbeddingModel(datasetId: string, embeddingModel: string): Promise<void> {
    const response = await this.request<RagflowResponse<any[]>>(
      'PUT',
      `/api/v1/datasets/${datasetId}`,
      JSON.stringify({ embedding_model: embeddingModel }),
      { 'Content-Type': 'application/json' },
    );

    if (response.code !== 0) {
      throw new Error(response.message || 'RAGFlow update dataset failed');
    }
  }

  private async listDocuments(datasetId: string, page: number, pageSize: number): Promise<any[]> {
    const response = await this.request<RagflowResponse<any>>(
      'GET',
      `/api/v1/datasets/${datasetId}/documents?page=${page}&page_size=${pageSize}`,
    );

    if (response.code !== 0) {
      return [];
    }

    if (Array.isArray(response.data)) {
      return response.data;
    }
    return response.data?.docs ?? [];
  }

  private async deleteAllDocuments(datasetId: string): Promise<void> {
    const pageSize = 100;
    let page = 1;
    while (true) {
      const documents = await this.listDocuments(datasetId, page, pageSize);
      if (!documents.length) {
        break;
      }
      const ids = documents.map((doc: any) => doc.id).filter(Boolean);
      await this.deleteDocuments(datasetId, ids);
      if (documents.length < pageSize) {
        break;
      }
      page += 1;
    }
  }

  private async listChatsByName(name: string): Promise<Array<{ id: string; name: string }>> {
    const response = await this.request<RagflowResponse<Array<{ id: string; name: string }>>>(
      'GET',
      `/api/v1/chats?page=1&page_size=30&name=${encodeURIComponent(name)}`,
    );

    if (response.code !== 0) {
      return [];
    }

    return response.data ?? [];
  }

  private async getChatById(chatId: string): Promise<any | null> {
    const response = await this.request<RagflowResponse<any[]>>(
      'GET',
      `/api/v1/chats?page=1&page_size=30&id=${encodeURIComponent(chatId)}`,
    );

    if (response.code !== 0) {
      return null;
    }

    return response.data?.[0] ?? null;
  }

  private async updateChatDatasets(chatId: string, datasetIds: string[]): Promise<void> {
    const response = await this.request<RagflowResponse<any>>(
      'PUT',
      `/api/v1/chats/${chatId}`,
      JSON.stringify({ dataset_ids: datasetIds }),
      { 'Content-Type': 'application/json' },
    );

    if (response.code !== 0) {
      throw new Error(response.message || 'RAGFlow update chat failed');
    }
  }

  private async ensureChatDatasets(chatId: string, datasetId: string): Promise<void> {
    const chat = await this.getChatById(chatId);
    if (!chat) {
      return;
    }
    const datasets: Array<{ id?: string }> = chat.datasets ?? [];
    const hasDataset = datasets.some((dataset) => dataset?.id === datasetId);
    if (!hasDataset) {
      await this.updateChatDatasets(chatId, [datasetId]);
    }
  }

  private async createChatAssistant(name: string, datasetId: string): Promise<{ id: string; name: string }> {
    const response = await this.request<RagflowResponse<{ id: string; name: string }>>(
      'POST',
      '/api/v1/chats',
      JSON.stringify({
        name,
        dataset_ids: [datasetId],
      }),
      { 'Content-Type': 'application/json' },
    );

    if (response.code !== 0 || !response.data) {
      throw new Error(response.message || 'RAGFlow create chat failed');
    }

    return response.data;
  }

  private async request<T>(
    method: string,
    path: string,
    body?: any,
    headers: Record<string, string> = {},
  ): Promise<T> {
    if (!this.baseUrl) {
      throw new Error('RAGFLOW_BASE_URL is not configured');
    }
    if (!this.apiKey) {
      throw new Error('RAGFLOW_API_KEY is not configured');
    }
    const requestInit: RequestInit & { duplex?: 'half' } = {
      method,
      headers: {
        Authorization: `Bearer ${this.apiKey}`,
        ...headers,
      },
      body,
    };
    if (body && typeof body === 'object' && 'pipe' in body) {
      requestInit.duplex = 'half';
    }

    const response = await fetch(`${this.baseUrl}${path}`, requestInit);

    const text = await response.text();
    const payload = text ? JSON.parse(text) : {};

    if (!response.ok) {
      this.logger.error(`RAGFlow request failed: ${response.status} ${response.statusText}`);
      throw new Error(payload?.message || 'RAGFlow request failed');
    }

    return payload as T;
  }
}
