Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""add columns to collection job and documents table

Revision ID: 050
Revises: 049
Create Date: 2026-03-25 10:09:47.318575

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "050"
down_revision = "049"
branch_labels = None
depends_on = None


def upgrade():
    """Add job-metrics columns to ``collection_jobs`` and a size column to ``document``.

    Columns added:
    - collection_jobs.docs_num: number of documents processed by the job.
    - collection_jobs.total_size_mb: cumulative upload size for the job.
    - document.file_size_kb: per-document size, stored in kilobytes.
    - collection_jobs.documents: JSON array of document UUIDs in the job.
    """
    op.add_column(
        "collection_jobs",
        sa.Column(
            "docs_num",
            sa.Integer(),
            nullable=True,
            comment="Total number of documents to be processed in this job",
        ),
    )
    op.add_column(
        "collection_jobs",
        sa.Column(
            "total_size_mb",
            sa.Float(),
            nullable=True,
            comment="Total size of documents being uploaded to collection",
        ),
    )
    op.add_column(
        "document",
        sa.Column(
            "file_size_kb",
            sa.Float(),
            nullable=True,
            # The column stores kilobytes (its name is file_size_kb), so the DB
            # comment must not claim bytes.
            comment="Size of the document in kilobytes (KB)",
        ),
    )
    op.add_column(
        "collection_jobs",
        sa.Column(
            "documents",
            sa.JSON(),
            nullable=True,
            comment="JSON array of document UUIDs included in this job",
        ),
    )


def downgrade():
    """Remove the columns introduced by revision 050, in reverse dependency order."""
    for table, column in (
        ("document", "file_size_kb"),
        ("collection_jobs", "total_size_mb"),
        ("collection_jobs", "docs_num"),
        ("collection_jobs", "documents"),
    ):
        op.drop_column(table, column)
33 changes: 33 additions & 0 deletions backend/app/alembic/versions/051_add_openai_file_id_column.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""add openai file id column

Revision ID: 051
Revises: 050
Create Date: 2026-03-31 14:27:26.391631

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel.sql.sqltypes
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "051"
down_revision = "050"
branch_labels = None
depends_on = None


def upgrade():
    """Add a nullable ``openai_file_id`` column to the ``document`` table."""
    openai_file_id = sa.Column(
        "openai_file_id",
        sqlmodel.sql.sqltypes.AutoString(),
        nullable=True,
        comment="file id when uploaded to openai",
    )
    op.add_column("document", openai_file_id)


def downgrade():
    """Drop the ``openai_file_id`` column added by this revision."""
    op.drop_column(
        "document",
        "openai_file_id",
    )
7 changes: 4 additions & 3 deletions backend/app/api/docs/collections/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ pipeline:

* Create a vector store from the document IDs you received after uploading your
documents through the Documents module.
* The `batch_size` parameter controls how many documents are sent to OpenAI in a
single transaction when creating the vector store. This helps optimize the upload
process for large document sets. If not specified, the default value is **10**.
* Documents are automatically batched when creating the vector store to optimize
the upload process for large document sets. A new batch is started when either
the cumulative size of the documents in the batch reaches 30 MB or the batch
contains 200 files, whichever limit is hit first.
* [Deprecated] Attach the Vector Store to an OpenAI
[Assistant](https://platform.openai.com/docs/api-reference/assistants). Use
parameters in the request body relevant to an Assistant to flesh out
Expand Down
3 changes: 3 additions & 0 deletions backend/app/api/routes/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CollectionCrud,
CollectionJobCrud,
DocumentCollectionCrud,
DocumentCrud,
)
from app.core.cloud import get_cloud_storage
from app.models import (
Expand Down Expand Up @@ -101,6 +102,8 @@ def create_collection(
action_type=CollectionActionType.CREATE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(request.documents),
documents=[str(doc_id) for doc_id in request.documents],
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backend/app/api/routes/collections.py:106 passes documents=[str(doc_id) for doc_id in request.documents] to CollectionJobCreate, but CollectionJobCreate (collection_job.py:119-124) has no documents field. Pydantic silently drops extra fields. The migration (050) adds the JSON column to the DB, but the SQLModel never declares it — so data is never persisted.

)
)

Expand Down
33 changes: 32 additions & 1 deletion backend/app/api/routes/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
UploadFile,
)
from fastapi import Path as FastPath
from fastapi import HTTPException

