Skip to content

Version the worker-bound TaskInstance fields in the execution API schema#68390

Merged
kaxil merged 7 commits into
apache:mainfrom
astronomer:fix-execution-api-ti-schema
Jun 11, 2026
Merged

Version the worker-bound TaskInstance fields in the execution API schema#68390
kaxil merged 7 commits into
apache:mainfrom
astronomer:fix-execution-api-ti-schema

Conversation

@kaxil

@kaxil kaxil commented Jun 11, 2026

Copy link
Copy Markdown
Member

Why

Since #65958 the supervisor routes every task through get_coordinator_manager().for_queue(ti.queue), so it depends on queue being on the task instance it receives. That field (along with pool_slots and priority_weight) lived in a BaseTaskInstanceDTO duplicated between airflow-core and task-sdk (#67174), kept aligned by an AST-comparison prek hook, while the TaskInstance schema in the versioned execution API spec lacked it.

That spec is already the designated home for this vocabulary: get_extra_schemas() injects TaskInstance (and BundleInfo, imported from airflow.executors.workloads) into the OpenAPI schema precisely so client SDKs generate these types from a versioned source. Adding the missing field there gives one source of truth instead of two hand-synced copies.

What

  • Add queue to the execution API TaskInstance schema, with an AddTaskInstanceQueueField Cadwyn version change in the 2026-06-30 bundle, and regenerate the task-sdk datamodels.
  • Re-point StartupDetails.ti, supervise_task() and the coordinator interfaces at the generated TaskInstance.
  • Fold the core TaskInstanceDTO into a subclass of the schema model that adds only the executor-side fields: pool_slots and priority_weight (read by the executor's priority sort and the edge executor's concurrency accounting, never by the worker), plus external_executor_id and executor_config (excluded from serialization).
  • Delete the duplicated task-sdk execution_time.workloads module and the check-task-instance-dto-sync hook. A round-trip test (executor DTO JSON validated by the SDK's generated model) covers the contract the hook used to enforce.
  • Remove parent_context_carrier from the DTO: it has no Python readers or writers anywhere in core, task-sdk or providers.

Design notes

  • Only queue is worker-facing: the supervisor routes on it. pool_slots and priority_weight are executor concerns and stay on the executor DTO, still serialized in the workload (required, exactly the pre-PR wire shape) so older workers that deserialize the workload with the required-field model keep working.
  • queue gets a default ("default") instead of being required. The executor always sends the real value (ExecuteTask.make validates from the ORM task instance, where the column is non-null); the default keeps hand-built instances such as tests and dry_run runs valid, and matches how map_index and hostname are already modelled in this schema.
  • No released artifact ships the deleted surfaces: the duplicated DTO module and the supervisor wire-schema bundle only exist on the unreleased 3.3 line, so no compat shims are needed. The supervisor wire-schema snapshot change rides the in-progress floor version of that bundle, where no VersionChange entry is possible.
  • The edge3 worker API spec regenerates via its existing hook because it embeds the workload TI shape. The checked-in TypeScript/Go clients under plugins/www/openapi-gen and go-sdk/pkg/edgeapi are not regenerated here: their codegen is not wired into prek and already lags the spec on main (the Go client was last regenerated in Simple sending of return value from Go tasks to XCom #56481), and the affected fields are optional in both.

The supervisor's task routing (added in apache#65958) reads queue, pool_slots and
priority_weight from the TaskInstance it receives, but those fields lived in
a TaskInstanceDTO duplicated between airflow-core and task-sdk and kept in
sync by an AST-comparison prek hook, outside the versioned execution API
schema the Task SDK datamodels are generated from.

Add the three fields to the execution API TaskInstance schema (with a Cadwyn
version change) and regenerate the SDK datamodels, so the generated
TaskInstance carries everything the supervisor needs. Re-point StartupDetails
and the coordinator interfaces at the generated model, fold the core
TaskInstanceDTO into a subclass of the schema model that only adds the
executor-side fields, and drop the duplicated task-sdk DTO and the sync hook.

The unused parent_context_carrier field (no readers or writers) is removed.
@boring-cyborg boring-cyborg Bot added area:dev-tools area:Executors-core LocalExecutor & SequentialExecutor area:providers area:task-sdk kind:documentation provider:edge Edge Executor / Worker (AIP-69) / edge3 labels Jun 11, 2026
@ashb

ashb commented Jun 11, 2026

Copy link
Copy Markdown
Member

through get_coordinator_manager().for_queue(ti.queue), so it depends on queue, pool_slots and priority_weight on the task instance it receives.

The last two being required on the worker seems like a code smell. If it makes it to the worker it should just executed. That is the scheduler's job, not the workers

Comment thread airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py Outdated
…he schema

The supervisor only routes on queue; pool_slots and priority_weight are
executor concerns (queued-workload priority ordering and edge concurrency
slot accounting) that the worker never reads. Keep them on the executor
DTO, serialized as before, and add only queue to the worker-facing schema.
@kaxil

kaxil commented Jun 11, 2026

Copy link
Copy Markdown
Member Author

Agreed and fixed in 78a6b82 -- the worker-facing schema now only gains queue; pool_slots and priority_weight remain executor-only fields on the DTO.

Comment thread airflow-core/docs/core-concepts/executor/index.rst
Comment thread task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Comment thread task-sdk/src/airflow/sdk/execution_time/schema/versions/__init__.py Outdated

@ashb ashb left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few nits/Qs but LGTM overall

@ashb ashb marked this pull request as ready for review June 11, 2026 16:08

@jason810496 jason810496 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice catch, thanks. Do we need full tests before merge Btw?

@kaxil kaxil force-pushed the fix-execution-api-ti-schema branch from 586b49e to 3d48d40 Compare June 11, 2026 16:10
@kaxil kaxil added the full tests needed We need to run full set of tests for this PR to merge label Jun 11, 2026
The generated TaskInstance no longer accepts pool_slots/priority_weight
kwargs, its queue is Optional in the generated typing, and the DTO now
validates the inherited hostname field that the ECS adoption test's mock
did not set.
@kaxil kaxil merged commit 872427c into apache:main Jun 11, 2026
145 checks passed
@kaxil kaxil deleted the fix-execution-api-ti-schema branch June 11, 2026 18:39
imrichardwu pushed a commit to imrichardwu/airflow that referenced this pull request Jun 16, 2026
…ema (apache#68390)

The supervisor's task routing (added in apache#65958) reads queue, pool_slots and
priority_weight from the TaskInstance it receives, but those fields lived in
a TaskInstanceDTO duplicated between airflow-core and task-sdk and kept in
sync by an AST-comparison prek hook, outside the versioned execution API
schema the Task SDK datamodels are generated from.

Add the three fields to the execution API TaskInstance schema (with a Cadwyn
version change) and regenerate the SDK datamodels, so the generated
TaskInstance carries everything the supervisor needs. Re-point StartupDetails
and the coordinator interfaces at the generated model, fold the core
TaskInstanceDTO into a subclass of the schema model that only adds the
executor-side fields, and drop the duplicated task-sdk DTO and the sync hook.

The unused parent_context_carrier field (no readers or writers) is removed.

* Keep pool_slots and priority_weight executor-side; only queue joins the schema

The supervisor only routes on queue; pool_slots and priority_weight are
executor concerns (queued-workload priority ordering and edge concurrency
slot accounting) that the worker never reads. Keep them on the executor
DTO, serialized as before, and add only queue to the worker-facing schema.

The generated TaskInstance no longer accepts pool_slots/priority_weight
kwargs, its queue is Optional in the generated typing, and the DTO now
validates the inherited hostname field that the ECS adoption test's mock
did not set.
dingo4dev pushed a commit to dingo4dev/airflow that referenced this pull request Jun 16, 2026
…ema (apache#68390)

The supervisor's task routing (added in apache#65958) reads queue, pool_slots and
priority_weight from the TaskInstance it receives, but those fields lived in
a TaskInstanceDTO duplicated between airflow-core and task-sdk and kept in
sync by an AST-comparison prek hook, outside the versioned execution API
schema the Task SDK datamodels are generated from.

Add the three fields to the execution API TaskInstance schema (with a Cadwyn
version change) and regenerate the SDK datamodels, so the generated
TaskInstance carries everything the supervisor needs. Re-point StartupDetails
and the coordinator interfaces at the generated model, fold the core
TaskInstanceDTO into a subclass of the schema model that only adds the
executor-side fields, and drop the duplicated task-sdk DTO and the sync hook.

The unused parent_context_carrier field (no readers or writers) is removed.

* Keep pool_slots and priority_weight executor-side; only queue joins the schema

The supervisor only routes on queue; pool_slots and priority_weight are
executor concerns (queued-workload priority ordering and edge concurrency
slot accounting) that the worker never reads. Keep them on the executor
DTO, serialized as before, and add only queue to the worker-facing schema.

The generated TaskInstance no longer accepts pool_slots/priority_weight
kwargs, its queue is Optional in the generated typing, and the DTO now
validates the inherited hostname field that the ECS adoption test's mock
did not set.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Executors-core LocalExecutor & SequentialExecutor area:providers area:task-sdk full tests needed We need to run full set of tests for this PR to merge provider:edge Edge Executor / Worker (AIP-69) / edge3

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants