diff --git a/packages/tale_knowledge/src/tale_knowledge/extraction/image.py b/packages/tale_knowledge/src/tale_knowledge/extraction/image.py index ebbb36b073..1074f6f487 100644 --- a/packages/tale_knowledge/src/tale_knowledge/extraction/image.py +++ b/packages/tale_knowledge/src/tale_knowledge/extraction/image.py @@ -48,7 +48,10 @@ async def extract_text_from_image_bytes( Tuple of (extracted_text, vision_was_used). """ if not vision_client: - logger.warning(f"No vision client provided for image extraction: {filename}") + logger.warning( + f"No vision client configured for image extraction: {filename}. " + f"Image OCR requires a vision model in provider settings." + ) return "", False logger.info(f"Processing image: {filename}") diff --git a/packages/tale_knowledge/src/tale_knowledge/extraction/pdf.py b/packages/tale_knowledge/src/tale_knowledge/extraction/pdf.py index b48da93548..c605466a41 100644 --- a/packages/tale_knowledge/src/tale_knowledge/extraction/pdf.py +++ b/packages/tale_knowledge/src/tale_knowledge/extraction/pdf.py @@ -3,12 +3,20 @@ Hybrid approach: 1. Digital PDFs: Extract text directly using PyMuPDF (no API calls) 2. Embedded images: Large images (>50% page area) are OCR'd, smaller ones described + +Scanned page detection: +- Pages with < SCANNED_PAGE_TEXT_THRESHOLD characters of digital text are + considered "scanned" when they also contain at least one embedded image + covering > LARGE_IMAGE_RATIO of the page area. +- The extraction result includes `scanned_pages_detected` and `ocr_applied` + so callers can inform the user about OCR activity. """ from __future__ import annotations import asyncio from collections.abc import Callable +from dataclasses import dataclass from functools import partial from typing import TYPE_CHECKING @@ -23,10 +31,21 @@ ProgressCallback = Callable[[int, int], None] # (pages_done, total_pages) LARGE_IMAGE_RATIO = 0.5 +SCANNED_PAGE_TEXT_THRESHOLD = 50 MAX_PAGES = 2000 DEFAULT_PAGE_CONCURRENCY = 8 +@dataclass(frozen=True, slots=True) +class PdfExtractionResult: + """Result of a PDF text extraction.""" + + text: str + vision_used: bool + scanned_pages_detected: int + ocr_applied: bool + + def _extract_page_text_sync(page_bytes: bytes) -> dict: """Extract text and image blocks from a single page (runs in thread pool). @@ -95,11 +114,14 @@ async def _extract_page_with_layout( vision_semaphore: asyncio.Semaphore, vision_client: VisionClient | None, process_images: bool, -) -> tuple[str, bool]: +) -> tuple[str, bool, bool]: """Extract page content preserving text and image positions. Images covering >50% of the page area are OCR'd (likely scanned pages), smaller images are described. + + Returns: + Tuple of (content, vision_used, is_scanned_page). """ loop = asyncio.get_running_loop() @@ -108,8 +130,18 @@ async def _extract_page_with_layout( ) elements: list[tuple[float, str]] = text_data["elements"] images: list[tuple[float, bytes, float]] = text_data["images"] + total_text_len: int = text_data["total_text_len"] vision_used = False + has_large_image = any(area_ratio > LARGE_IMAGE_RATIO for _, _, area_ratio in images) + is_scanned_page = total_text_len < SCANNED_PAGE_TEXT_THRESHOLD and has_large_image + + if is_scanned_page and not vision_client: + logger.warning( + f"Page {page_num + 1}: Scanned page detected ({total_text_len} chars, " + f"large image present) but no vision client configured — OCR skipped" + ) + if process_images and vision_client and images: for y0, img_bytes, area_ratio in images: try: @@ -132,7 +164,7 @@ async def _extract_page_with_layout( elements.sort(key=lambda x: x[0]) content = "\n\n".join(elem[1] for elem in elements) - return content, vision_used + return content, vision_used, is_scanned_page async def extract_text_from_pdf_bytes( @@ -143,7 +175,7 @@ async def extract_text_from_pdf_bytes( process_images: bool = True, max_pages: int = MAX_PAGES, on_progress: ProgressCallback | None = None, -) -> tuple[str, bool]: +) -> PdfExtractionResult: """Extract text from PDF bytes. Args: @@ -156,7 +188,7 @@ async def extract_text_from_pdf_bytes( after each page completes. Safe to call from concurrent tasks. Returns: - Tuple of (extracted_text, vision_was_used). + PdfExtractionResult with text, vision_used, scanned_pages_detected, and ocr_applied. """ logger.info(f"Processing PDF: {filename}") @@ -190,10 +222,12 @@ async def extract_text_from_pdf_bytes( pages_done = 0 - async def process_page(page_num: int, page_bytes: bytes) -> tuple[int, str, bool]: + async def process_page( + page_num: int, page_bytes: bytes + ) -> tuple[int, str, bool, bool]: nonlocal pages_done async with page_semaphore: - content, vis_used = await _extract_page_with_layout( + content, vis_used, is_scanned = await _extract_page_with_layout( page_bytes, page_num, vision_semaphore, @@ -203,29 +237,52 @@ async def process_page(page_num: int, page_bytes: bytes) -> tuple[int, str, bool pages_done += 1 if on_progress is not None: on_progress(pages_done, pages_to_process) - return page_num, f"--- Page {page_num + 1} ---\n{content}", vis_used + return ( + page_num, + f"--- Page {page_num + 1} ---\n{content}", + vis_used, + is_scanned, + ) tasks = [process_page(pn, pb) for pn, pb in page_data] results = await asyncio.gather(*tasks, return_exceptions=True) pages_content: list[tuple[int, str]] = [] vision_used = False + scanned_pages_detected = 0 + ocr_applied = False for result in results: if isinstance(result, Exception): logger.warning(f"Page processing failed: {result}") continue - page_num, content, page_vision_used = result + page_num, content, page_vision_used, page_is_scanned = result pages_content.append((page_num, content)) if page_vision_used: vision_used = True + if page_is_scanned: + scanned_pages_detected += 1 + if page_vision_used: + ocr_applied = True pages_content.sort(key=lambda x: x[0]) combined_text = "\n\n".join(p[1] for p in pages_content) + if scanned_pages_detected > 0 and not vision_client: + logger.warning( + f"PDF '{filename}': {scanned_pages_detected} scanned page(s) detected " + f"but no vision client configured — text may be incomplete" + ) + logger.info( f"PDF processing complete: {pages_to_process} pages, {len(combined_text)} chars, " - f"Vision API used: {vision_used}" + f"Vision API used: {vision_used}, scanned pages: {scanned_pages_detected}, " + f"OCR applied: {ocr_applied}" ) - return combined_text, vision_used + return PdfExtractionResult( + text=combined_text, + vision_used=vision_used, + scanned_pages_detected=scanned_pages_detected, + ocr_applied=ocr_applied, + ) diff --git a/packages/tale_knowledge/src/tale_knowledge/extraction/router.py b/packages/tale_knowledge/src/tale_knowledge/extraction/router.py index e16e2cdcd7..5d39ed9ca8 100644 --- a/packages/tale_knowledge/src/tale_knowledge/extraction/router.py +++ b/packages/tale_knowledge/src/tale_knowledge/extraction/router.py @@ -67,13 +67,14 @@ async def extract_text( if suffix in PDF_EXTENSIONS: from .pdf import extract_text_from_pdf_bytes - return await extract_text_from_pdf_bytes( + result = await extract_text_from_pdf_bytes( file_bytes, filename, vision_client=vision_client, process_images=process_images, on_progress=on_progress, ) + return result.text, result.vision_used if suffix in DOCX_EXTENSIONS: from .docx import extract_text_from_docx_bytes diff --git a/packages/tale_knowledge/tests/test_pdf.py b/packages/tale_knowledge/tests/test_pdf.py index 8b60cf9843..f7190a2df5 100644 --- a/packages/tale_knowledge/tests/test_pdf.py +++ b/packages/tale_knowledge/tests/test_pdf.py @@ -7,6 +7,7 @@ from tale_knowledge.extraction.pdf import ( LARGE_IMAGE_RATIO, + PdfExtractionResult, _extract_page_text_sync, extract_text_from_pdf_bytes, ) @@ -114,9 +115,12 @@ class TestExtractTextFromPdfBytes: @pytest.mark.asyncio async def test_digital_pdf_extraction(self): pdf_bytes = _make_simple_pdf("Hello World") - text, vision_used = await extract_text_from_pdf_bytes(pdf_bytes) - assert "Hello World" in text - assert vision_used is False + result = await extract_text_from_pdf_bytes(pdf_bytes) + assert isinstance(result, PdfExtractionResult) + assert "Hello World" in result.text + assert result.vision_used is False + assert result.scanned_pages_detected == 0 + assert result.ocr_applied is False @pytest.mark.asyncio async def test_multi_page_pdf(self): @@ -127,11 +131,11 @@ async def test_multi_page_pdf(self): pdf_bytes = doc.tobytes() doc.close() - text, vision_used = await extract_text_from_pdf_bytes(pdf_bytes) - assert "Page 1" in text - assert "Page 2" in text - assert "Page 3" in text - assert vision_used is False + result = await extract_text_from_pdf_bytes(pdf_bytes) + assert "Page 1" in result.text + assert "Page 2" in result.text + assert "Page 3" in result.text + assert result.vision_used is False @pytest.mark.asyncio async def test_empty_pdf(self): @@ -140,17 +144,15 @@ async def test_empty_pdf(self): pdf_bytes = doc.tobytes() doc.close() - text, vision_used = await extract_text_from_pdf_bytes(pdf_bytes) - assert "--- Page 1 ---" in text + result = await extract_text_from_pdf_bytes(pdf_bytes) + assert "--- Page 1 ---" in result.text @pytest.mark.asyncio async def test_no_vision_without_client(self): pdf_bytes = _make_simple_pdf("Digital text only") - text, vision_used = await extract_text_from_pdf_bytes( - pdf_bytes, vision_client=None - ) - assert "Digital text only" in text - assert vision_used is False + result = await extract_text_from_pdf_bytes(pdf_bytes, vision_client=None) + assert "Digital text only" in result.text + assert result.vision_used is False @pytest.mark.asyncio async def test_image_described_with_vision_client(self): @@ -161,24 +163,20 @@ async def test_image_described_with_vision_client(self): mock_client.max_concurrent_pages = 3 mock_client.describe_image = AsyncMock(return_value="A red square image") - text, vision_used = await extract_text_from_pdf_bytes( - pdf_bytes, vision_client=mock_client - ) - assert long_text in text - assert "[Image: A red square image]" in text - assert vision_used is True + result = await extract_text_from_pdf_bytes(pdf_bytes, vision_client=mock_client) + assert long_text in result.text + assert "[Image: A red square image]" in result.text + assert result.vision_used is True mock_client.describe_image.assert_called() @pytest.mark.asyncio async def test_image_skipped_without_vision_client(self): long_text = "This document has embedded images but no vision client provided" pdf_bytes = _make_pdf_with_image(long_text, image_size=200) - text, vision_used = await extract_text_from_pdf_bytes( - pdf_bytes, vision_client=None - ) - assert long_text in text - assert "[Image:" not in text - assert vision_used is False + result = await extract_text_from_pdf_bytes(pdf_bytes, vision_client=None) + assert long_text in result.text + assert "[Image:" not in result.text + assert result.vision_used is False @pytest.mark.asyncio async def test_image_skipped_when_process_images_false(self): @@ -188,11 +186,11 @@ async def test_image_skipped_when_process_images_false(self): mock_client = AsyncMock() mock_client.max_concurrent_pages = 3 - text, vision_used = await extract_text_from_pdf_bytes( + result = await extract_text_from_pdf_bytes( pdf_bytes, vision_client=mock_client, process_images=False ) - assert long_text in text - assert "[Image:" not in text + assert long_text in result.text + assert "[Image:" not in result.text mock_client.describe_image.assert_not_called() @pytest.mark.asyncio @@ -203,10 +201,19 @@ async def test_large_image_uses_ocr(self): mock_client.max_concurrent_pages = 3 mock_client.ocr_image = AsyncMock(return_value="OCR extracted text from scan") - text, vision_used = await extract_text_from_pdf_bytes( - pdf_bytes, vision_client=mock_client - ) - assert "OCR extracted text from scan" in text - assert vision_used is True + result = await extract_text_from_pdf_bytes(pdf_bytes, vision_client=mock_client) + assert "OCR extracted text from scan" in result.text + assert result.vision_used is True + assert result.scanned_pages_detected == 1 + assert result.ocr_applied is True mock_client.ocr_image.assert_called() mock_client.describe_image.assert_not_called() + + @pytest.mark.asyncio + async def test_scanned_page_without_vision_client(self): + pdf_bytes = _make_fullpage_image_pdf() + + result = await extract_text_from_pdf_bytes(pdf_bytes, vision_client=None) + assert result.scanned_pages_detected == 1 + assert result.ocr_applied is False + assert result.vision_used is False diff --git a/services/crawler/app/models.py b/services/crawler/app/models.py index 9d5a51c31d..a5bb39d263 100644 --- a/services/crawler/app/models.py +++ b/services/crawler/app/models.py @@ -376,6 +376,9 @@ class FileMetadataResponse(BaseModel): slide_count: int | None = Field(None, description="Number of slides (PPTX)") created_at: int | None = Field(None, description="Document creation date (Unix ms)") modified_at: int | None = Field(None, description="Document modification date (Unix ms)") + scanned_pages_detected: int | None = Field( + None, description="Number of pages detected as scanned (low text content)" + ) error: str | None = Field(None, description="Error message if extraction failed") diff --git a/services/crawler/app/routers/docx.py b/services/crawler/app/routers/docx.py index dc53455863..54dfd1640a 100644 --- a/services/crawler/app/routers/docx.py +++ b/services/crawler/app/routers/docx.py @@ -499,6 +499,7 @@ async def extract_docx_metadata(file: UploadFile = _FILE_UPLOAD): author=meta["author"] or None, created_at=meta["created_at"], modified_at=meta["modified_at"], + scanned_pages_detected=0, ) except HTTPException: raise diff --git a/services/crawler/app/routers/pdf.py b/services/crawler/app/routers/pdf.py index f83aa98ea3..24f3c9631c 100644 --- a/services/crawler/app/routers/pdf.py +++ b/services/crawler/app/routers/pdf.py @@ -251,6 +251,20 @@ async def extract_pdf_metadata(file: UploadFile = _FILE_UPLOAD): doc = fitz.open(stream=file_bytes, filetype="pdf") raw = doc.metadata or {} page_count = len(doc) + + large_image_ratio = 0.5 + scanned_count = 0 + for page_num in range(page_count): + page = doc[page_num] + page_area = page.rect.get_area() + if page_area <= 0: + continue + for img in page.get_images(full=True): + bbox = page.get_image_bbox(img) + if bbox and bbox.get_area() / page_area > large_image_ratio: + scanned_count += 1 + break + doc.close() return FileMetadataResponse( @@ -262,6 +276,7 @@ async def extract_pdf_metadata(file: UploadFile = _FILE_UPLOAD): page_count=page_count, created_at=_parse_pdf_date(raw.get("creationDate")), modified_at=_parse_pdf_date(raw.get("modDate")), + scanned_pages_detected=scanned_count, ) except HTTPException: raise diff --git a/services/crawler/app/routers/pptx.py b/services/crawler/app/routers/pptx.py index 9c512a73d0..593cbaaea9 100644 --- a/services/crawler/app/routers/pptx.py +++ b/services/crawler/app/routers/pptx.py @@ -301,6 +301,7 @@ async def extract_pptx_metadata(file: UploadFile = _FILE_UPLOAD): slide_count=len(prs.slides), created_at=meta["created_at"], modified_at=meta["modified_at"], + scanned_pages_detected=0, ) except HTTPException: raise diff --git a/services/crawler/app/services/file_parser_service.py b/services/crawler/app/services/file_parser_service.py index c01a6fd8d5..dffaa2827c 100644 --- a/services/crawler/app/services/file_parser_service.py +++ b/services/crawler/app/services/file_parser_service.py @@ -204,7 +204,7 @@ async def parse_pdf_with_vision( try: acc = UsageAccumulator() - pages_content, vision_used = await extract_text_from_pdf_bytes( + pages_content, vision_used, extraction_meta = await extract_text_from_pdf_bytes( file_bytes, filename, process_images=process_images, @@ -245,6 +245,8 @@ async def parse_pdf_with_vision( "page_count": len(pages_content), "full_text": "\n\n".join(pages_content), "vision_used": vision_used, + "scanned_pages_detected": extraction_meta.scanned_pages_detected, + "ocr_applied": extraction_meta.ocr_applied, "metadata": pdf_metadata, } if acc.total_tokens > 0: diff --git a/services/crawler/app/services/vision/pdf_extractor.py b/services/crawler/app/services/vision/pdf_extractor.py index 7bf577dce2..6a570d28d3 100644 --- a/services/crawler/app/services/vision/pdf_extractor.py +++ b/services/crawler/app/services/vision/pdf_extractor.py @@ -5,9 +5,15 @@ 2. Scanned PDFs: Detect low-text pages and send to Vision API for OCR 3. Embedded images: Extract and describe using Vision API 4. Position preservation: Maintain relative positions of text and images + +Scanned page detection uses a text threshold of MIN_TEXT_THRESHOLD characters. +This is consistent with the knowledge extraction pipeline in tale_knowledge +which uses SCANNED_PAGE_TEXT_THRESHOLD (same value) combined with a large-image +area ratio check. """ import asyncio +from dataclasses import dataclass import fitz # PyMuPDF from loguru import logger @@ -19,6 +25,16 @@ MIN_IMAGE_SIZE = 10000 # ~100x100 pixels +@dataclass +class PdfExtractionMetadata: + """Metadata about the PDF extraction process.""" + + scanned_pages_detected: int = 0 + ocr_applied: bool = False + pages_processed: int = 0 + vision_used: bool = False + + async def _describe_image( doc: fitz.Document, xref: int, @@ -73,7 +89,7 @@ async def _extract_page_with_layout( process_images: bool, ocr_scanned_pages: bool, usage: UsageAccumulator | None = None, -) -> tuple[str, bool]: +) -> tuple[str, bool, bool]: """Extract page content preserving text and image positions. Args: @@ -84,10 +100,11 @@ async def _extract_page_with_layout( ocr_scanned_pages: Whether to OCR pages with low text content Returns: - Tuple of (page_content, vision_used) + Tuple of (page_content, vision_used, is_scanned_page). """ elements: list[tuple[float, str]] = [] # (y_coord, content) vision_used = False + is_scanned_page = False # Get text blocks with positions text_dict = page.get_text("dict") @@ -107,6 +124,7 @@ async def _extract_page_with_layout( # Check if this is a scanned page (low text content) if total_text_len < MIN_TEXT_THRESHOLD and ocr_scanned_pages: + is_scanned_page = True logger.debug(f"Page {page.number + 1}: Low text ({total_text_len} chars), sending to Vision API for OCR") ocr_text = await _ocr_page(page, semaphore, usage=usage) if ocr_text: @@ -138,7 +156,7 @@ async def _extract_page_with_layout( elements.sort(key=lambda x: x[0]) content = "\n\n".join(elem[1] for elem in elements) - return content, vision_used + return content, vision_used, is_scanned_page async def extract_text_from_pdf_bytes( @@ -148,7 +166,7 @@ async def extract_text_from_pdf_bytes( process_images: bool = True, ocr_scanned_pages: bool = True, usage: UsageAccumulator | None = None, -) -> tuple[list[str], bool]: +) -> tuple[list[str], bool, PdfExtractionMetadata]: """Extract text from PDF bytes with Vision support. Args: @@ -158,7 +176,7 @@ async def extract_text_from_pdf_bytes( ocr_scanned_pages: Whether to OCR pages with low text content Returns: - Tuple of (list of page contents, vision_used flag) + Tuple of (list of page contents, vision_used flag, extraction metadata). """ logger.info(f"Processing PDF: {filename}") @@ -166,27 +184,33 @@ async def extract_text_from_pdf_bytes( total_pages = len(doc) semaphore = asyncio.Semaphore(settings.vision_max_concurrent_pages) - async def process_page(page_num: int) -> tuple[int, str, bool]: + async def process_page(page_num: int) -> tuple[int, str, bool, bool]: page = doc[page_num] - content, vision_used = await _extract_page_with_layout( + content, page_vision_used, page_is_scanned = await _extract_page_with_layout( page, doc, semaphore, process_images, ocr_scanned_pages, usage=usage ) - return page_num, f"--- Page {page_num + 1} ---\n{content}", vision_used + return page_num, f"--- Page {page_num + 1} ---\n{content}", page_vision_used, page_is_scanned tasks = [process_page(i) for i in range(total_pages)] results = await asyncio.gather(*tasks, return_exceptions=True) pages_content: list[tuple[int, str]] = [] vision_used = False + scanned_pages_detected = 0 + ocr_applied = False for result in results: if isinstance(result, Exception): logger.warning(f"Page processing failed: {result}") continue - page_num, content, page_vision_used = result + page_num, content, page_vision_used, page_is_scanned = result pages_content.append((page_num, content)) if page_vision_used: vision_used = True + if page_is_scanned: + scanned_pages_detected += 1 + if page_vision_used: + ocr_applied = True doc.close() @@ -194,6 +218,16 @@ async def process_page(page_num: int) -> tuple[int, str, bool]: pages_content.sort(key=lambda x: x[0]) ordered_content = [p[1] for p in pages_content] - logger.info(f"PDF processing complete: {total_pages} pages, Vision API used: {vision_used}") + metadata = PdfExtractionMetadata( + scanned_pages_detected=scanned_pages_detected, + ocr_applied=ocr_applied, + pages_processed=total_pages, + vision_used=vision_used, + ) + + logger.info( + f"PDF processing complete: {total_pages} pages, Vision API used: {vision_used}, " + f"scanned pages: {scanned_pages_detected}, OCR applied: {ocr_applied}" + ) - return ordered_content, vision_used + return ordered_content, vision_used, metadata diff --git a/services/db/init-scripts/03-create-knowledge-database.sql b/services/db/init-scripts/03-create-knowledge-database.sql index d9d73d1398..ab53b780e4 100644 --- a/services/db/init-scripts/03-create-knowledge-database.sql +++ b/services/db/init-scripts/03-create-knowledge-database.sql @@ -175,6 +175,7 @@ CREATE TABLE IF NOT EXISTS private_knowledge.documents ( progress_detail TEXT, source_created_at TIMESTAMPTZ, source_modified_at TIMESTAMPTZ, + ocr_applied BOOLEAN NOT NULL DEFAULT FALSE, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); diff --git a/services/db/migrations/db/migrations/20260411000002_add_document_ocr_applied_column.sql b/services/db/migrations/db/migrations/20260411000002_add_document_ocr_applied_column.sql new file mode 100644 index 0000000000..8c10e4ca10 --- /dev/null +++ b/services/db/migrations/db/migrations/20260411000002_add_document_ocr_applied_column.sql @@ -0,0 +1,10 @@ +-- migrate:up +-- Track whether OCR was applied during document extraction. + +ALTER TABLE private_knowledge.documents + ADD COLUMN IF NOT EXISTS ocr_applied BOOLEAN NOT NULL DEFAULT FALSE; + +-- migrate:down + +ALTER TABLE private_knowledge.documents + DROP COLUMN IF EXISTS ocr_applied; diff --git a/services/platform/app/features/documents/components/document-preview-dialog.tsx b/services/platform/app/features/documents/components/document-preview-dialog.tsx index a6ba610458..a26e74e18c 100644 --- a/services/platform/app/features/documents/components/document-preview-dialog.tsx +++ b/services/platform/app/features/documents/components/document-preview-dialog.tsx @@ -112,6 +112,19 @@ function DetailsSidebar({ doc }: { doc: Document }) { /> + {doc.scannedPagesDetected != null && doc.scannedPagesDetected > 0 && ( + + {String(doc.scannedPagesDetected)} + {doc.ragStatus === 'completed' && doc.ocrApplied != null && ( + + {doc.ocrApplied + ? t('ocr.processingWithOcr') + : t('ocr.unavailable')} + + )} + + )} + {teamNames.length > 0 && ( diff --git a/services/platform/app/features/documents/hooks/use-documents-table-config.tsx b/services/platform/app/features/documents/hooks/use-documents-table-config.tsx index 723a6ab06e..4f54948fda 100644 --- a/services/platform/app/features/documents/hooks/use-documents-table-config.tsx +++ b/services/platform/app/features/documents/hooks/use-documents-table-config.tsx @@ -164,12 +164,24 @@ export function useDocumentsTableConfig({ — ) : ( - + + + {row.original.ragStatus === 'completed' && + row.original.ocrApplied === true && ( + + OCR + + )} + ), }, { diff --git a/services/platform/convex/agents/mutations.ts b/services/platform/convex/agents/mutations.ts index 80365dd80e..d7cba9b65a 100644 --- a/services/platform/convex/agents/mutations.ts +++ b/services/platform/convex/agents/mutations.ts @@ -167,13 +167,16 @@ export const addKnowledgeFile = mutation({ name: authUser.name, }); - await ctx.db.insert('fileMetadata', { - organizationId: args.organizationId, - storageId: args.fileId, - fileName: args.fileName, - contentType: args.contentType, - size: args.fileSize, - }); + await ctx.runMutation( + internal.file_metadata.internal_mutations.saveFileMetadata, + { + organizationId: args.organizationId, + storageId: args.fileId, + fileName: args.fileName, + contentType: args.contentType, + size: args.fileSize, + }, + ); const existing = await ctx.db .query('agentBindings') diff --git a/services/platform/convex/conversations/send_message_via_integration.ts b/services/platform/convex/conversations/send_message_via_integration.ts index 3729189c22..fc66cbd552 100644 --- a/services/platform/convex/conversations/send_message_via_integration.ts +++ b/services/platform/convex/conversations/send_message_via_integration.ts @@ -63,13 +63,16 @@ export async function sendMessageViaIntegration( if (args.attachments && args.attachments.length > 0) { attachmentsMeta = await Promise.all( args.attachments.map(async (att) => { - await ctx.db.insert('fileMetadata', { - organizationId: args.organizationId, - storageId: att.storageId, - fileName: att.fileName, - contentType: att.contentType, - size: att.size, - }); + await ctx.runMutation( + internal.file_metadata.internal_mutations.saveFileMetadata, + { + organizationId: args.organizationId, + storageId: att.storageId, + fileName: att.fileName, + contentType: att.contentType, + size: att.size, + }, + ); const url = await ctx.storage.getUrl(att.storageId); return { id: att.storageId, diff --git a/services/platform/convex/documents/__tests__/create_document_from_upload.test.ts b/services/platform/convex/documents/__tests__/create_document_from_upload.test.ts index 87b1a7af16..8ff85c5dfc 100644 --- a/services/platform/convex/documents/__tests__/create_document_from_upload.test.ts +++ b/services/platform/convex/documents/__tests__/create_document_from_upload.test.ts @@ -28,9 +28,10 @@ vi.mock('../../_generated/server', async (importOriginal) => { vi.mock('../../_generated/api', () => ({ internal: { - documents: { - internal_actions: { - extractDocumentDates: 'extractDocumentDates', + file_metadata: { + internal_mutations: { + saveFileMetadata: 'saveFileMetadata', + linkDocumentToFile: 'linkDocumentToFile', }, }, }, @@ -98,6 +99,7 @@ function createMockCtx() { insert: vi.fn().mockResolvedValue('fm_new'), patch: vi.fn().mockResolvedValue(undefined), }, + runMutation: vi.fn().mockResolvedValue('fm_new'), scheduler: { runAfter: vi.fn().mockResolvedValue(undefined), }, @@ -136,14 +138,14 @@ describe('createDocumentFromUpload', () => { await expect(handler(ctx, baseArgs)).rejects.toThrow('Unauthenticated'); }); - it('inserts file metadata when fileSize is provided', async () => { + it('saves file metadata via runMutation when fileSize is provided', async () => { mockGetAuthUser.mockResolvedValue(AUTH_USER); const ctx = createMockCtx(); const handler = await getHandler(); await handler(ctx, baseArgs); - expect(ctx.db.insert).toHaveBeenCalledWith('fileMetadata', { + expect(ctx.runMutation).toHaveBeenCalledWith('saveFileMetadata', { organizationId: 'org_1', storageId: 'storage_1', fileName: 'report.pdf', @@ -153,7 +155,7 @@ describe('createDocumentFromUpload', () => { }); }); - it('skips file metadata insert when fileSize is not provided', async () => { + it('skips file metadata save when fileSize is not provided', async () => { mockGetAuthUser.mockResolvedValue(AUTH_USER); const ctx = createMockCtx(); const handler = await getHandler(); @@ -161,7 +163,10 @@ describe('createDocumentFromUpload', () => { await handler(ctx, argsWithoutSize); - expect(ctx.db.insert).not.toHaveBeenCalled(); + expect(ctx.runMutation).not.toHaveBeenCalledWith( + 'saveFileMetadata', + expect.anything(), + ); }); it('creates document and returns documentId', async () => { @@ -227,19 +232,20 @@ describe('createDocumentFromUpload', () => { ).rejects.toThrow('Folder not accessible'); }); - it('patches fileMetadata with documentId after document creation', async () => { + it('links document to file after document creation', async () => { mockGetAuthUser.mockResolvedValue(AUTH_USER); const ctx = createMockCtx(); const handler = await getHandler(); await handler(ctx, baseArgs); - expect(ctx.db.patch).toHaveBeenCalledWith('fm_new', { + expect(ctx.runMutation).toHaveBeenCalledWith('linkDocumentToFile', { + storageId: 'storage_1', documentId: 'doc_created', }); }); - it('does not patch fileMetadata when fileSize is not provided', async () => { + it('does not link document to file when fileSize is not provided', async () => { mockGetAuthUser.mockResolvedValue(AUTH_USER); const ctx = createMockCtx(); const handler = await getHandler(); @@ -247,36 +253,18 @@ describe('createDocumentFromUpload', () => { await handler(ctx, argsWithoutSize); - expect(ctx.db.patch).not.toHaveBeenCalled(); - }); - - it('schedules extractDocumentDates for PDF uploads', async () => { - mockGetAuthUser.mockResolvedValue(AUTH_USER); - const ctx = createMockCtx(); - const handler = await getHandler(); - - await handler(ctx, baseArgs); - - expect(ctx.scheduler.runAfter).toHaveBeenCalledWith( - 0, - 'extractDocumentDates', - { - documentId: 'doc_created', - fileId: 'storage_1', - }, + expect(ctx.runMutation).not.toHaveBeenCalledWith( + 'linkDocumentToFile', + expect.anything(), ); }); - it('does not schedule extractDocumentDates for TXT uploads', async () => { + it('does not schedule extractDocumentDates (handled by saveFileMetadata)', async () => { mockGetAuthUser.mockResolvedValue(AUTH_USER); const ctx = createMockCtx(); const handler = await getHandler(); - await handler(ctx, { - ...baseArgs, - fileName: 'notes.txt', - contentType: 'text/plain', - }); + await handler(ctx, baseArgs); expect(ctx.scheduler.runAfter).not.toHaveBeenCalled(); }); diff --git a/services/platform/convex/documents/internal_actions.ts b/services/platform/convex/documents/internal_actions.ts index 6399f84858..8c4d58ff58 100644 --- a/services/platform/convex/documents/internal_actions.ts +++ b/services/platform/convex/documents/internal_actions.ts @@ -2,21 +2,14 @@ import { v } from 'convex/values'; -import { extractExtension } from '../../lib/shared/file-types'; import { fetchJson } from '../../lib/utils/type-cast-helpers'; -import { - isRecord, - getBoolean, - getNumber, - getString, -} from '../../lib/utils/type-guards'; +import { isRecord, getBoolean, getString } from '../../lib/utils/type-guards'; import { internal } from '../_generated/api'; import type { Id } from '../_generated/dataModel'; import { internalAction } from '../_generated/server'; import { buildDownloadUrl } from '../lib/helpers/public_storage_url'; import { getRagConfig } from '../lib/helpers/rag_config'; import { ragAction } from '../workflow_engine/action_defs/rag/rag_action'; -import { getCrawlerUrl } from './generate_document_helpers'; import type { GenerateDocxResult } from './generate_docx'; import * as DocumentsHelpers from './helpers'; import type { GenerateDocumentResult } from './types'; @@ -290,16 +283,25 @@ export const checkRagDocumentStatus = internalAction({ ? getString(docStatus, 'source_modified_at') : undefined, ); - - // Write dates BEFORE marking completed — if we crash after marking - // completed but before writing dates, the polling loop won't retry. - if (sourceCreatedAt != null || sourceModifiedAt != null) { + const ocrApplied = isRecord(docStatus) + ? getBoolean(docStatus, 'ocr_applied') + : undefined; + + // Write dates and OCR status BEFORE marking completed — if we crash + // after marking completed but before writing, the polling loop won't retry. + const datePatch: Record = {}; + if (sourceCreatedAt != null) + datePatch.sourceCreatedAt = sourceCreatedAt; + if (sourceModifiedAt != null) + datePatch.sourceModifiedAt = sourceModifiedAt; + if (ocrApplied != null) datePatch.ocrApplied = ocrApplied; + + if (Object.keys(datePatch).length > 0) { await ctx.runMutation( internal.documents.internal_mutations.updateDocumentDates, { documentId: args.documentId, - ...(sourceCreatedAt != null && { sourceCreatedAt }), - ...(sourceModifiedAt != null && { sourceModifiedAt }), + ...datePatch, }, ); } @@ -698,135 +700,3 @@ export const storeRawContent = internalAction({ }; }, }); - -const EXTRACT_DATES_SUPPORTED_EXTENSIONS = new Set(['pdf', 'docx', 'pptx']); -const EXTRACT_DATES_RETRY_DELAYS = [30_000, 60_000, 120_000]; - -export const extractDocumentDates = internalAction({ - args: { - documentId: v.id('documents'), - fileId: v.id('_storage'), - attempt: v.optional(v.number()), - }, - returns: v.null(), - handler: async (ctx, args): Promise => { - const attempt = args.attempt ?? 0; - - try { - const document = await ctx.runQuery( - internal.documents.internal_queries.getDocumentByIdRaw, - { documentId: args.documentId }, - ); - - if (!document) { - console.warn( - `[extractDocumentDates] Document ${args.documentId} not found, skipping`, - ); - return null; - } - - if (document.fileId !== args.fileId) { - console.warn( - `[extractDocumentDates] File changed for document ${args.documentId}, skipping stale extraction`, - ); - return null; - } - - const ext = document.extension ?? extractExtension(document.title); - if (!ext || !EXTRACT_DATES_SUPPORTED_EXTENSIONS.has(ext)) { - console.warn( - `[extractDocumentDates] Unsupported extension "${ext}" for document ${args.documentId}`, - ); - return null; - } - - const fileUrl = await ctx.storage.getUrl(args.fileId); - if (!fileUrl) { - console.warn( - `[extractDocumentDates] No URL for file ${args.fileId}, skipping`, - ); - return null; - } - - const fileResponse = await fetch(fileUrl, { - signal: AbortSignal.timeout(30_000), - }); - - if (!fileResponse.ok) { - throw new Error( - `Failed to download file: ${fileResponse.status} ${fileResponse.statusText}`, - ); - } - - const fileBlob = await fileResponse.blob(); - - const crawlerUrl = getCrawlerUrl(); - const endpoint = `${crawlerUrl}/api/v1/${ext}/extract-metadata`; - - const formData = new FormData(); - formData.append('file', fileBlob, document.title ?? `file.${ext}`); - - const metadataResponse = await fetch(endpoint, { - method: 'POST', - body: formData, - signal: AbortSignal.timeout(30_000), - }); - - if (!metadataResponse.ok) { - const errorText = await metadataResponse.text().catch(() => ''); - throw new Error( - `Crawler extract-metadata returned ${metadataResponse.status}: ${errorText}`, - ); - } - - let body: unknown; - try { - body = await metadataResponse.json(); - } catch { - throw new Error('Crawler returned non-JSON response'); - } - - if (!isRecord(body)) { - throw new Error('Invalid response shape from crawler extract-metadata'); - } - - const createdAt = getNumber(body, 'created_at'); - const modifiedAt = getNumber(body, 'modified_at'); - - if (createdAt != null || modifiedAt != null) { - await ctx.runMutation( - internal.documents.internal_mutations.updateDocumentDates, - { - documentId: args.documentId, - sourceCreatedAt: createdAt, - sourceModifiedAt: modifiedAt, - }, - ); - } - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - console.error( - `[extractDocumentDates] Error for document ${args.documentId} (attempt ${attempt}): ${message}`, - ); - - if (attempt < EXTRACT_DATES_RETRY_DELAYS.length) { - const retryDelay = EXTRACT_DATES_RETRY_DELAYS[attempt]; - await ctx.scheduler.runAfter( - retryDelay, - internal.documents.internal_actions.extractDocumentDates, - { - documentId: args.documentId, - fileId: args.fileId, - attempt: attempt + 1, - }, - ); - } else { - console.warn( - `[extractDocumentDates] All retries exhausted for document ${args.documentId}: ${message}`, - ); - } - } - - return null; - }, -}); diff --git a/services/platform/convex/documents/internal_mutations.ts b/services/platform/convex/documents/internal_mutations.ts index 1914b0cfb9..c4afed0e89 100644 --- a/services/platform/convex/documents/internal_mutations.ts +++ b/services/platform/convex/documents/internal_mutations.ts @@ -66,6 +66,8 @@ export const updateDocumentDates = internalMutation({ documentId: v.id('documents'), sourceCreatedAt: v.optional(v.number()), sourceModifiedAt: v.optional(v.number()), + scannedPagesDetected: v.optional(v.number()), + ocrApplied: v.optional(v.boolean()), }, returns: v.null(), handler: async (ctx, args) => { @@ -74,13 +76,19 @@ export const updateDocumentDates = internalMutation({ return null; } - const patch: Record = {}; + const patch: Record = {}; if (args.sourceCreatedAt != null) { patch.sourceCreatedAt = args.sourceCreatedAt; } if (args.sourceModifiedAt != null) { patch.sourceModifiedAt = args.sourceModifiedAt; } + if (args.scannedPagesDetected != null) { + patch.scannedPagesDetected = args.scannedPagesDetected; + } + if (args.ocrApplied != null) { + patch.ocrApplied = args.ocrApplied; + } if (Object.keys(patch).length > 0) { await ctx.db.patch(args.documentId, patch); diff --git a/services/platform/convex/documents/mutations.ts b/services/platform/convex/documents/mutations.ts index ea1ec9c2ef..72bc7d3025 100644 --- a/services/platform/convex/documents/mutations.ts +++ b/services/platform/convex/documents/mutations.ts @@ -165,14 +165,17 @@ export const createDocumentFromUpload = mutation({ let fileMetadataId; if (args.fileSize != null) { - fileMetadataId = await ctx.db.insert('fileMetadata', { - organizationId: args.organizationId, - storageId: args.fileId, - fileName: args.fileName, - contentType: args.contentType ?? 'application/octet-stream', - size: args.fileSize, - uploadedBy: userId, - }); + fileMetadataId = await ctx.runMutation( + internal.file_metadata.internal_mutations.saveFileMetadata, + { + organizationId: args.organizationId, + storageId: args.fileId, + fileName: args.fileName, + contentType: args.contentType ?? 'application/octet-stream', + size: args.fileSize, + uploadedBy: userId, + }, + ); } const result = await createDocument(ctx, { @@ -189,18 +192,11 @@ export const createDocumentFromUpload = mutation({ }); if (fileMetadataId) { - await ctx.db.patch(fileMetadataId, { - documentId: result.documentId, - }); - } - - if (ext && ['pdf', 'docx', 'pptx'].includes(ext)) { - await ctx.scheduler.runAfter( - 0, - internal.documents.internal_actions.extractDocumentDates, + await ctx.runMutation( + internal.file_metadata.internal_mutations.linkDocumentToFile, { + storageId: args.fileId, documentId: result.documentId, - fileId: args.fileId, }, ); } diff --git a/services/platform/convex/documents/schema.ts b/services/platform/convex/documents/schema.ts index 6af22018b7..dcb3826931 100644 --- a/services/platform/convex/documents/schema.ts +++ b/services/platform/convex/documents/schema.ts @@ -45,6 +45,8 @@ export const documentsTable = defineTable({ }), ), indexed: v.optional(v.boolean()), + scannedPagesDetected: v.optional(v.number()), + ocrApplied: v.optional(v.boolean()), sourceCreatedAt: v.optional(v.number()), sourceModifiedAt: v.optional(v.number()), createdBy: v.optional(v.string()), diff --git a/services/platform/convex/documents/transform_to_document_item.ts b/services/platform/convex/documents/transform_to_document_item.ts index 2e8c7746df..285302f4fa 100644 --- a/services/platform/convex/documents/transform_to_document_item.ts +++ b/services/platform/convex/documents/transform_to_document_item.ts @@ -107,6 +107,8 @@ export function transformToDocumentItem( ragStatus: document.ragInfo?.status, ragIndexedAt: document.ragInfo?.indexedAt, ragError: document.ragInfo?.error, + scannedPagesDetected: document.scannedPagesDetected, + ocrApplied: document.ocrApplied, teamId: document.teamId ?? null, teamIds: document.teamTags ?? [], // Creator tracking diff --git a/services/platform/convex/documents/validators.ts b/services/platform/convex/documents/validators.ts index 4e00158b41..61f61e334f 100644 --- a/services/platform/convex/documents/validators.ts +++ b/services/platform/convex/documents/validators.ts @@ -69,6 +69,8 @@ export const documentItemValidator = v.object({ ragStatus: v.optional(ragStatusValidator), ragIndexedAt: v.optional(v.number()), ragError: v.optional(v.string()), + scannedPagesDetected: v.optional(v.number()), + ocrApplied: v.optional(v.boolean()), teamId: v.optional(v.union(v.string(), v.null())), teamIds: v.optional(v.array(v.string())), createdBy: v.optional(v.string()), diff --git a/services/platform/convex/file_metadata/actions.ts b/services/platform/convex/file_metadata/actions.ts index a12c17f662..a39327c7a2 100644 --- a/services/platform/convex/file_metadata/actions.ts +++ b/services/platform/convex/file_metadata/actions.ts @@ -2,7 +2,7 @@ import { v } from 'convex/values'; -import { isRecord, getString } from '../../lib/utils/type-guards'; +import { isRecord, getBoolean, getString } from '../../lib/utils/type-guards'; import { internal } from '../_generated/api'; import { action } from '../_generated/server'; import { getRagConfig } from '../lib/helpers/rag_config'; @@ -68,9 +68,14 @@ export const checkFileRagStatuses = action({ : progressPhase || undefined; if (status === 'completed') { + const ocrApplied = getBoolean(docStatus, 'ocr_applied'); await ctx.runMutation( internal.file_metadata.internal_mutations.updateFileRagStatus, - { storageId, ragStatus: 'completed' }, + { + storageId, + ragStatus: 'completed', + ...(ocrApplied != null && { ocrApplied }), + }, ); } else if (status === 'failed') { await ctx.runMutation( diff --git a/services/platform/convex/file_metadata/internal_actions.ts b/services/platform/convex/file_metadata/internal_actions.ts index 38366ef723..9e0ea91d62 100644 --- a/services/platform/convex/file_metadata/internal_actions.ts +++ b/services/platform/convex/file_metadata/internal_actions.ts @@ -2,8 +2,11 @@ import { v } from 'convex/values'; +import { extractExtension } from '../../lib/shared/file-types'; +import { isRecord, getNumber } from '../../lib/utils/type-guards'; import { internal } from '../_generated/api'; import { internalAction } from '../_generated/server'; +import { getCrawlerUrl } from '../documents/generate_document_helpers'; import { getRagConfig } from '../lib/helpers/rag_config'; import { ragAction } from '../workflow_engine/action_defs/rag/rag_action'; @@ -54,3 +57,172 @@ export const uploadFileToRag = internalAction({ return null; }, }); + +const EXTRACT_METADATA_EXTENSIONS = new Set(['pdf', 'docx', 'pptx']); +const IMAGE_CONTENT_TYPES = new Set([ + 'image/jpeg', + 'image/png', + 'image/gif', + 'image/webp', +]); +const EXTRACT_METADATA_RETRY_DELAYS = [30_000, 60_000, 120_000]; + +/** + * Extract vision/OCR metadata and document dates for an uploaded file. + * + * Triggered by saveFileMetadata on new inserts. For PDF/DOCX/PPTX, calls + * the crawler extract-metadata endpoint. For images, sets defaults directly. + * For other file types (CSV, TXT, XLSX), sets visionRequired=false. + */ +export const extractFileMetadata = internalAction({ + args: { + storageId: v.id('_storage'), + fileName: v.string(), + contentType: v.string(), + attempt: v.optional(v.number()), + }, + returns: v.null(), + handler: async (ctx, args): Promise => { + const attempt = args.attempt ?? 0; + const ext = extractExtension(args.fileName); + + // Images: always need vision, no crawler call needed + if (IMAGE_CONTENT_TYPES.has(args.contentType)) { + await ctx.runMutation( + internal.file_metadata.internal_mutations.updateFileVisionMetadata, + { + storageId: args.storageId, + pageCount: 1, + scannedPagesDetected: 0, + visionRequired: true, + }, + ); + return null; + } + + // PDF/DOCX/PPTX: call crawler extract-metadata + if (ext && EXTRACT_METADATA_EXTENSIONS.has(ext)) { + try { + const fileUrl = await ctx.storage.getUrl(args.storageId); + if (!fileUrl) { + console.warn( + `[extractFileMetadata] No URL for file ${args.storageId}, skipping`, + ); + return null; + } + + const fileResponse = await fetch(fileUrl, { + signal: AbortSignal.timeout(30_000), + }); + if (!fileResponse.ok) { + throw new Error( + `Failed to download file: ${fileResponse.status} ${fileResponse.statusText}`, + ); + } + + const fileBlob = await fileResponse.blob(); + const crawlerUrl = getCrawlerUrl(); + const endpoint = `${crawlerUrl}/api/v1/${ext}/extract-metadata`; + + const formData = new FormData(); + formData.append('file', fileBlob, args.fileName); + + const metadataResponse = await fetch(endpoint, { + method: 'POST', + body: formData, + signal: AbortSignal.timeout(30_000), + }); + + if (!metadataResponse.ok) { + const errorText = await metadataResponse.text().catch(() => ''); + throw new Error( + `Crawler extract-metadata returned ${metadataResponse.status}: ${errorText}`, + ); + } + + let body: unknown; + try { + body = await metadataResponse.json(); + } catch { + throw new Error('Crawler returned non-JSON response'); + } + + if (!isRecord(body)) { + throw new Error( + 'Invalid response shape from crawler extract-metadata', + ); + } + + const pageCount = getNumber(body, 'page_count'); + const scannedPagesDetected = getNumber(body, 'scanned_pages_detected'); + const createdAt = getNumber(body, 'created_at'); + const modifiedAt = getNumber(body, 'modified_at'); + + // Write vision metadata to fileMetadata + await ctx.runMutation( + internal.file_metadata.internal_mutations.updateFileVisionMetadata, + { + storageId: args.storageId, + pageCount: pageCount ?? undefined, + scannedPagesDetected: scannedPagesDetected ?? undefined, + visionRequired: + scannedPagesDetected != null ? scannedPagesDetected > 0 : false, + }, + ); + + // Write dates and scanned page info to linked document (if any) + const fileMetadata = await ctx.runQuery( + internal.file_metadata.internal_queries.getByStorageId, + { storageId: args.storageId }, + ); + + if (fileMetadata?.documentId) { + await ctx.runMutation( + internal.documents.internal_mutations.updateDocumentDates, + { + documentId: fileMetadata.documentId, + sourceCreatedAt: createdAt, + sourceModifiedAt: modifiedAt, + scannedPagesDetected: scannedPagesDetected ?? undefined, + }, + ); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error( + `[extractFileMetadata] Error for file ${args.storageId} (attempt ${attempt}): ${message}`, + ); + + if (attempt < EXTRACT_METADATA_RETRY_DELAYS.length) { + const retryDelay = EXTRACT_METADATA_RETRY_DELAYS[attempt]; + await ctx.scheduler.runAfter( + retryDelay, + internal.file_metadata.internal_actions.extractFileMetadata, + { + storageId: args.storageId, + fileName: args.fileName, + contentType: args.contentType, + attempt: attempt + 1, + }, + ); + } else { + console.warn( + `[extractFileMetadata] All retries exhausted for file ${args.storageId}: ${message}`, + ); + } + } + return null; + } + + // All other file types: no vision needed + await ctx.runMutation( + internal.file_metadata.internal_mutations.updateFileVisionMetadata, + { + storageId: args.storageId, + scannedPagesDetected: 0, + visionRequired: false, + }, + ); + return null; + }, +}); diff --git a/services/platform/convex/file_metadata/internal_mutations.ts b/services/platform/convex/file_metadata/internal_mutations.ts index 60cad084d3..f3bd06c2e6 100644 --- a/services/platform/convex/file_metadata/internal_mutations.ts +++ b/services/platform/convex/file_metadata/internal_mutations.ts @@ -65,6 +65,16 @@ export const saveFileMetadata = internalMutation({ }, ); + await ctx.scheduler.runAfter( + 0, + internal.file_metadata.internal_actions.extractFileMetadata, + { + storageId: args.storageId, + fileName: args.fileName, + contentType: args.contentType, + }, + ); + try { await checkOrganizationRateLimit( ctx, @@ -97,6 +107,7 @@ export const updateFileRagStatus = internalMutation({ ), ragError: v.optional(v.string()), ragProgress: v.optional(v.string()), + ocrApplied: v.optional(v.boolean()), }, async handler(ctx, args) { const metadata = await ctx.db @@ -112,7 +123,44 @@ export const updateFileRagStatus = internalMutation({ args.ragStatus === 'completed' || args.ragStatus === 'failed' ? undefined : args.ragProgress, + ...(args.ocrApplied != null && { ocrApplied: args.ocrApplied }), }); + + // Sync ocrApplied to linked document so the list view can show it + if (args.ocrApplied != null && metadata.documentId) { + const doc = await ctx.db.get(metadata.documentId); + if (doc) { + await ctx.db.patch(metadata.documentId, { + ocrApplied: args.ocrApplied, + }); + } + } + }, +}); + +export const updateFileVisionMetadata = internalMutation({ + args: { + storageId: v.id('_storage'), + pageCount: v.optional(v.number()), + scannedPagesDetected: v.optional(v.number()), + visionRequired: v.optional(v.boolean()), + }, + async handler(ctx, args) { + const metadata = await ctx.db + .query('fileMetadata') + .withIndex('by_storageId', (q) => q.eq('storageId', args.storageId)) + .first(); + if (!metadata) return; + + const patch: Record = {}; + if (args.pageCount != null) patch.pageCount = args.pageCount; + if (args.scannedPagesDetected != null) + patch.scannedPagesDetected = args.scannedPagesDetected; + if (args.visionRequired != null) patch.visionRequired = args.visionRequired; + + if (Object.keys(patch).length > 0) { + await ctx.db.patch(metadata._id, patch); + } }, }); diff --git a/services/platform/convex/file_metadata/mutations.ts b/services/platform/convex/file_metadata/mutations.ts index 8698e5afdc..4f25748586 100644 --- a/services/platform/convex/file_metadata/mutations.ts +++ b/services/platform/convex/file_metadata/mutations.ts @@ -84,6 +84,16 @@ export const saveFileMetadata = mutation({ }, ); + await ctx.scheduler.runAfter( + 0, + internal.file_metadata.internal_actions.extractFileMetadata, + { + storageId: args.storageId, + fileName: args.fileName, + contentType: args.contentType, + }, + ); + try { await checkOrganizationRateLimit( ctx, diff --git a/services/platform/convex/file_metadata/queries.ts b/services/platform/convex/file_metadata/queries.ts index 3033b2d502..db0a927c2c 100644 --- a/services/platform/convex/file_metadata/queries.ts +++ b/services/platform/convex/file_metadata/queries.ts @@ -27,6 +27,43 @@ export const getUserStorageUsage = query({ }, }); +export const getByDocumentId = query({ + args: { + organizationId: v.string(), + documentId: v.id('documents'), + }, + returns: v.union( + v.object({ + pageCount: v.optional(v.number()), + scannedPagesDetected: v.optional(v.number()), + visionRequired: v.optional(v.boolean()), + ocrApplied: v.optional(v.boolean()), + }), + v.null(), + ), + handler: async (ctx, args) => { + const authUser = await getAuthUserIdentity(ctx); + if (!authUser) return null; + + const meta = await ctx.db + .query('fileMetadata') + .withIndex('by_organizationId_and_documentId', (q) => + q + .eq('organizationId', args.organizationId) + .eq('documentId', args.documentId), + ) + .first(); + if (!meta) return null; + + return { + pageCount: meta.pageCount, + scannedPagesDetected: meta.scannedPagesDetected, + visionRequired: meta.visionRequired, + ocrApplied: meta.ocrApplied, + }; + }, +}); + export const getByStorageIds = query({ args: { storageIds: v.array(v.id('_storage')), @@ -48,6 +85,9 @@ export const getByStorageIds = query({ ), ragError: v.optional(v.string()), ragProgress: v.optional(v.string()), + pageCount: v.optional(v.number()), + scannedPagesDetected: v.optional(v.number()), + visionRequired: v.optional(v.boolean()), }), ), handler: async (ctx, args) => { @@ -70,6 +110,9 @@ export const getByStorageIds = query({ ragStatus: meta.ragStatus, ragError: meta.ragError, ragProgress: meta.ragProgress, + pageCount: meta.pageCount, + scannedPagesDetected: meta.scannedPagesDetected, + visionRequired: meta.visionRequired, }; }), ); diff --git a/services/platform/convex/file_metadata/schema.ts b/services/platform/convex/file_metadata/schema.ts index b87dfbdb77..e72d244b8d 100644 --- a/services/platform/convex/file_metadata/schema.ts +++ b/services/platform/convex/file_metadata/schema.ts @@ -9,6 +9,10 @@ export const fileMetadataTable = defineTable({ fileName: v.string(), contentType: v.string(), size: v.number(), + pageCount: v.optional(v.number()), + scannedPagesDetected: v.optional(v.number()), + visionRequired: v.optional(v.boolean()), + ocrApplied: v.optional(v.boolean()), ragStatus: v.optional( v.union( v.literal('queued'), diff --git a/services/platform/messages/de.json b/services/platform/messages/de.json index 08c26f5a50..f9fc4d6bcf 100644 --- a/services/platform/messages/de.json +++ b/services/platform/messages/de.json @@ -2972,7 +2972,8 @@ "modified": "Geändert", "sourceUpload": "Upload", "sourceOnedrive": "OneDrive", - "sourceSharepoint": "SharePoint" + "sourceSharepoint": "SharePoint", + "imagePages": "Bildseiten" } }, "upload": { @@ -3013,6 +3014,12 @@ "extensionNotAllowed": "{name}: .{ext} ist nicht erlaubt. Erlaubte Typen: {allowed}.", "uploadDocuments": "Hochladen" }, + "ocr": { + "available": "OCR verfügbar", + "unavailable": "OCR nicht verfügbar", + "processingWithOcr": "Dokument mit OCR verarbeitet", + "noVisionModelWarning": "Kein Vision-Modell konfiguriert — gescannte Dokumente und Bilder werden nicht verarbeitet" + }, "import": { "noResponseBody": "Kein Antwortinhalt erhalten" }, diff --git a/services/platform/messages/en.json b/services/platform/messages/en.json index f970e9d525..8fb5055825 100644 --- a/services/platform/messages/en.json +++ b/services/platform/messages/en.json @@ -2997,7 +2997,8 @@ "modified": "Modified", "sourceUpload": "Upload", "sourceOnedrive": "OneDrive", - "sourceSharepoint": "SharePoint" + "sourceSharepoint": "SharePoint", + "imagePages": "Image pages" } }, "upload": { @@ -3038,6 +3039,12 @@ "extensionNotAllowed": "{name}: .{ext} is not allowed. Allowed types: {allowed}.", "uploadDocuments": "Upload" }, + "ocr": { + "available": "OCR available", + "unavailable": "OCR unavailable", + "processingWithOcr": "Document processed with OCR", + "noVisionModelWarning": "No vision model configured — scanned documents and images will not be processed" + }, "import": { "noResponseBody": "No response body received" }, diff --git a/services/platform/types/documents.ts b/services/platform/types/documents.ts index cfd3485a0b..348b2cfbfa 100644 --- a/services/platform/types/documents.ts +++ b/services/platform/types/documents.ts @@ -26,6 +26,10 @@ export interface DocumentItem { ragIndexedAt?: number; /** Error message (for failed status) */ ragError?: string; + /** Number of scanned pages detected in the document */ + scannedPagesDetected?: number; + /** Whether OCR was applied during RAG indexing */ + ocrApplied?: boolean; teamId?: string | null; teamIds?: string[]; /** User ID who created/uploaded this document */ diff --git a/services/rag/app/models.py b/services/rag/app/models.py index 8f504e09b8..a49e6c414c 100644 --- a/services/rag/app/models.py +++ b/services/rag/app/models.py @@ -144,6 +144,7 @@ class DocumentStatusInfo(BaseModel): source_modified_at: dt.datetime | None = Field( default=None, description="Original file modification date (from file metadata)" ) + ocr_applied: bool | None = Field(default=None, description="Whether OCR was applied during text extraction") class DocumentStatusRequest(BaseModel): diff --git a/services/rag/app/routers/documents.py b/services/rag/app/routers/documents.py index 0c40d54eb5..30fb5dea77 100644 --- a/services/rag/app/routers/documents.py +++ b/services/rag/app/routers/documents.py @@ -559,6 +559,7 @@ async def get_document_statuses(request: DocumentStatusRequest): progress_detail=info.get("progress_detail"), source_created_at=info.get("source_created_at"), source_modified_at=info.get("source_modified_at"), + ocr_applied=info.get("ocr_applied"), ) if info else None diff --git a/services/rag/app/services/indexing_service.py b/services/rag/app/services/indexing_service.py index 2c8d9d063b..cf8376033c 100644 --- a/services/rag/app/services/indexing_service.py +++ b/services/rag/app/services/indexing_service.py @@ -429,8 +429,8 @@ async def _do_store( f""" INSERT INTO {SCHEMA}.documents (file_id, filename, content_hash, status, chunks_count, - source_created_at, source_modified_at) - VALUES ($1, $2, $3, 'completed', $4, $5, $6) + source_created_at, source_modified_at, ocr_applied) + VALUES ($1, $2, $3, 'completed', $4, $5, $6, $7) RETURNING id """, file_id, @@ -439,6 +439,7 @@ async def _do_store( len(prepared.chunks), prepared.source_created_at, prepared.source_modified_at, + prepared.vision_used, ) doc_uuid = doc_row["id"] @@ -543,6 +544,43 @@ async def index_document( content, clone its chunks instead of re-extracting/embedding. """ content_hash = compute_content_hash(content_bytes) + + # Fast path: same file_id with unchanged content AND chunks already stored — + # skip immediately instead of re-extracting/embedding. + async with acquire_with_retry(pool) as conn: + own_row = await conn.fetchrow( + f"""SELECT d.content_hash, + (SELECT COUNT(*) + FROM {SCHEMA}.chunks c + WHERE c.document_id = d.id) AS chunk_count + FROM {SCHEMA}.documents d + WHERE d.file_id = $1""", + file_id, + ) + if own_row and own_row["content_hash"] == content_hash and own_row["chunk_count"] > 0: + logger.info( + "Document {} content unchanged with {} chunks, skipping (early dedup)", + file_id, + own_row["chunk_count"], + ) + async with acquire_with_retry(pool) as conn: + await conn.execute( + f"""UPDATE {SCHEMA}.documents + SET status = 'completed', error = NULL, + progress_phase = NULL, + progress_detail = NULL, + updated_at = NOW() + WHERE file_id = $1""", + file_id, + ) + return { + "success": True, + "file_id": file_id, + "chunks_created": 0, + "skipped": True, + "skip_reason": "content_unchanged", + } + source_id = await find_existing_by_hash(pool, content_hash) if source_id is not None: diff --git a/services/rag/app/services/rag_service.py b/services/rag/app/services/rag_service.py index 4101d224bd..4f6cb260cf 100644 --- a/services/rag/app/services/rag_service.py +++ b/services/rag/app/services/rag_service.py @@ -526,7 +526,7 @@ async def get_document_statuses( f""" SELECT DISTINCT ON (file_id) file_id, status, error, progress_phase, progress_detail, - source_created_at, source_modified_at + source_created_at, source_modified_at, ocr_applied FROM {SCHEMA}.documents WHERE file_id = ANY($1) ORDER BY file_id, @@ -549,6 +549,7 @@ async def get_document_statuses( "progress_detail": row["progress_detail"], "source_created_at": row["source_created_at"], "source_modified_at": row["source_modified_at"], + "ocr_applied": row["ocr_applied"], } for row in rows } diff --git a/services/rag/tests/test_background_ingest.py b/services/rag/tests/test_background_ingest.py index b4f00f8102..819fb81d7c 100644 --- a/services/rag/tests/test_background_ingest.py +++ b/services/rag/tests/test_background_ingest.py @@ -74,6 +74,7 @@ async def test_returns_status_for_found_documents(self): "progress_detail": None, "source_created_at": None, "source_modified_at": None, + "ocr_applied": False, }, { "file_id": "doc-2", @@ -83,6 +84,7 @@ async def test_returns_status_for_found_documents(self): "progress_detail": None, "source_created_at": None, "source_modified_at": None, + "ocr_applied": False, }, ] ) @@ -111,6 +113,7 @@ async def test_returns_error_field_for_failed_documents(self): "progress_detail": None, "source_created_at": None, "source_modified_at": None, + "ocr_applied": False, }, ] ) diff --git a/services/rag/tests/test_file_dates.py b/services/rag/tests/test_file_dates.py index 01d3e4beea..da0a52cfb8 100644 --- a/services/rag/tests/test_file_dates.py +++ b/services/rag/tests/test_file_dates.py @@ -276,7 +276,7 @@ async def test_caller_dates_override_file_dates(self): caller_created = dt.datetime(2022, 6, 1, tzinfo=dt.UTC) mock_conn = AsyncMock() - mock_conn.fetchrow = AsyncMock(side_effect=[None, {"id": "uuid-1"}]) + mock_conn.fetchrow = AsyncMock(side_effect=[None, None, {"id": "uuid-1"}]) mock_conn.executemany = AsyncMock() mock_tx = AsyncMock() mock_tx.__aenter__ = AsyncMock(return_value=mock_tx) @@ -317,10 +317,11 @@ async def test_caller_dates_override_file_dates(self): assert result["success"] is True - # The INSERT query has 6 positional args: + # The INSERT query has 7 positional args: # $1=file_id, $2=filename, $3=content_hash, $4=chunks_count, - # $5=source_created_at, $6=source_modified_at - insert_call = mock_conn.fetchrow.call_args_list[1] + # $5=source_created_at, $6=source_modified_at, $7=ocr_applied + # call_args_list[0] = early-dedup check, [1] = cross-scope dedup, [2] = INSERT + insert_call = mock_conn.fetchrow.call_args_list[2] args = insert_call[0] # args[0] is the SQL string, positional params start at args[1] source_created_arg = args[5] # $5 diff --git a/services/rag/tests/test_indexing_service.py b/services/rag/tests/test_indexing_service.py index 08fec12ca8..a395415f8b 100644 --- a/services/rag/tests/test_indexing_service.py +++ b/services/rag/tests/test_indexing_service.py @@ -52,6 +52,7 @@ def _mock_pool( mock_conn = AsyncMock() mock_conn.fetchrow = AsyncMock( side_effect=[ + None, # early-dedup check (no existing row with same content) existing_row, {"id": inserted_doc_id}, ] @@ -248,20 +249,17 @@ class TestContentHashDedup: async def test_skips_unchanged_content(self): from app.services.indexing_service import index_document - existing = {"id": "existing-uuid", "content_hash": SAMPLE_HASH} - pool, mock_conn = _mock_pool(existing_row=existing) + pool, mock_conn = _mock_pool() + # Early-dedup fetchrow returns a row with matching hash and existing chunks + mock_conn.fetchrow = AsyncMock( + return_value={"content_hash": SAMPLE_HASH, "chunk_count": 5}, + ) mock_embed = AsyncMock() mock_embed.embed_texts = AsyncMock(return_value=SAMPLE_EMBEDDINGS) with ( _patch_acquire(mock_conn), patch("app.services.indexing_service.compute_content_hash", return_value=SAMPLE_HASH), - patch( - "app.services.indexing_service.extract_text", - new_callable=AsyncMock, - return_value=("Some text", False), - ), - patch("app.services.indexing_service.chunk_content", return_value=SAMPLE_CHUNKS), ): result = await index_document( pool, @@ -284,12 +282,12 @@ async def test_reindexes_when_content_changed(self): mock_embed.embed_texts = AsyncMock(return_value=SAMPLE_EMBEDDINGS) # The connection is used multiple times: - # 1. fetchrow for dedup check -> returns existing row - # 2. execute for DELETE old doc + # 1. fetchrow for early-dedup check -> returns row with different hash + # 2. fetchrow for cross-scope dedup -> returns existing row # 3. fetchrow for INSERT new doc RETURNING id - # 4. execute for each chunk INSERT mock_conn.fetchrow = AsyncMock( side_effect=[ + {"content_hash": DIFFERENT_HASH, "chunk_count": 3}, existing, {"id": "new-uuid"}, ] @@ -334,6 +332,7 @@ async def test_replacement_deletes_chunks_before_document(self): mock_conn.fetchrow = AsyncMock( side_effect=[ + {"content_hash": DIFFERENT_HASH, "chunk_count": 3}, existing, {"id": "new-uuid"}, ] @@ -381,6 +380,7 @@ async def test_replacement_uses_transaction(self): mock_conn.fetchrow = AsyncMock( side_effect=[ + {"content_hash": DIFFERENT_HASH, "chunk_count": 3}, existing, {"id": "new-uuid"}, ] @@ -643,6 +643,8 @@ async def test_non_hnsw_internal_error_not_retried(self): from app.services.indexing_service import store_prepared_document, PreparedDocument pool, mock_conn = _mock_pool(existing_row=None) + # store_prepared_document calls _do_store directly (no early-dedup) + mock_conn.fetchrow = AsyncMock(side_effect=[None, {"id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}]) prepared = PreparedDocument( content_hash=SAMPLE_HASH,