Skip to content

Add XCom read access to callback supervisor comms channel#66611

Open
seanghaeli wants to merge 3 commits into
apache:mainfrom
aws-mwaa:ghaeli/callback-xcom-comms
Open

Add XCom read access to callback supervisor comms channel#66611
seanghaeli wants to merge 3 commits into
apache:mainfrom
aws-mwaa:ghaeli/callback-xcom-comms

Conversation

@seanghaeli

@seanghaeli seanghaeli commented May 8, 2026

Copy link
Copy Markdown
Contributor

Add read-only XCom access to the callback supervisor IPC channel, allowing
callbacks to fetch XCom values from upstream tasks via the same mechanism
used for Connections and Variables.

Motivation

PR #65269 added Connection, Variable, and MaskSecret comms channels to the
callback supervisor. XCom was deferred because it requires explicit dag_id,
run_id, and task_id (callbacks have no implicit task context). This PR adds
that missing piece.

Changes

  • Add GetXCom to the CallbackToSupervisor union type
  • Add handle_get_xcom shared handler in request_handlers.py
  • Route GetXCom messages in CallbackSubprocess._handle_request
  • Add tests for both basic XCom fetch and fetch with map_index/include_prior_dates

Only read access (GetXCom) is exposed. SetXCom and DeleteXCom remain out of
scope for callbacks since they should not be producing XCom values.

related: #65269


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4.6)

Generated-by: Claude Code (Opus 4.6) following the guidelines

@ferruzzi

Copy link
Copy Markdown
Contributor

What is the plan for plumbing the dag_id, run_id, etc through here? You don't seem to have addressed thr "motivation" part of your description. The reason I didn't include this one in the initial batch of comms channels was because GetXCom requires those and I didn't want users to have to manually provide those like XCom.get(dag_id=context["dag_id"], run_id=context["run_id"], task_id="upstream_task", key="return_value"), it should work the same way as a task so that context info needs to get provided "magically" somehow.

@seanghaeli seanghaeli marked this pull request as ready for review May 22, 2026 22:54
@seanghaeli

seanghaeli commented May 25, 2026

Copy link
Copy Markdown
Contributor Author

Good question. The plan is for PR #66608 to provide the context plumbing — once a callback has access to dag_id and run_id via its context (either through the triggerer supervisor or executor subprocess SUPERVISOR_COMMS), XCom.get() inside a callback can use those automatically rather than requiring the user to specify them manually.

So this PR gives the mechanism (XCom comms channel), and #66608 gives the identifiers. Once #66608 lands, we can add a convenience wrapper that pre-fills dag_id/run_id from context. For now, the user passes them explicitly — still useful for callbacks that know which task's XCom they want to read.

@seanghaeli seanghaeli marked this pull request as draft May 25, 2026 22:01
@seanghaeli seanghaeli force-pushed the ghaeli/callback-xcom-comms branch from 90f3323 to 91e5a68 Compare May 27, 2026 23:01
@seanghaeli seanghaeli marked this pull request as ready for review May 27, 2026 23:11
@seanghaeli seanghaeli force-pushed the ghaeli/callback-xcom-comms branch from 4a33ab3 to 454244a Compare May 28, 2026 16:33
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Jun 1, 2026
@seanghaeli seanghaeli force-pushed the ghaeli/callback-xcom-comms branch 4 times, most recently from cfdf5dc to 94b8c53 Compare June 8, 2026 20:44
@o-nikolas o-nikolas added this to the Airflow 3.3.0 milestone Jun 15, 2026
Accept workload tokens on the get_xcom, connections, and variables execution
API routes so a deadline callback subprocess can read XCom values (and
connections/variables) via the supervisor comms channel, mirroring task access.
@seanghaeli seanghaeli force-pushed the ghaeli/callback-xcom-comms branch from 94b8c53 to cccbd89 Compare June 15, 2026 21:38
@vincbeck vincbeck added the backport-to-v3-3-test Backport to v3-3-test label Jun 17, 2026
seanghaeli pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jun 17, 2026
This PR's callback comms only use GetDagRun/GetConnection/GetVariable, never
GetXCom, so the token:workload scope changes on the XCom execution-API route
(and their TestWorkloadTokenScopeEnforcement tests) had no consumer here — they
are apache#66611's deliverable (callback XCom read access) and were riding along.
Revert xcoms.py and test_xcoms.py to main so this PR stays scoped to callback
context fetching; apache#66611 owns the XCom route + comms changes.
seanghaeli pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jun 19, 2026
This PR's callback comms only use GetDagRun/GetConnection/GetVariable, never
GetXCom, so the token:workload scope changes on the XCom execution-API route
(and their TestWorkloadTokenScopeEnforcement tests) had no consumer here — they
are apache#66611's deliverable (callback XCom read access) and were riding along.
Revert xcoms.py and test_xcoms.py to main so this PR stays scoped to callback
context fetching; apache#66611 owns the XCom route + comms changes.
Callback supervisors carry a workload-scoped JWT, but the XCom read routes
only accepted execution-scoped tokens, so a callback's XCom read got 403
("Token type 'workload' not allowed"). Mirror the variables/connections
routes: set route_class=ExecutionAPIRoute on the router (so per-route
token:* scopes are honored) and add token:workload to the GET item/slice
and HEAD routes alongside the existing get_xcom route. POST/DELETE stay
execution-only.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-sdk backport-to-v3-3-test Backport to v3-3-test ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants