Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions airflow-core/docs/authoring-and-scheduling/deferring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,40 @@ In the above example, the trigger will end the task instance directly if ``end_f
Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases.


Pushing XComs from Deferred Tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. versionadded:: 2.10.0

@eladkal eladkal Apr 22, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have an easy way to generate a list of all our Trigger that were written before 2.10 and don't support this? so we can patch them? in ~1 week or so the min version for providers will be 2.10 so we can safely fix it.

Also maybe, an idea how to get the list... adding a test to make sure any sub class of BaseTrigger has implementation for this? That would also protect us for future adding new triggers

@guan404ming guan404ming May 1, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late. It took some time to deal with this. And I think this func is not supported for all triggers before 2.10.0 since there are not implementation of some thing like self.xcoms or something like the example do.
I only see the trigger test which use the functionality the example show

@pytest.mark.parametrize(
"event_cls, expected",
[
(TaskSuccessEvent, "success"),
(TaskFailedEvent, "failed"),
(TaskSkippedEvent, "skipped"),
],
)
@patch("airflow.utils.timezone.utcnow")
def test_submit_event_task_end(mock_utcnow, session, create_task_instance, event_cls, expected):

please let me know if there is anything I could help

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is my point. Minimum version for Airflow in providers is 2.10 so we should open a tracking issue to modify all trigger we have to support it. Don't we?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After more investigation, it seems currently trigger is not accept xcoms #46677 (comment) after reworked. xcoms is passed when the event have xcoms.

Should we modify our example or try to modify trigger to accept "xcoms" when init?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could leave this here until there is any re-implementation plan.


Triggers can automatically push XCom values to task instances using the ``self.xcoms`` property. When a trigger emits events like ``TaskSuccessEvent`` or ``TaskFailureEvent``, any values stored in ``self.xcoms`` are automatically pushed to the XCom of the corresponding task instance.

This simplifies XCom management in deferred tasks and eliminates the need to implement or call any internal methods.

Below is an example of how to set XCom values within a trigger:

.. code-block:: python

class WaitFiveHourTrigger(BaseTrigger):
def __init__(self, duration: timedelta, xcom_value: dict[str, Any] | None = None):
super().__init__()
self.duration = duration
self.xcoms = xcom_value # setting xcom values to be pushed automatically

def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"your_module.WaitFiveHourTrigger",
{"duration": self.duration, "xcom_value": self.xcoms},
)

async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
yield TaskSuccessEvent() # xcom values are pushed automatically

In this example, the ``xcom_value`` dictionary is set to ``self.xcoms`` when the trigger is initialized. When the trigger emits a ``TaskSuccessEvent``, the framework automatically pushes these XCom values to the task instance. Downstream tasks can then retrieve these values for further processing.

This approach is lightweight and fully supported by the framework, without requiring any manual implementation of internal methods.


High Availability
-----------------

Expand Down