from app.api.deps import AuthContextDep, SessionDep
from app.api.permissions import Permission, require_permission
Expand All @@ -27,9 +28,11 @@
DocTransformationJobPublic,
)
from app.core.cloud import get_cloud_storage
from app.services.collections.helpers import pick_service_for_documennt
from app.services.collections.helpers import pick_service_for_documennt, MAX_DOC_SIZE_MB
from app.services.documents.helpers import (
calculate_file_size,
schedule_transformation,
schedule_provider_upload,
pre_transform_validation,
build_document_schema,
build_document_schemas,
Expand Down Expand Up @@ -119,6 +122,11 @@ async def upload_doc(
),
callback_url: str
| None = Form(None, description="URL to call to report doc transformation status"),
upload_to_providers: list[str]
| None = Form(
None,
description="List of provider names to upload the document to (e.g., ['openai'])",
),
):
if callback_url:
validate_callback_url(callback_url)
Expand All @@ -129,6 +137,20 @@ async def upload_doc(
transformer=transformer,
)

file_size_kb = await calculate_file_size(src)
file_size_mb = file_size_kb / 1024

if file_size_mb > MAX_DOC_SIZE_MB:
logger.warning(
f"[upload_doc] Document size exceeds limit | "
f"{{'filename': '{src.filename}', 'size_mb': {round(file_size_mb, 2)}, 'max_size_mb': {MAX_DOC_SIZE_MB}}}"
)
raise HTTPException(
status_code=413,
detail=f"Document size ({round(file_size_mb, 2)} MB) exceeds the maximum allowed size of {MAX_DOC_SIZE_MB} MB. "
f"Please upload a smaller file.",
)

