From 2ff3f8b94e035406676bb519d2497aab179ae60f Mon Sep 17 00:00:00 2001 From: GUAN MING Date: Tue, 22 Apr 2025 11:24:10 +0800 Subject: [PATCH] docs: add Pushing XComs from Deferred example Co-Authored-By: Avyukt Soni <95626105+avyuktsoni0731@users.noreply.github.com> --- .../authoring-and-scheduling/deferring.rst | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst b/airflow-core/docs/authoring-and-scheduling/deferring.rst index 68933af157d74..89514b7ab2d85 100644 --- a/airflow-core/docs/authoring-and-scheduling/deferring.rst +++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst @@ -460,6 +460,40 @@ In the above example, the trigger will end the task instance directly if ``end_f Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases. +Pushing XComs from Deferred Tasks +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. versionadded:: 2.10.0 + +Triggers can automatically push XCom values to task instances using the ``self.xcoms`` property. When a trigger emits events like ``TaskSuccessEvent`` or ``TaskFailureEvent``, any values stored in ``self.xcoms`` are automatically pushed to the XCom of the corresponding task instance. + +This simplifies XCom management in deferred tasks and eliminates the need to implement or call any internal methods. + +Below is an example of how to set XCom values within a trigger: + +.. code-block:: python + + class WaitFiveHourTrigger(BaseTrigger): + def __init__(self, duration: timedelta, xcom_value: dict[str, Any] | None = None): + super().__init__() + self.duration = duration + self.xcoms = xcom_value # setting xcom values to be pushed automatically + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ( + "your_module.WaitFiveHourTrigger", + {"duration": self.duration, "xcom_value": self.xcoms}, + ) + + async def run(self) -> AsyncIterator[TriggerEvent]: + await asyncio.sleep(self.duration.total_seconds()) + yield TaskSuccessEvent() # xcom values are pushed automatically + +In this example, the ``xcom_value`` dictionary is set to ``self.xcoms`` when the trigger is initialized. When the trigger emits a ``TaskSuccessEvent``, the framework automatically pushes these XCom values to the task instance. Downstream tasks can then retrieve these values for further processing. + +This approach is lightweight and fully supported by the framework, without requiring any manual implementation of internal methods. + + High Availability -----------------