-
Notifications
You must be signed in to change notification settings - Fork 10
Celery : moving collections module to celery #385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8123e26
073a61c
a757f29
824652e
b471c3e
580c3f5
0de7cd0
19928ab
b1e8fb4
3d9b2fa
b962760
7764568
a569485
87ecdb6
96160c6
f150372
e7fe68d
e62fa53
bc40a18
f3f1565
7504ef7
1268dfb
6bc2a47
65380f8
db0c276
6fe72d6
eaf8202
fcbd764
4083b62
37e0b7a
2a69b55
7f948fb
e274c45
4397434
f5a5871
d741ef6
81e0890
8e8fe7d
5b6f2f2
4a1f5ff
14d50a8
1dcab67
7b09e3f
95ef105
4de8c1d
ebf0cc8
2c0a11d
94234b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| """delete processing and failed columns from collection table | ||
|
|
||
| Revision ID: 7ab577d3af26 | ||
| Revises: c6fb6d0b5897 | ||
| Create Date: 2025-10-06 13:59:28.561706 | ||
|
|
||
| """ | ||
| from alembic import op | ||
| import sqlalchemy as sa | ||
| import sqlmodel.sql.sqltypes | ||
|
|
||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision = "7ab577d3af26" | ||
| down_revision = "c6fb6d0b5897" | ||
nishika26 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
|
|
||
| def upgrade(): | ||
| op.execute( | ||
| """ | ||
| DELETE FROM collection | ||
| WHERE status IN ('processing', 'failed') | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sure no collection with empty
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think there would be a situation where status turned to successful while no llm service id is there as they're committed together in the same transaction, they both persist atomically. By they i mean the status as successful and the llm service id , for reference - here . But for surity i will still add that sql query as well |
||
| """ | ||
| ) | ||
| op.execute( | ||
| """ | ||
| DELETE FROM collection | ||
| WHERE llm_service_id IS NULL | ||
| """ | ||
| ) | ||
|
|
||
|
|
||
| def downgrade(): | ||
| pass | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| """adding collection job table and altering collections table | ||
|
|
||
| Revision ID: b30727137e65 | ||
| Revises: 7ab577d3af26 | ||
| Create Date: 2025-10-05 14:19:14.213933 | ||
|
|
||
| """ | ||
| from alembic import op | ||
| import sqlalchemy as sa | ||
| import sqlmodel.sql.sqltypes | ||
| from sqlalchemy.dialects import postgresql | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision = "b30727137e65" | ||
| down_revision = "7ab577d3af26" | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
| collection_job_status_enum = postgresql.ENUM( | ||
| "PENDING", | ||
| "PROCESSING", | ||
| "SUCCESSFUL", | ||
| "FAILED", | ||
| name="collectionjobstatus", | ||
| create_type=False, | ||
| ) | ||
|
|
||
| collection_action_type = postgresql.ENUM( | ||
| "CREATE", | ||
| "DELETE", | ||
| name="collectionactiontype", | ||
| create_type=False, | ||
| ) | ||
|
|
||
|
|
||
| def upgrade(): | ||
| collection_job_status_enum.create(op.get_bind(), checkfirst=True) | ||
| collection_action_type.create(op.get_bind(), checkfirst=True) | ||
| op.create_table( | ||
| "collection_jobs", | ||
| sa.Column("action_type", collection_action_type, nullable=False), | ||
| sa.Column("collection_id", sa.Uuid(), nullable=True), | ||
| sa.Column("project_id", sa.Integer(), nullable=False), | ||
| sa.Column("id", sa.Uuid(), nullable=False), | ||
| sa.Column("status", collection_job_status_enum, nullable=False), | ||
| sa.Column("task_id", sa.String(), nullable=True), | ||
| sa.Column("trace_id", sa.String(), nullable=True), | ||
| sa.Column("error_message", sa.Text(), nullable=True), | ||
| sa.Column("inserted_at", sa.DateTime(), nullable=False), | ||
| sa.Column("updated_at", sa.DateTime(), nullable=False), | ||
| sa.ForeignKeyConstraint( | ||
| ["collection_id"], ["collection.id"], ondelete="CASCADE" | ||
| ), | ||
| sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"), | ||
| sa.PrimaryKeyConstraint("id"), | ||
| ) | ||
|
|
||
| op.alter_column("collection", "created_at", new_column_name="inserted_at") | ||
| op.alter_column( | ||
| "collection", "llm_service_id", existing_type=sa.VARCHAR(), nullable=False | ||
| ) | ||
| op.alter_column( | ||
| "collection", "llm_service_name", existing_type=sa.VARCHAR(), nullable=False | ||
| ) | ||
| op.drop_constraint("collection_owner_id_fkey", "collection", type_="foreignkey") | ||
| op.drop_column("collection", "owner_id") | ||
| op.drop_column("collection", "status") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kartpop
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added an sql query migration file for this here |
||
| op.drop_column("collection", "error_message") | ||
|
|
||
|
|
||
| def downgrade(): | ||
| op.add_column( | ||
| "collection", | ||
| sa.Column("error_message", sa.VARCHAR(), autoincrement=False, nullable=True), | ||
| ) | ||
| collectionstatus = postgresql.ENUM( | ||
| "processing", "successful", "failed", name="collectionstatus" | ||
| ) | ||
|
|
||
| op.add_column( | ||
| "collection", | ||
| sa.Column( | ||
| "status", | ||
| collectionstatus, | ||
| server_default=sa.text("'processing'::collectionstatus"), | ||
| nullable=True, | ||
| ), | ||
| ) | ||
| op.add_column( | ||
| "collection", | ||
| sa.Column("owner_id", sa.Integer(), nullable=True), | ||
| ) | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| op.execute("UPDATE collection SET status = 'processing' WHERE status IS NULL") | ||
| op.execute("UPDATE collection SET owner_id = 1 WHERE owner_id IS NULL") | ||
| op.create_foreign_key( | ||
| "collection_owner_id_fkey", | ||
| "collection", | ||
| "user", | ||
| ["owner_id"], | ||
| ["id"], | ||
| ondelete="CASCADE", | ||
| ) | ||
| op.alter_column("collection", "status", nullable=False) | ||
| op.alter_column("collection", "owner_id", nullable=False) | ||
| op.alter_column("collection", "inserted_at", new_column_name="created_at") | ||
| op.alter_column( | ||
| "collection", "llm_service_name", existing_type=sa.VARCHAR(), nullable=True | ||
| ) | ||
| op.alter_column( | ||
| "collection", "llm_service_id", existing_type=sa.VARCHAR(), nullable=True | ||
| ) | ||
| op.drop_table("collection_jobs") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,4 @@ | ||
| Retrieve all AI-platform information about a collection given its | ||
| ID. This route is very helpful for: | ||
| Retrieve detailed information about a specific collection by its ID from the collection table. Note that this endpoint CANNOT be used as a polling endpoint for collection creation because an entry will be made in the collection table only after the resource creation and association has been successful. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One suggestion for all the md/docs files: these are user-facing documents, so they should focus on describing the behavior of the endpoints, not their internal implementation details.
Similarly, avoid mentioning internal mechanisms like can be use for polling or background tasks unless they directly affect how the user interacts with the API.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. noted, but since for now the main users of this endpoint had been dalgo, I think keeping that in mind I wrote the docs in that way . Stating that you need to poll on the job id returned is something that the user needs to know in this case. The fact that the deletion is soft or hard is sometimes needed to be known considering that our current user of the platform is dalgo, glific and us(the internal team) . So I think we can keep the docs which go into the behaviour of endpoints , anyway we will discuss this with other people as well, and according to their opinion we will apply those changes in v2 |
||
|
|
||
| * Understanding whether a `create` request has finished | ||
| * Obtaining the OpenAI assistant ID (`llm_service_id`) | ||
| This endpoint returns metadata for the collection, including its project, organization, | ||
| timestamps, and associated LLM service details (`llm_service_id`). | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| Retrieve information about a collection job by the collection job ID. This endpoint can be considered the polling endpoint for collection creation job. This endpoint provides detailed status and metadata for a specific collection job | ||
| in the AI platform. It is especially useful for: | ||
|
|
||
| * Fetching the collection job object containing the ID which will be collection job id, collection ID, status of the job as well as error message. | ||
|
|
||
| * If the job has finished, has been successful and it was a job of creation of collection then this endpoint will fetch the associated collection details from the collection table, including: | ||
| - `llm_service_id`: The OpenAI assistant or model used for the collection. | ||
| - Collection metadata such as ID, project, organization, and timestamps. | ||
nishika26 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| * If the job of delete collection was successful, we will get the status as successful and nothing will be returned as collection. | ||
|
|
||
| * Containing a simplified error messages in the retrieved collection job object when a job has failed. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| import logging | ||
| from uuid import UUID | ||
|
|
||
| from fastapi import APIRouter | ||
| from fastapi import Path as FastPath | ||
|
|
||
|
|
||
| from app.api.deps import SessionDep, CurrentUserOrgProject | ||
| from app.crud import ( | ||
| CollectionCrud, | ||
| CollectionJobCrud, | ||
| ) | ||
| from app.models import CollectionJobStatus, CollectionJobPublic, CollectionActionType | ||
| from app.models.collection import CollectionPublic | ||
| from app.utils import APIResponse, load_description | ||
| from app.services.collections.helpers import extract_error_message | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
| router = APIRouter(prefix="/collections", tags=["collections"]) | ||
|
|
||
|
|
||
| @router.get( | ||
| "/info/jobs/{job_id}", | ||
| description=load_description("collections/job_info.md"), | ||
| response_model=APIResponse[CollectionJobPublic], | ||
| ) | ||
| def collection_job_info( | ||
| session: SessionDep, | ||
| current_user: CurrentUserOrgProject, | ||
| job_id: UUID = FastPath(description="Collection job to retrieve"), | ||
| ): | ||
| collection_job_crud = CollectionJobCrud(session, current_user.project_id) | ||
| collection_job = collection_job_crud.read_one(job_id) | ||
|
|
||
| job_out = CollectionJobPublic.model_validate(collection_job) | ||
|
|
||
| if ( | ||
| collection_job.status == CollectionJobStatus.SUCCESSFUL | ||
| and collection_job.action_type == CollectionActionType.CREATE | ||
| and collection_job.collection_id | ||
| ): | ||
| collection_crud = CollectionCrud(session, current_user.project_id) | ||
| collection = collection_crud.read_one(collection_job.collection_id) | ||
| job_out.collection = CollectionPublic.model_validate(collection) | ||
|
|
||
| if collection_job.status == CollectionJobStatus.FAILED and job_out.error_message: | ||
| job_out.error_message = extract_error_message(job_out.error_message) | ||
|
|
||
| return APIResponse.success_response(data=job_out) |
Uh oh!
There was an error while loading. Please reload this page.