Move response API to Celery-backed job processing #381
Walkthrough
Adds a job queue and orchestration for responses: DB migration and Job model/CRUD, route refactor to start jobs, Celery scheduling/execution, response generation/persistence/callback delivery, utilities for OpenAI errors/callbacks, config/env updates, and tests.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant C as Client
participant API as /api/v1/responses
participant S as start_job
participant DB as JobCrud/DB
participant Q as Celery Queue
participant W as Worker.execute_job
participant SRV as process_response
participant OA as OpenAI
participant CB as Callback URL
C->>API: POST ResponsesAPIRequest
API->>S: start_job(db, request, project_id, org_id)
S->>DB: create Job (type=RESPONSE, trace_id)
S->>Q: enqueue execute_job(job_id, task_id, request_data)
API-->>C: APIResponse { data: { status: "processing", message: ... } }
Note over Q,W: asynchronous worker execution
Q->>W: execute_job(...)
W->>SRV: process_response(request, project_id, org_id, job_id, task_id)
SRV->>OA: generate_response(...) (OpenAI calls)
alt success
SRV->>DB: update Job status=SUCCESS
alt callback_url present
W->>CB: send_response_callback(payload)
CB-->>W: 200 OK
end
else failure
SRV->>DB: update Job status=FAILED, error_message
end
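The flow in the diagram can be approximated without Celery or a database. The sketch below is a minimal in-memory illustration: `JobStatus`, `start_job`, and `execute_job` borrow their names from the diagram, but their bodies, the `JOBS` dict, the `QUEUE` list, and the injected `generate` and `send_callback` callables are assumptions made for this example, not the PR's implementation.

```python
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class JobStatus(str, Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


@dataclass
class Job:
    id: str
    status: JobStatus = JobStatus.PENDING
    error_message: Optional[str] = None


JOBS = {}   # hypothetical stand-in for the job table
QUEUE = []  # hypothetical stand-in for the Celery queue


def start_job(request: dict) -> dict:
    """API side: create the job, enqueue the work, return immediately."""
    job = Job(id=str(uuid.uuid4()))
    JOBS[job.id] = job
    QUEUE.append((job.id, request))
    return {"status": "processing", "job_id": job.id}


def execute_job(job_id: str, request: dict, generate, send_callback) -> None:
    """Worker side: generate, persist the outcome, then notify on success."""
    job = JOBS[job_id]
    job.status = JobStatus.PROCESSING
    try:
        data = generate(request)
    except Exception as exc:
        # Failure branch of the diagram: record the error on the job.
        job.status = JobStatus.FAILED
        job.error_message = str(exc)
        return
    job.status = JobStatus.SUCCESS
    if request.get("callback_url"):
        send_callback(request["callback_url"], {"success": True, "data": data})
```

The point of the split is that the client only ever waits for `start_job`; everything slow happens in `execute_job`, and the job record is the single source of truth for the outcome.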
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/services/response/response.py (5)
backend/app/services/response/jobs.py (6)
🪛 Ruff (0.13.1)
backend/app/services/response/response.py
208-208: Unused function argument: (ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
🔇 Additional comments (2)
1) Move models from route to models folder 2) fix process response to handle callback response
…tructured response
…nd error handling
23f67a4 to 96160c6
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/tests/utils/openai.py (1)
25-31: Avoid mutable default for vector_store_ids
Using a list as a default arg can leak state across tests. Initialize inside the function.
Apply:
 def mock_openai_assistant(
     assistant_id: str = "assistant_mock",
-    vector_store_ids: Optional[list[str]] = ["vs_1", "vs_2"],
+    vector_store_ids: Optional[list[str]] = None,
     max_num_results: int = 30,
 ) -> OpenAIAssistant:
-    return OpenAIAssistant(
+    if vector_store_ids is None:
+        vector_store_ids = ["vs_1", "vs_2"]
+    return OpenAIAssistant(
backend/app/tests/services/response/test_jobs.py (1)
1-38: Fix PyTest “import file mismatch” by renaming this test module.
Another module named test_jobs.py already exists at backend/app/tests/crud/test_jobs.py. Rename this file to a unique basename, e.g., test_response_jobs_service.py.
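For readers unfamiliar with the mutable-default pitfall flagged for mock_openai_assistant above, here is a small standalone illustration. The function names `append_bad` and `append_good` are hypothetical and unrelated to the PR; the sketch only shows why the `None` sentinel is preferred.

```python
def append_bad(item, bucket=[]):
    # The default list is created once, at definition time, and then shared
    # by every call that omits `bucket`.
    bucket.append(item)
    return bucket


def append_good(item, bucket=None):
    # A fresh list is created on each call that omits `bucket`.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket


first_bad = append_bad("vs_1")
second_bad = append_bad("vs_2")    # same list object as first_bad
first_good = append_good("vs_1")
second_good = append_good("vs_2")  # independent list
```

In a test suite the shared list means one test can observe values appended by another, which is the state leak the comment warns about.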
🧹 Nitpick comments (26)
backend/app/utils.py (2)
7-7: Tighten typing and generic return for APIResponse
- Prefer built-in typing (dict | None) over typing.Dict/Optional.
- failure_response returns data but is annotated as APIResponse[None]. Make it APIResponse[T | None].
Apply:
-from typing import Any, Dict, Generic, Optional, TypeVar
+from typing import Any, Generic, TypeVar
@@
 class APIResponse(BaseModel, Generic[T]):
     success: bool
-    data: Optional[T] = None
-    error: Optional[str] = None
-    metadata: Optional[Dict[str, Any]] = None
+    data: T | None = None
+    error: str | None = None
+    metadata: dict[str, Any] | None = None
@@
     @classmethod
     def failure_response(
-        cls,
-        error: str | list,
-        data: Optional[T] = None,
-        metadata: Optional[Dict[str, Any]] = None,
-    ) -> "APIResponse[None]":
+        cls,
+        error: str | list[dict[str, Any]] | dict[str, Any],
+        data: T | None = None,
+        metadata: dict[str, Any] | None = None,
+    ) -> "APIResponse[T | None]":
-        if isinstance(error, list):  # to handle cases when error is a list of errors
-            error_message = "\n".join([f"{err['loc']}: {err['msg']}" for err in error])
+        if isinstance(error, list):
+            error_message = "\n".join(f"{err.get('loc')}: {err.get('msg')}" for err in error)
+        elif isinstance(error, dict) and "msg" in error:
+            error_message = f"{error.get('loc')}: {error['msg']}"
         else:
             error_message = error
Also applies to: 29-40, 41-54
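The error-normalization branch of the suggestion can be exercised on its own. The helper below, `format_error`, is a hypothetical extraction written for illustration (the PR keeps this logic inside `failure_response`); falling back to `str(error)` in the last branch is an assumption, since the suggested diff returns the value unchanged.

```python
from typing import Any, Union

ErrorInput = Union[str, list[dict[str, Any]], dict[str, Any]]


def format_error(error: ErrorInput) -> str:
    """Collapse validation-style errors into a single message string."""
    if isinstance(error, list):
        # One "loc: msg" line per entry; .get() tolerates missing keys.
        return "\n".join(f"{err.get('loc')}: {err.get('msg')}" for err in error)
    if isinstance(error, dict) and "msg" in error:
        return f"{error.get('loc')}: {error['msg']}"
    # Assumption: anything else is stringified rather than passed through.
    return str(error)
```

Using `.get()` instead of indexing means a malformed entry yields `None: ...` in the message rather than raising `KeyError` while an error response is being built.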
208-223: Narrow bare except in handle_openai_error
Replace bare except with Exception to satisfy linters and avoid swallowing BaseException.
Apply:
-        except:
+        except Exception:
             pass
backend/app/models/job.py (1)
36-44: Index commonly queried fields
Index status, job_type, and created_at to speed lookups (e.g., dashboards, cleaners).
Apply:
-    status: JobStatus = Field(
-        default=JobStatus.PENDING, description="Current state of the job."
-    )
-    job_type: JobType = Field(
-        description="Job type or classification (e.g., response job, ingestion job)."
-    )
-    created_at: datetime = Field(default_factory=now)
+    status: JobStatus = Field(
+        default=JobStatus.PENDING, description="Current state of the job.", index=True
+    )
+    job_type: JobType = Field(
+        description="Job type or classification (e.g., response job, ingestion job).",
+        index=True,
+    )
+    created_at: datetime = Field(default_factory=now, index=True)
Note: requires an Alembic migration.
backend/app/models/response.py (2)
10-12: Use Pydantic v2 config style (if on v2)
SQLModel on Pydantic v2 expects model_config instead of inner Config.
Apply if using Pydantic v2:
+from pydantic import ConfigDict
@@
-    class Config:
-        extra = "allow"
+    model_config = ConfigDict(extra="allow")
@@
-    class Config:
-        extra = "allow"
+    model_config = ConfigDict(extra="allow")
@@
-    class Config:
-        extra = "allow"
+    model_config = ConfigDict(extra="allow")
@@
-    class Config:
-        extra = "allow"
+    model_config = ConfigDict(extra="allow")
If still on Pydantic v1, keep as-is for now.
Also applies to: 23-25, 31-33, 54-56
27-30: Constrain status type
ResponseJobStatus.status is a free-form str. Consider Literal or reusing a lower-cased job status enum to avoid drift.
Would you like a small helper to map JobStatus (DB) -> API wire values (e.g., PROCESSING -> "processing")?
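One possible shape for such a helper is sketched below. The `JobStatus` member names follow those mentioned in this review (PENDING, PROCESSING, SUCCESS, FAILED); the member values, the `WireStatus` alias, and the `to_wire_status` function are assumptions for illustration only.

```python
from enum import Enum
from typing import Literal


class JobStatus(str, Enum):
    # Member values are assumed; only the names appear in the review.
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


WireStatus = Literal["pending", "processing", "success", "failed"]

# Explicit table rather than .lower(), so a new enum member fails loudly
# (KeyError) until someone decides what its wire value should be.
_WIRE_VALUES: dict[JobStatus, WireStatus] = {
    JobStatus.PENDING: "pending",
    JobStatus.PROCESSING: "processing",
    JobStatus.SUCCESS: "success",
    JobStatus.FAILED: "failed",
}


def to_wire_status(status: JobStatus) -> WireStatus:
    """Map the DB-side enum to the lower-case string exposed by the API."""
    return _WIRE_VALUES[status]
```

With this in place, `ResponseJobStatus.status` could be annotated as `WireStatus` instead of a free-form `str`.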
backend/app/tests/crud/test_jobs.py (1)
45-52: Capture pre-update timestamp to assert monotonicity
The session may update job in-place; compare against a saved value.
Apply:
-    update_data = JobUpdate(status=JobStatus.SUCCESS, error_message="All good now")
-    updated_job = crud.update(job.id, update_data)
+    old_updated_at = job.updated_at
+    update_data = JobUpdate(status=JobStatus.SUCCESS, error_message="All good now")
+    updated_job = crud.update(job.id, update_data)
@@
-    assert updated_job.updated_at >= job.updated_at
+    assert updated_job.updated_at > old_updated_at
backend/app/tests/api/routes/test_responses.py (1)
18-20: Add leading slash to the endpoint path
FastAPI TestClient expects an absolute path; relative paths can misroute.
Apply:
-    response = client.post(
-        "api/v1/responses", json=payload.model_dump(), headers=user_api_key_header
-    )
+    response = client.post(
+        "/api/v1/responses", json=payload.model_dump(), headers=user_api_key_header
+    )
backend/app/tests/services/response/response/test_generate_response.py (3)
27-29: Remove unused fixture/var and set an explicit mock return.
- db param is unused; drop it.
- mock_response is never used; remove it.
- Also set a concrete return value for mock_client.responses.create to avoid brittle MagicMock attribute chaining.
Apply:
-def test_generate_response_success(db: Session, assistant_mock: Assistant):
+def test_generate_response_success(assistant_mock: Assistant):
     """Test successful OpenAI response generation."""
-    mock_response = MagicMock()
     mock_client = MagicMock()
+    mock_client.responses.create.return_value = mock_openai_response(
+        text="Paris is the capital of France."
+    )
1-3: Use the shared OpenAI response helper for realism.
Import the test helper to generate a realistic OpenAI response object.
 import pytest
 from unittest.mock import MagicMock
+from app.tests.utils.openai import mock_openai_response
13-24: Fix fixture accuracy and typing.
- The docstring says “in DB” but the object isn’t persisted; reword it.
- Assistant.id is an int in models; use an int to avoid type drift.
-    """Fixture to create an assistant in DB with id=123."""
+    """Fixture to create an Assistant object (not persisted)."""
     assistant = Assistant(
-        id="123",
+        id=123,
         name="Test Assistant",
         model="gpt-4",
         temperature=0.7,
         instructions="You are a helpful assistant.",
         vector_store_ids=["vs1", "vs2"],
         max_num_results=5,
     )
backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py (1)
51-68: Drop Postgres enum types on downgrade to avoid orphaned types.
Otherwise subsequent re-migrations can fail.
     op.create_foreign_key(
         "openai_conversation_project_id_fkey1",
         "openai_conversation",
         "project",
         ["project_id"],
         ["id"],
     )
     op.drop_table("job")
+    bind = op.get_bind()
+    if bind.dialect.name == "postgresql":
+        op.execute("DROP TYPE IF EXISTS jobstatus")
+        op.execute("DROP TYPE IF EXISTS jobtype")
backend/app/tests/services/response/test_jobs.py (1)
30-37: Optionally assert Celery trace propagation.
You can assert the trace_id forwarded to Celery to ensure correlation is preserved (expected default is "N/A" without a request context).
args, kwargs = mock_schedule.call_args
assert kwargs["trace_id"] in ("N/A", None)  # or the correlation id you set in the test
backend/app/tests/services/response/response/test_process_response.py (1)
33-41: Avoid accidental network calls when creating an assistant.
If create_assistant hits OpenAI, patch it or pass a stubbed client to keep tests hermetic. Confirm it doesn’t perform I/O; otherwise:
with patch("app.crud.create_assistant") as mock_create:
    mock_create.return_value = Assistant(assistant_id="asst_123", model="gpt-4", name="Test Assistant")
    ...
backend/app/crud/jobs.py (1)
25-39: Minor: early-exit when no fields to update.
If job_update is empty, skip commit/refresh to avoid a no-op write.
     update_data = job_update.model_dump(exclude_unset=True)
-    for field, value in update_data.items():
-        setattr(job, field, value)
+    if not update_data:
+        return job
+    for field, value in update_data.items():
+        setattr(job, field, value)
backend/app/services/response/callbacks.py (2)
4-20: Harden against leaking sensitive request fields in callbacks.
Since ResponsesAPIRequest allows extras, blacklist alone is risky. Filter out common secret tokens.
 def get_additional_data(request: dict) -> dict:
@@
-    return {k: v for k, v in request.items() if k not in exclude_keys}
+    sensitive = {"api_key", "apikey", "secret", "password", "token", "credential", "cookie", "auth"}
+    return {
+        k: v
+        for k, v in request.items()
+        if k not in exclude_keys and not any(s in k.lower() for s in sensitive)
+    }
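A runnable version of the suggested filter, as a sketch: the contents of `EXCLUDE_KEYS` are an assumption (the review does not show the real `exclude_keys` set), while the sensitive markers are copied from the suggestion.

```python
# Assumed set of first-class request fields that are never echoed back.
EXCLUDE_KEYS = {"assistant_id", "callback_url", "response_id", "question"}

# Substrings that mark a key as likely to carry a secret (from the review).
SENSITIVE_MARKERS = {
    "api_key", "apikey", "secret", "password",
    "token", "credential", "cookie", "auth",
}


def get_additional_data(request: dict) -> dict:
    """Return caller-supplied extras that are safe to echo in a callback."""
    return {
        key: value
        for key, value in request.items()
        if key not in EXCLUDE_KEYS
        and not any(marker in key.lower() for marker in SENSITIVE_MARKERS)
    }


example = get_additional_data(
    {"question": "hi", "callback_url": "u", "user_tag": "t",
     "X-Api_Key": "s", "author": "a"}
)
```

Note the trade-off in substring matching: `author` is dropped because it contains `auth`. If benign keys like that matter, matching on exact names or on word boundaries would be a tighter rule.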
22-41: Optionally propagate callback send result.
Returning a bool can help callers decide on retries. Currently it’s fire-and-forget.
-def send_response_callback(... ) -> None:
+def send_response_callback(... ) -> bool:
@@
-    send_callback(
+    return send_callback(
         callback_url,
         {
             "success": callback_response.get("success", False),
             "data": {
                 **(callback_response.get("data") or {}),
                 **get_additional_data(request_dict),
             },
             "error": callback_response.get("error"),
             "metadata": None,
         },
     )
backend/app/services/response/jobs.py (2)
1-15: Prune unused imports.
Drop unused HTTPException, engine, JobStatus, and send_callback.
-from fastapi import HTTPException
-from sqlmodel import Session
-from asgi_correlation_id import correlation_id
-from app.core.db import engine
+from sqlmodel import Session
+from asgi_correlation_id import correlation_id
@@
-from app.models import JobType, JobStatus, JobUpdate, ResponsesAPIRequest
+from app.models import JobType, JobUpdate, ResponsesAPIRequest
@@
-from app.api.routes.threads import send_callback
27-39: Persist Celery task_id on the Job after scheduling.
This helps correlate jobs with Celery tasks from the outset (before processing begins).
     task_id = start_high_priority_job(
         function_path="app.services.response.jobs.execute_job",
         project_id=project_id,
         job_id=str(job.id),
         trace_id=trace_id,
         request_data=request.model_dump(),
         organization_id=organization_id,
     )
+    # store Celery task id
+    job_crud.update(job.id, JobUpdate(task_id=task_id))
+
     logger.info(
         f"[start_job] Job scheduled to generate response | job_id={job.id}, project_id={project_id}, task_id={task_id}"
     )
backend/app/services/response/response.py (3)
94-101: Ensure Langfuse tags are strings.
Assistant.id is an int; cast to str to satisfy Langfuse’s list[str] tags.
     tracer.start_trace(
         name="generate_response_async",
-        input={"question": request.question, "assistant_id": assistant.id},
+        input={"question": request.question, "assistant_id": str(assistant.id)},
         metadata={"callback_url": request.callback_url},
-        tags=[assistant.id],
+        tags=[str(assistant.id)],
     )
145-151: Align log context with function name.
The log tag says process_response_task inside generate_response. Use a consistent label.
-        logger.error(
-            f"[process_response_task] OpenAI API error: {error_message}",
+        logger.error(
+            f"[generate_response] OpenAI API error: {error_message}",
             exc_info=True,
         )
200-207: Silence unused task_instance lint without breaking the signature.
Keep the parameter (Celery passes it) but mark as intentionally unused.
 def process_response(
     request: ResponsesAPIRequest,
     project_id: int,
     organization_id: int,
     job_id: UUID,
     task_id: str,
     task_instance,
 ) -> APIResponse:
+    # Celery passes `task_instance`; not used here.
+    del task_instance
backend/app/api/routes/responses.py (5)
29-36: Return 202 for async job start.
Signal asynchronous semantics explicitly.
-@router.post("/responses", response_model=APIResponse[ResponseJobStatus])
+@router.post("/responses", response_model=APIResponse[ResponseJobStatus], status_code=202)
41-46: Add idempotency to avoid duplicate jobs on client retries.
Consider deduping by a stable key (e.g., request.response_id or a hash of the request) in start_job/JobCrud.create.
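A minimal sketch of the "hash of the request" option, using only the standard library. The names `idempotency_key`, `start_job_once`, and the in-memory `_seen` dict are hypothetical; a real implementation would more likely store the key in a uniquely indexed column on the job table.

```python
import hashlib
import json


def idempotency_key(request: dict, project_id: int) -> str:
    """Derive a stable key: same project and same payload give the same key."""
    # sort_keys makes the JSON canonical regardless of dict ordering;
    # default=str covers values such as UUIDs that json cannot serialize.
    canonical = json.dumps(
        request, sort_keys=True, separators=(",", ":"), default=str
    )
    return hashlib.sha256(f"{project_id}:{canonical}".encode("utf-8")).hexdigest()


_seen = {}  # hypothetical stand-in for a unique-indexed DB column


def start_job_once(request: dict, project_id: int, create_job):
    """Create a job only for the first request carrying a given key."""
    key = idempotency_key(request, project_id)
    if key not in _seen:
        _seen[key] = create_job(request)
    return _seen[key]
```

One design caveat: hashing the full payload treats two intentionally identical questions as a retry, so a time window or an explicit client-supplied key may be preferable in practice.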
71-84: Standardize error envelopes via APIResponse.failure_response.
Keeps shape consistent and centralized.
-        return JSONResponse(
-            status_code=e.status_code,
-            content={
-                "success": False,
-                "data": additional_data if additional_data else None,
-                "error": str(e.detail),
-                "metadata": None,
-            },
-        )
+        return JSONResponse(
+            status_code=e.status_code,
+            content=APIResponse.failure_response(
+                error=str(e.detail),
+                data=additional_data if additional_data else None,
+            ).model_dump(),
+        )
-        return JSONResponse(
-            status_code=400,
-            content={
-                "success": False,
-                "data": additional_data if additional_data else None,
-                "error": error_message,
-                "metadata": None,
-            },
-        )
+        status = getattr(e, "status_code", 400)
+        return JSONResponse(
+            status_code=status,
+            content=APIResponse.failure_response(
+                error=error_message,
+                data=additional_data if additional_data else None,
+            ).model_dump(),
+        )
Also applies to: 183-191
65-65: Clarify docstring (not just “benchmarking”).
Small wording nit to reflect production use.
-    """Synchronous endpoint for benchmarking OpenAI responses API with Langfuse tracing."""
+    """Synchronous endpoint calling OpenAI Responses API with Langfuse tracing."""
106-122: Guard against long hangs with client-level timeouts.
Set reasonable connect/read timeouts when creating the OpenAI client (in get_openai_client) and optionally enable retries with backoff.
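The "retries with backoff" part can be illustrated independently of any SDK. The helper below, `call_with_backoff`, is a hypothetical standard-library sketch and not the OpenAI client's own retry mechanism; it complements, rather than replaces, client-level timeouts.

```python
import time


def call_with_backoff(
    fn,
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple = (TimeoutError, ConnectionError),
    sleep=time.sleep,
):
    """Call fn(); on a retryable error wait base_delay * 2**attempt and retry."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))
```

The `sleep` parameter is injectable so tests can record delays instead of waiting. Without a per-call timeout on the underlying request, retries alone do not bound total latency, which is why the comment pairs the two.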
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py (1 hunks)
backend/app/api/routes/responses.py (4 hunks)
backend/app/crud/__init__.py (2 hunks)
backend/app/crud/jobs.py (1 hunks)
backend/app/models/__init__.py (2 hunks)
backend/app/models/job.py (1 hunks)
backend/app/models/response.py (1 hunks)
backend/app/services/response/callbacks.py (1 hunks)
backend/app/services/response/jobs.py (1 hunks)
backend/app/services/response/response.py (1 hunks)
backend/app/tests/api/routes/test_responses.py (1 hunks)
backend/app/tests/crud/test_jobs.py (1 hunks)
backend/app/tests/services/response/response/test_generate_response.py (1 hunks)
backend/app/tests/services/response/response/test_process_response.py (1 hunks)
backend/app/tests/services/response/test_jobs.py (1 hunks)
backend/app/tests/utils/openai.py (2 hunks)
backend/app/utils.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (14)
backend/app/utils.py (1)
backend/app/api/routes/collections.py (3)
success(163-164)success(172-174)success(198-200)
backend/app/tests/services/response/response/test_generate_response.py (5)
backend/app/core/langfuse/langfuse.py (1)
LangfuseTracer(12-109)backend/app/models/assistants.py (1)
Assistant(33-44)backend/app/models/response.py (1)
ResponsesAPIRequest(4-11)backend/app/services/response/response.py (1)
generate_response(82-154)backend/app/tests/conftest.py (1)
db(24-41)
backend/app/crud/__init__.py (2)
backend/app/crud/jobs.py (1)
JobCrud(11-42)backend/app/crud/openai_conversation.py (1)
get_ancestor_id_from_response(61-99)
backend/app/crud/jobs.py (1)
backend/app/models/job.py (3)
Job(20-43)JobType(16-17)JobUpdate(46-49)
backend/app/models/__init__.py (2)
backend/app/models/job.py (4)
Job(20-43)JobType(16-17)JobStatus(9-13)JobUpdate(46-49)backend/app/models/response.py (6)
CallbackResponse(47-55)Diagnostics(35-39)FileResultChunk(42-44)ResponsesAPIRequest(4-11)ResponseJobStatus(27-32)ResponsesSyncAPIRequest(14-24)
backend/app/models/response.py (1)
backend/app/tests/api/routes/test_assistants.py (1)
assistant_id(25-26)
backend/app/tests/services/response/response/test_process_response.py (8)
backend/app/services/response/response.py (1)
process_response(200-282)backend/app/models/response.py (1)
ResponsesAPIRequest(4-11)backend/app/models/job.py (3)
Job(20-43)JobStatus(9-13)JobType(16-17)backend/app/tests/conftest.py (1)
db(24-41)backend/app/utils.py (1)
APIResponse(29-53)backend/app/tests/utils/test_data.py (1)
create_test_credential(103-130)backend/app/tests/utils/openai.py (2)
mock_openai_response(57-87)generate_openai_id(17-22)backend/app/crud/jobs.py (1)
JobCrud(11-42)
backend/app/tests/services/response/test_jobs.py (5)
backend/app/models/response.py (1)
ResponsesAPIRequest(4-11)backend/app/models/job.py (2)
JobType(16-17)JobStatus(9-13)backend/app/crud/jobs.py (2)
JobCrud(11-42)get(41-42)backend/app/tests/utils/utils.py (1)
get_project(70-89)backend/app/tests/conftest.py (1)
db(24-41)
backend/app/services/response/jobs.py (7)
backend/app/crud/jobs.py (1)
JobCrud(11-42)backend/app/models/job.py (3)
JobType(16-17)JobStatus(9-13)JobUpdate(46-49)backend/app/models/response.py (1)
ResponsesAPIRequest(4-11)backend/app/utils.py (1)
APIResponse(29-53)backend/app/celery/utils.py (1)
start_high_priority_job(18-43)backend/app/services/response/response.py (1)
process_response(200-282)backend/app/services/response/callbacks.py (1)
send_response_callback(22-41)
backend/app/services/response/callbacks.py (1)
backend/app/utils.py (2)
APIResponse(29-53)send_callback(225-237)
backend/app/tests/crud/test_jobs.py (3)
backend/app/crud/jobs.py (4)
JobCrud(11-42)create(15-23)get(41-42)update(25-39)backend/app/models/job.py (3)
JobUpdate(46-49)JobStatus(9-13)JobType(16-17)backend/app/tests/conftest.py (1)
db(24-41)
backend/app/tests/api/routes/test_responses.py (2)
backend/app/models/response.py (1)
ResponsesAPIRequest(4-11)backend/app/tests/conftest.py (2)
client(52-55)user_api_key_header(77-79)
backend/app/services/response/response.py (6)
backend/app/core/langfuse/langfuse.py (6)
LangfuseTracer(12-109)start_trace(53-69)start_generation(71-84)end_generation(86-93)update_trace(95-97)log_error(99-106)backend/app/crud/jobs.py (1)
JobCrud(11-42)backend/app/crud/credentials.py (1)
get_provider_credential(102-130)backend/app/crud/openai_conversation.py (3)
create_conversation(140-163)get_ancestor_id_from_response(61-99)get_conversation_by_ancestor_id(41-58)backend/app/models/response.py (4)
CallbackResponse(47-55)Diagnostics(35-39)FileResultChunk(42-44)ResponsesAPIRequest(4-11)backend/app/utils.py (5)
APIResponse(29-53)get_openai_client(175-205)mask_string(163-172)failure_response(42-53)success_response(36-39)
backend/app/api/routes/responses.py (7)
backend/app/api/deps.py (2)
get_db(33-35)get_current_user_org_project(110-131)backend/app/core/langfuse/langfuse.py (1)
LangfuseTracer(12-109)backend/app/models/response.py (5)
CallbackResponse(47-55)Diagnostics(35-39)ResponsesAPIRequest(4-11)ResponseJobStatus(27-32)ResponsesSyncAPIRequest(14-24)backend/app/services/response/jobs.py (1)
start_job(19-39)backend/app/services/response/response.py (1)
get_file_search_results(41-50)backend/app/services/response/callbacks.py (1)
get_additional_data(4-19)backend/app/utils.py (4)
APIResponse(29-53)get_openai_client(175-205)handle_openai_error(208-222)success_response(36-39)
🪛 Ruff (0.13.1)
backend/app/utils.py
7-7: typing.Dict is deprecated, use dict instead
(UP035)
220-220: Do not use bare except
(E722)
backend/app/tests/services/response/response/test_generate_response.py
27-27: Unused function argument: db
(ARG001)
29-29: Local variable mock_response is assigned to but never used
Remove assignment to unused variable mock_response
(F841)
backend/app/services/response/response.py
206-206: Unused function argument: task_instance
(ARG001)
🪛 GitHub Actions: AI Platform CI
backend/app/tests/services/response/test_jobs.py
[error] 1-1: import file mismatch: imported module 'test_jobs' has this file attribute: /home/runner/work/ai-platform/ai-platform/backend/app/tests/crud/test_jobs.py which is not the same as the test file we want to collect: /home/runner/work/ai-platform/ai-platform/backend/app/tests/services/response/test_jobs.py. HINT: remove pycache / .pyc files and/or use a unique basename for your test file modules
🔇 Additional comments (10)
backend/app/tests/utils/openai.py (1)
57-87: Mock shape looks sufficient
The SimpleNamespace response covers IDs, output, and usage fields used in tests. No blockers.
backend/app/models/__init__.py (1)
18-19: Re-exports look consistent
The new public surface (Job*, Responses*, CallbackResponse, etc.) is cleanly exposed.
Also applies to: 97-105
backend/app/crud/__init__.py (1)
13-14: CRUD exports LGTM
Exporting JobCrud and get_ancestor_id_from_response aligns with usage across services and tests.
Also applies to: 63-72
backend/app/tests/services/response/response/test_generate_response.py (1)
51-56: Confirm exception class alignment with installed openai SDK.
Some SDK versions prefer openai.APIError over OpenAIError. If CI/lint starts failing on import, switch to from openai import APIError and raise that instead.
backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py (1)
38-47: Verify FK constraint names exist before dropping.
The suffix “_fkey1” looks nonstandard and will fail if names differ. Please confirm actual names in your DB and adjust accordingly.
backend/app/tests/services/response/response/test_process_response.py (1)
59-89: LGTM: success path is well-isolated.
Good use of patching generate_response and DB assertions for JobStatus.SUCCESS.
backend/app/crud/jobs.py (1)
15-23: LGTM: create flow is idiomatic and safe.
Commit + refresh pattern is correct.
backend/app/services/response/response.py (1)
56-68: Nice: callback payload includes diagnostics and file-search chunks.
Solid structure for downstream consumers.
backend/app/api/routes/responses.py (2)
92-105: Langfuse instrumentation looks solid.
Trace + generation boundaries and metadata are hooked correctly.
If you want, I can add light tests asserting trace start/update calls via a Langfuse test double.
156-170: Success envelope LGTM.
Shape is consistent and diagnostics are complete.
Actionable comments posted: 0
🧹 Nitpick comments (2)
backend/app/tests/services/response/test_jobs_response.py (2)
30-37: Also assert question is forwarded to the scheduler payload.
This tightens the contract on request_data.
     mock_schedule.assert_called_once()
-    _, kwargs = mock_schedule.call_args
+    _, kwargs = mock_schedule.call_args
     assert kwargs["function_path"] == "app.services.response.jobs.execute_job"
     assert kwargs["project_id"] == project.id
     assert kwargs["organization_id"] == project.organization_id
     assert kwargs["job_id"] == str(job_id)
     assert kwargs["request_data"]["assistant_id"] == "assistant_123"
+    assert kwargs["request_data"]["question"] == "What is the capital of France?"
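For context on the `_, kwargs = mock_schedule.call_args` idiom used in that test, here is a self-contained sketch with `unittest.mock`. The `schedule_job` function is a hypothetical stand-in for `start_job`; only the mock mechanics are the point.

```python
from unittest.mock import MagicMock


def schedule_job(schedule, job_id, request_data):
    # Hypothetical stand-in for start_job: forwards everything as keywords.
    return schedule(
        function_path="app.services.response.jobs.execute_job",
        job_id=str(job_id),
        request_data=request_data,
    )


mock_schedule = MagicMock(return_value="task-123")
task_id = schedule_job(
    mock_schedule,
    42,
    {"assistant_id": "assistant_123", "question": "What is the capital of France?"},
)

mock_schedule.assert_called_once()
# call_args unpacks into (positional_args, keyword_args) of the last call.
args, kwargs = mock_schedule.call_args
```

Asserting on individual `kwargs` entries, rather than on the whole call, keeps the test stable when unrelated arguments are added to the scheduler call later.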
16-17: Prefer a dedicated project fixture for determinism.
Selecting “any active project” can couple test behavior to seed state. Consider a fixture that creates/returns a known test project.
📜 Review details
📒 Files selected for processing (1)
backend/app/tests/services/response/test_jobs_response.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/tests/services/response/test_jobs_response.py (5)
backend/app/models/response.py (1)
ResponsesAPIRequest(4-11)backend/app/models/job.py (2)
JobType(16-17)JobStatus(9-13)backend/app/crud/jobs.py (1)
JobCrud(11-42)backend/app/tests/utils/utils.py (1)
get_project(70-89)backend/app/tests/conftest.py (1)
db(24-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (2)
backend/app/tests/services/response/test_jobs_response.py (2)
10-37: LGTM: solid happy-path coverage for start_job.
Covers job creation, initial status, and scheduler invocation with key args. Nice.
21-29: Don't assert Celery task id in start_job test; start_job doesn't persist it.
start_job only creates the Job and schedules the task (logs the task_id); Job.task_id is set later by the worker in process_response via JobCrud.update. See backend/app/services/response/jobs.py and backend/app/services/response/response.py.
Likely an incorrect or invalid review comment.
Actionable comments posted: 0
🧹 Nitpick comments (3)
backend/pyproject.toml (3)
27-28: Remove pytest and pre-commit from runtime dependencies.
These are dev tools and should not ship in prod images; they already exist under dev-dependencies.
Apply this diff:
-    "pytest>=7.4.4",
-    "pre-commit>=3.8.0",
61-63: Align Ruff target-version with supported Python (set to py311).
requires-python is >=3.11,<4.0 but Ruff targets py310; update to match.
-[tool.ruff]
-target-version = "py310"
+[tool.ruff]
+target-version = "py311"
37-37: Make Flower an optional extra (monitoring): safe to remove from runtime deps
rg found only release-notes references; no code imports/usages detected. Move Flower out of runtime deps or pin an upper bound.
Apply this diff to remove it from runtime:
-    "flower>=2.0.1",
Then add this block to declare an extra:
[project.optional-dependencies]
monitoring = [
    "flower>=2.0.1,<2.1.0"
]
Install with: pip install .[monitoring]
If you must keep it in runtime, add an upper bound:
-    "flower>=2.0.1",
+    "flower>=2.0.1,<2.1.0",
Ensure the Flower UI is not publicly exposed without auth and run it as a separate service in compose/k8s.
📜 Review details
⛔ Files ignored due to path filters (1)
backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
backend/pyproject.toml(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
@avirajsingh7 please resolve review comments before merging
Actionable comments posted: 4
🧹 Nitpick comments (2)
backend/app/utils.py (1)
7-7: Update deprecated type hint.
Use the built-in dict instead of typing.Dict, which is deprecated in Python 3.9+.
-from typing import Any, Dict, Generic, Optional, TypeVar
+from typing import Any, Generic, Optional, TypeVar
And update line 33:
-    metadata: Optional[Dict[str, Any]] = None
+    metadata: Optional[dict[str, Any]] = None
backend/app/tests/services/response/response/test_generate_response.py (1)
27-27: Remove unused db parameter from test.
The test function declares a db parameter but doesn't use it.
-def test_generate_response_success(db: Session, assistant_mock: Assistant):
+def test_generate_response_success(assistant_mock: Assistant):
📜 Review details
📒 Files selected for processing (6)
backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py (1 hunks)
backend/app/core/config.py (1 hunks)
backend/app/services/response/jobs.py (1 hunks)
backend/app/tests/services/response/response/test_generate_response.py (1 hunks)
backend/app/tests/services/response/test_jobs_response.py (1 hunks)
backend/app/utils.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/tests/services/response/test_jobs_response.py
- backend/app/alembic/versions/c6fb6d0b5897_create_job_table.py
🧰 Additional context used
🧬 Code graph analysis (3)
backend/app/services/response/jobs.py (7)
backend/app/crud/jobs.py (1)
JobCrud(11-42)backend/app/models/job.py (3)
JobType(16-17)JobStatus(9-13)JobUpdate(46-49)backend/app/models/response.py (1)
ResponsesAPIRequest(4-11)backend/app/utils.py (1)
APIResponse(29-53)backend/app/celery/utils.py (1)
start_high_priority_job(18-43)backend/app/services/response/response.py (1)
process_response(200-282)backend/app/services/response/callbacks.py (1)
send_response_callback(22-41)
backend/app/utils.py (2)
backend/app/api/routes/collections.py (3)
success(163-164)success(172-174)success(198-200)backend/app/api/routes/threads.py (2)
handle_openai_error(52-56)send_callback(37-49)
backend/app/tests/services/response/response/test_generate_response.py (5)
backend/app/core/langfuse/langfuse.py (1)
LangfuseTracer(12-109)backend/app/models/assistants.py (1)
Assistant(33-44)backend/app/models/response.py (1)
ResponsesAPIRequest(4-11)backend/app/services/response/response.py (1)
generate_response(82-154)backend/app/tests/conftest.py (1)
db(24-41)
🪛 Ruff (0.13.1)
backend/app/utils.py
7-7: typing.Dict is deprecated, use dict instead
(UP035)
220-220: Do not use bare except
(E722)
backend/app/tests/services/response/response/test_generate_response.py
27-27: Unused function argument: db
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (2)
backend/app/core/config.py (1)
120-120: LGTM! Good configuration addition for callback timeouts.
The CALLBACK_TIMEOUT tuple provides configurable timeout values for connect and read operations, which is essential for preventing hanging connections during callback operations. This aligns well with the Celery-backed processing changes.
backend/app/utils.py (1)
225-240: Good implementation of timeout configuration!
The send_callback function properly uses the configurable CALLBACK_TIMEOUT from settings and implements a context manager for session cleanup, addressing the previous review comments effectively.
Actionable comments posted: 1
🧹 Nitpick comments (3)
.env.example (1)
76-80: Fix dotenv formatting (lint failures): remove extra blank line and spaces around "=".
Align with dotenv-linter: no spaces around "=" and avoid extra blank lines.
Apply:
-
-
 # Callback Timeouts (in seconds)
-CALLBACK_CONNECT_TIMEOUT = 3
-CALLBACK_READ_TIMEOUT = 10
+CALLBACK_CONNECT_TIMEOUT=3
+CALLBACK_READ_TIMEOUT=10
.env.test.example (1)
32-34: Fix dotenv formatting (lint failures): remove spaces around "=".
Comply with dotenv-linter.
 # Callback Timeouts (in seconds)
-CALLBACK_CONNECT_TIMEOUT = 3
-CALLBACK_READ_TIMEOUT = 10
+CALLBACK_CONNECT_TIMEOUT=3
+CALLBACK_READ_TIMEOUT=10
backend/app/utils.py (1)
43-54: Correct generic return type for failure_response.
It can return data on failures now; reflect this in the type.
-    ) -> "APIResponse[None]":
+    ) -> "APIResponse[T | None]":
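To make the typing point concrete, the following is a minimal sketch of a generic response envelope whose failure constructor may carry data. It uses a plain dataclass as a stand-in; the field names and the dataclass base are assumptions, and the real `APIResponse` in `backend/app/utils.py` may be defined differently.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class APIResponse(Generic[T]):
    # Field names are illustrative, not taken from the PR.
    success: bool
    data: Optional[T] = None
    error: Optional[str] = None

    @classmethod
    def success_response(cls, data: T) -> "APIResponse[T]":
        return cls(success=True, data=data)

    @classmethod
    def failure_response(
        cls, error: str, data: Optional[T] = None
    ) -> "APIResponse[T | None]":
        # Failures can now carry a payload, so the annotation must allow T,
        # not only None.
        return cls(success=False, data=data, error=error)
```

With the old `APIResponse[None]` annotation, a type checker would reject callers that read `data` from a failure response, even though a value can be present at runtime.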
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
.env.example (1 hunks)
.env.test.example (1 hunks)
backend/app/core/config.py (1 hunks)
backend/app/utils.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/utils.py (1)
backend/app/api/routes/threads.py (2)
handle_openai_error(52-56)
send_callback(37-49)
🪛 dotenv-linter (3.3.0)
.env.test.example
[warning] 33-33: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
[warning] 34-34: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
.env.example
[warning] 77-77: [ExtraBlankLine] Extra blank line detected
(ExtraBlankLine)
[warning] 79-79: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
[warning] 80-80: [SpaceCharacter] The line has spaces around equal sign
(SpaceCharacter)
🪛 Ruff (0.13.1)
backend/app/utils.py
7-7: typing.Dict is deprecated, use dict instead
(UP035)
220-220: Do not use bare except
(E722)
🔇 Additional comments (2)
backend/app/utils.py (1)
208-223: Replace bare except with explicit exceptions (ruff E722).
Avoid masking errors; log parse failures.
     elif hasattr(e, "response") and hasattr(e.response, "json"):
         try:
             error_data = e.response.json()
             if isinstance(error_data, dict) and "error" in error_data:
                 error_info = error_data["error"]
                 if isinstance(error_info, dict) and "message" in error_info:
                     return error_info["message"]
-        except:
-            pass
+        except (ValueError, KeyError, AttributeError, TypeError) as parse_err:
+            logger.debug(
+                "[handle_openai_error] Failed to parse error payload",
+                exc_info=True,
+            )
     return str(e)
backend/app/core/config.py (1)
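A self-contained sketch of the same idea follows: pull a message out of a JSON error payload, catch only the exceptions that parsing can plausibly raise, and fall back to `str(e)`. The function name `extract_error_message` is hypothetical, and the sketch covers only the response-parsing branch shown in the diff, not the rest of `handle_openai_error`.

```python
import logging

logger = logging.getLogger(__name__)


def extract_error_message(e: Exception) -> str:
    """Return the nested error message if the exception carries one, else str(e)."""
    response = getattr(e, "response", None)
    if response is not None and hasattr(response, "json"):
        try:
            error_data = response.json()
            if isinstance(error_data, dict):
                error_info = error_data.get("error")
                if isinstance(error_info, dict) and "message" in error_info:
                    return str(error_info["message"])
        except (ValueError, TypeError, AttributeError):
            # Narrow catch: a malformed body is logged and ignored, while
            # unrelated failures (KeyboardInterrupt, SystemExit) still propagate.
            logger.debug(
                "[extract_error_message] Failed to parse error payload",
                exc_info=True,
            )
    return str(e)
```

A bare `except:` would also swallow `KeyboardInterrupt` and `SystemExit`, which is the main reason linters flag it.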
121-124: Back-compat for CALLBACK_TIMEOUT: add shim or verify no remaining references.
rg returned no matches for CALLBACK_TIMEOUT (no output); absence of matches isn't definitive, so either add the compatibility property below or confirm there are no callers.
     # callback timeouts
     CALLBACK_CONNECT_TIMEOUT: int = 3
     CALLBACK_READ_TIMEOUT: int = 10
+    # Backward-compat shim for legacy tuple usage
+    @computed_field  # type: ignore[prop-decorator]
+    @property
+    def CALLBACK_TIMEOUT(self) -> tuple[int, int]:
+        return (self.CALLBACK_CONNECT_TIMEOUT, self.CALLBACK_READ_TIMEOUT)
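The shim can be illustrated without the settings framework. The sketch below uses a frozen dataclass as a stand-in for the project's settings class (the class name `CallbackSettings` is hypothetical) and a plain `property` in place of `computed_field`; it only shows how the legacy tuple is derived from the two new fields.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CallbackSettings:
    # Stand-in for the real settings class; defaults mirror the diff above.
    CALLBACK_CONNECT_TIMEOUT: int = 3
    CALLBACK_READ_TIMEOUT: int = 10

    @property
    def CALLBACK_TIMEOUT(self) -> tuple[int, int]:
        # Derived on access, so it cannot drift from the two source fields.
        return (self.CALLBACK_CONNECT_TIMEOUT, self.CALLBACK_READ_TIMEOUT)
```

Legacy callers that pass `settings.CALLBACK_TIMEOUT` as a timeout tuple keep working, while new code and environment overrides address the two values individually.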
Actionable comments posted: 2
🧹 Nitpick comments (1)
backend/app/services/response/response.py (1)
202-208: Drop or underscore the unused task_instance argument.
Ruff is already flagging this as unused (ARG001). Either remove the parameter or rename it to _task_instance so the task signature stays compatible without tripping lint.
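A small sketch of the rename option is shown here. The function `process_job` and its parameters are hypothetical stand-ins for the real `process_response` signature, which this excerpt does not show in full.

```python
from typing import Any, Optional


def process_job(
    request_data: dict[str, Any],
    job_id: str,
    _task_instance: Optional[object] = None,  # accepted for compatibility, unused
) -> dict[str, Any]:
    """Hypothetical task body; the leading underscore marks the argument as unused."""
    return {"job_id": job_id, "status": "processing", "fields": sorted(request_data)}
```

One caveat: the rename only preserves compatibility for positional callers. Anything that passes `task_instance=...` by keyword would raise `TypeError` after the rename and would need updating in the same change.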
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/models/response.py (1 hunks)
backend/app/services/response/response.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/models/response.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/response/response.py (8)
backend/app/core/langfuse/langfuse.py (6)
LangfuseTracer(12-109)
start_trace(53-69)
start_generation(71-84)
end_generation(86-93)
update_trace(95-97)
log_error(99-106)
backend/app/crud/jobs.py (1)
JobCrud(11-42)
backend/app/crud/assistants.py (1)
get_assistant_by_id(19-30)
backend/app/crud/credentials.py (1)
get_provider_credential(102-130)
backend/app/crud/openai_conversation.py (3)
create_conversation(140-163)
get_ancestor_id_from_response(61-99)
get_conversation_by_ancestor_id(41-58)
backend/app/models/response.py (4)
CallbackResponse(47-54)
Diagnostics(35-39)
FileResultChunk(42-44)
ResponsesAPIRequest(4-11)
backend/app/models/openai_conversation.py (2)
OpenAIConversationCreate(72-98)
OpenAIConversation(58-69)
backend/app/utils.py (5)
APIResponse(29-53)
get_openai_client(175-205)
mask_string(163-172)
failure_response(42-53)
success_response(36-39)
🪛 Ruff (0.13.1)
backend/app/services/response/response.py
207-207: Unused function argument: task_instance
(ARG001)
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/app/tests/crud/test_jobs.py (1)
48-52: Capture the pre-update timestamp before asserting on updated_at
Because job and updated_job reference the same identity within the session, Line 51 currently compares the post-update value to itself and would pass even if JobCrud.update stopped bumping updated_at. Stash the original timestamp before calling update(...) so the assertion actually verifies that updated_at advanced.
You can tighten the check like this:
-    update_data = JobUpdate(status=JobStatus.FAILED, error_message="Error occurred")
-    updated_job = crud.update(job.id, update_data)
+    previous_updated_at = job.updated_at
+    update_data = JobUpdate(status=JobStatus.FAILED, error_message="Error occurred")
+    updated_job = crud.update(job.id, update_data)
@@
-    assert updated_job.updated_at >= job.updated_at
+    assert previous_updated_at is not None
+    assert updated_job.updated_at > previous_updated_at
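The aliasing problem described above can be reproduced without a database. In this sketch, `Job` and `update` are simplified stand-ins (not the project's model or `JobCrud.update`): `update` mutates and returns the same object, which mimics an ORM session handing back the identity-mapped instance, and it bumps the timestamp by a fixed second so the result does not depend on clock resolution.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class Job:
    status: str = "PENDING"
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def update(job: Job, status: str) -> Job:
    """Mutate in place and return the same object, as an identity-mapped session would."""
    job.status = status
    job.updated_at = job.updated_at + timedelta(seconds=1)  # deterministic bump
    return job


job = Job()
previous_updated_at = job.updated_at  # snapshot taken before the update
updated_job = update(job, "FAILED")

# Vacuous: both names point at one object, so this compares a value to itself.
assert updated_job is job
assert updated_job.updated_at >= job.updated_at

# Meaningful: fails if update() ever stops advancing the timestamp.
assert updated_job.updated_at > previous_updated_at
```

In a real test the strict `>` relies on the stored timestamp having enough resolution to differ between create and update; if it does not, `>=` against the snapshot combined with a status check may be the safer assertion.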
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
backend/app/tests/crud/test_jobs.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/tests/crud/test_jobs.py (3)
backend/app/crud/jobs.py (4)
JobCrud(11-42)
create(15-23)
get(41-42)
update(25-39)
backend/app/models/job.py (3)
JobUpdate(46-49)
JobStatus(9-13)
JobType(16-17)
backend/app/tests/conftest.py (1)
db(24-41)
… request metadata
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/services/response/callbacks.py (1 hunks)
backend/app/tests/crud/test_jobs.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/tests/crud/test_jobs.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/response/callbacks.py (2)
backend/app/models/response.py (2)
ResponsesAPIRequest(4-11)
ResponsesSyncAPIRequest(14-24)
backend/app/utils.py (2)
APIResponse(29-53)
send_callback(225-244)
Issues attributed to commits in this pull request
This pull request was merged and Sentry observed the following issues:
Summary
This PR refactors the existing response API to use Celery for background processing of responses.
We introduce a Job model to track the status of a response task (PENDING → PROCESSING → SUCCESS/FAILED), schedule tasks via Celery, and send callbacks once processing completes.
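The status lifecycle described above can be sketched as a small state machine. The enum member names follow the summary; the string values, the `ALLOWED_TRANSITIONS` table, and the `advance` helper are assumptions for illustration, since the PR excerpt does not say whether transitions are enforced anywhere.

```python
from enum import Enum


class JobStatus(str, Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


# Hypothetical transition table: PENDING -> PROCESSING -> SUCCESS | FAILED.
ALLOWED_TRANSITIONS: dict[JobStatus, frozenset[JobStatus]] = {
    JobStatus.PENDING: frozenset({JobStatus.PROCESSING}),
    JobStatus.PROCESSING: frozenset({JobStatus.SUCCESS, JobStatus.FAILED}),
    JobStatus.SUCCESS: frozenset(),  # terminal
    JobStatus.FAILED: frozenset(),   # terminal
}


def advance(current: JobStatus, new: JobStatus) -> JobStatus:
    """Return `new` if the move is legal, otherwise raise ValueError."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new
```

Guarding transitions this way would, for example, stop a retried worker from moving a job that already reached SUCCESS back to PROCESSING.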
What Changed
Checklist
Before submitting a pull request, please ensure that you mark these tasks.
Run fastapi run --reload app/main.py or docker compose up in the repository root and test.
Summary by CodeRabbit
New Features
Refactor
Chores
Tests