storage = get_cloud_storage(session=session, project_id=current_user.project_.id)
document_id = uuid4()
object_store_url = storage.put(src, Path(str(document_id)))
Expand All @@ -137,6 +159,7 @@ async def upload_doc(
document = Document(
id=document_id,
fname=src.filename,
file_size_kb=file_size_kb,
object_store_url=str(object_store_url),
)
source_document = crud.update(document)
Expand All @@ -151,6 +174,13 @@ async def upload_doc(
callback_url=callback_url,
)

provider_upload_info: dict | None = schedule_provider_upload(
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
document_id=source_document.id,
providers=upload_to_providers or [],
)

document_schema = DocumentPublic.model_validate(
source_document, from_attributes=True
)
Expand All @@ -161,6 +191,7 @@ async def upload_doc(
response = DocumentUploadResponse(
**document_schema.model_dump(),
transformation_job=job_info,
provider_upload_job=provider_upload_info,
)
return APIResponse.success_response(response)

Expand Down
21 changes: 21 additions & 0 deletions backend/app/celery/tasks/job_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,24 @@ def run_tts_result_processing(
task_instance=self,
**kwargs,
)


@celery_app.task(
    bind=True,
    queue="low_priority",
    priority=1,
    rate_limit="10/m",  # Limit to 10 provider uploads per minute to prevent OOM
)
def run_provider_upload_job(
    self, project_id: int, job_id: str, trace_id: str, **kwargs
):
    """Celery entry point that executes a provider-upload job.

    Args:
        project_id: Project the job belongs to.
        job_id: Identifier of the provider-upload job to execute.
        trace_id: Correlation id propagated into logging via ``_set_trace``.
        **kwargs: Forwarded verbatim to ``execute_job``.

    Returns:
        Whatever ``execute_job`` returns for this job.
    """
    # Local import — presumably to avoid a circular import at worker
    # start-up (matches the pattern of the other tasks in this module);
    # TODO confirm.
    from app.services.documents.provider_upload import execute_job

    _set_trace(trace_id)
    return execute_job(
        project_id=project_id,
        job_id=job_id,
        task_id=current_task.request.id,
        task_instance=self,
        **kwargs,
    )
14 changes: 14 additions & 0 deletions backend/app/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ def start_tts_result_processing(
return task.id


def start_provider_upload_job(
    project_id: int, job_id: str, trace_id: str = "N/A", **kwargs
) -> str:
    """Enqueue a provider-upload job on Celery and return the task id.

    Args:
        project_id: Project the job belongs to.
        job_id: Identifier of the job to run asynchronously.
        trace_id: Correlation id for logging; defaults to "N/A".
        **kwargs: Forwarded verbatim to the Celery task.

    Returns:
        The id of the dispatched Celery task.
    """
    from app.celery.tasks.job_execution import run_provider_upload_job

    result = run_provider_upload_job.delay(
        project_id=project_id,
        job_id=job_id,
        trace_id=trace_id,
        **kwargs,
    )
    logger.info(
        f"[start_provider_upload_job] Started job {job_id} with Celery task {result.id}"
    )
    return result.id


def get_task_status(task_id: str) -> Dict[str, Any]:
result = AsyncResult(task_id)
return {
Expand Down
24 changes: 24 additions & 0 deletions backend/app/core/cloud/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ def stream(self, url: str) -> StreamingBody:
"""Stream a file from storage"""
pass

    @abstractmethod
    def get(self, url: str) -> bytes:
        """Get file contents as bytes (for files that fit in memory).

        Unlike ``stream``, this reads the entire object into memory at once,
        so implementations should only be used for reasonably small files.
        """
        pass

@abstractmethod
def get_file_size_kb(self, url: str) -> float:
"""Return the file size in KB"""
Expand Down Expand Up @@ -193,6 +198,25 @@ def stream(self, url: str) -> StreamingBody:
)
raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err

def get(self, url: str) -> bytes:
name = SimpleStorageName.from_url(url)
kwargs = asdict(name)
try:
body = self.aws.client.get_object(**kwargs).get("Body")
content = body.read()
logger.info(
f"[AmazonCloudStorage.get] File retrieved successfully | "
f"{{'project_id': '{self.project_id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}', 'size_bytes': {len(content)}}}"
)
return content
except ClientError as err:
logger.error(
f"[AmazonCloudStorage.get] AWS get error | "
f"{{'project_id': '{self.project_id}', 'bucket': '{mask_string(name.Bucket)}', 'key': '{mask_string(name.Key)}', 'error': '{str(err)}'}}",
exc_info=True,
)
raise CloudStorageError(f'AWS Error: "{err}" ({url})') from err

def get_file_size_kb(self, url: str) -> float:
name = SimpleStorageName.from_url(url)
kwargs = asdict(name)
Expand Down
55 changes: 19 additions & 36 deletions backend/app/crud/rag/open_ai.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import functools as ft
from io import BytesIO
from typing import Iterable

from openai import OpenAI, OpenAIError
Expand Down Expand Up @@ -121,53 +122,35 @@ def update(
storage: CloudStorage,
documents: Iterable[Document],
):
files = []
for docs in documents:
file_ids = []
files = []
for d in docs:
f_obj = storage.stream(d.object_store_url)

# monkey patch botocore.response.StreamingBody to make
# OpenAI happy
f_obj.name = d.fname

files.append(f_obj)

if d.openai_file_id:
# Document already uploaded to OpenAI, use existing file ID
file_id = d.openai_file_id
file_ids.append(file_id)
else:
# No OpenAI file ID, fetch from S3 and add to upload list
content = storage.get(d.object_store_url)
f_obj = BytesIO(content)
f_obj.name = d.fname
files.append(f_obj)
logger.info(
f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | {{'vector_store_id': '{vector_store_id}', 'file_count': {len(files)}}}"
f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | {{'vector_store_id': '{vector_store_id}', 'new_files': {len(files)}, 'existing_file_ids': {len(file_ids)}}}"
)
req = self.client.vector_stores.file_batches.upload_and_poll(
vector_store_id=vector_store_id,
files=files,
vector_store_id=vector_store_id, files=files, file_ids=file_ids
)
logger.info(
f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}"
)
if req.file_counts.completed != req.file_counts.total:
view = {x.fname: x for x in docs}
for i in self.read(vector_store_id):
if i.last_error is None:
fname = self.client.files.retrieve(i.id)
view.pop(fname)

error = {
"error": "OpenAI document processing error",
"documents": list(view.values()),
}
try:
raise InterruptedError(json.dumps(error, cls=BaseModelEncoder))
except InterruptedError as err:
logger.error(
f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'error': '{error['error']}', 'failed_documents': {len(error['documents'])}}}",
exc_info=True,
)
raise

while files:
f_obj = files.pop()
f_obj.close()
logger.info(
f"[OpenAIVectorStoreCrud.update] Closed file stream | {{'vector_store_id': '{vector_store_id}', 'filename': '{f_obj.name}'}}"
error_msg = f"OpenAI document processing error: {req.file_counts.completed}/{req.file_counts.total} files completed"
logger.error(
f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}"
)
raise InterruptedError(error_msg)

yield from docs

Expand Down
Loading
Loading