From afcb3ecd5b593e7f2106c2f47f8f1e91b4ed6629 Mon Sep 17 00:00:00 2001 From: Nathan Hadfield Date: Tue, 26 May 2026 14:02:21 +0100 Subject: [PATCH] AIP-76: Propagate partition_date to consumers of partitioned assets Consumers of partitioned assets receive partition_key (str) but partition_date (datetime) is None on the consumer DagRun, so templates have to parse the key string. Propagate the datetime form alongside the string so consumers can use the canonical filter idiom `{{ partition_date | ds }}` and friends. The producer DagRun's partition_date is threaded through register_asset_change (rather than stored on AssetEvent) into assets/manager.py:_queue_partitioned_dags, where the consumer's partition mapper decides via carry_partition_date whether to carry it onto the AssetPartitionDagRun. Only IdentityMapper carries it, since an identity key has no temporal meaning to decode. At run creation the scheduler resolves the consumer DagRun's partition_date from the key via PartitionMapper.to_partition_date (the StartOf*Mapper family decodes the key; composite mappers delegate to their child) and falls back to the date carried on the APDR only when no temporal mapper contributes an anchor. Mappers whose key carries no temporal meaning leave partition_date None and consumers fall back to partition_key. 
closes: #67239 --- .../docs/authoring-and-scheduling/assets.rst | 14 +- airflow-core/docs/migrations-ref.rst | 4 +- airflow-core/docs/templates-ref.rst | 3 + airflow-core/newsfragments/67285.feature.rst | 1 + .../core_api/datamodels/dag_run.py | 1 + .../openapi/v2-rest-api-generated.yaml | 7 + .../execution_api/datamodels/taskinstance.py | 1 + .../execution_api/versions/__init__.py | 2 + .../execution_api/versions/v2026_06_30.py | 14 + airflow-core/src/airflow/assets/manager.py | 58 +++- .../example_dags/example_asset_partition.py | 2 +- .../src/airflow/jobs/scheduler_job_runner.py | 41 ++- airflow-core/src/airflow/listeners/types.py | 3 + ...rtition_date_to_asset_partition_dag_run.py | 54 +++ airflow-core/src/airflow/models/asset.py | 1 + airflow-core/src/airflow/models/dagrun.py | 3 + .../src/airflow/models/taskinstance.py | 19 + .../src/airflow/partition_mappers/base.py | 15 + .../src/airflow/partition_mappers/identity.py | 11 + .../ui/openapi-gen/requests/schemas.gen.ts | 14 +- .../ui/openapi-gen/requests/types.gen.ts | 1 + airflow-core/src/airflow/utils/db.py | 2 +- .../core_api/routes/public/test_assets.py | 1 + .../core_api/routes/public/test_dag_run.py | 9 + .../versions/head/test_dag_runs.py | 1 + .../versions/head/test_task_instances.py | 1 + .../v2026_06_30/test_task_instances.py | 96 +++++ .../tests/unit/assets/test_manager.py | 118 +++++++ .../tests/unit/jobs/test_scheduler_job.py | 328 +++++++++++++++++- .../tests/unit/partition_mappers/test_base.py | 12 + .../unit/partition_mappers/test_identity.py | 9 + .../airflowctl/api/datamodels/generated.py | 1 + .../providers/standard/operators/python.py | 2 + .../airflow/sdk/api/datamodels/_generated.py | 1 + .../src/airflow/sdk/definitions/context.py | 1 + .../sdk/execution_time/schema/schema.json | 26 ++ .../airflow/sdk/execution_time/task_runner.py | 1 + task-sdk/src/airflow/sdk/types.py | 1 + .../execution_time/test_supervisor.py | 2 + .../execution_time/test_task_runner.py | 32 ++ 40 files changed, 883 
insertions(+), 30 deletions(-) create mode 100644 airflow-core/newsfragments/67285.feature.rst create mode 100644 airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py create mode 100644 airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index 994cae815d397..d039fbfd1ffd3 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -631,7 +631,19 @@ partition match can be produced, so the downstream Dag is not triggered for that key. Inside partitioned Dag runs, access the resolved partition through -``dag_run.partition_key``. +``dag_run.partition_key``. When the consumer's partition mapper can +resolve the key to a ``datetime``, that value is also available as +``dag_run.partition_date``, so templates can use +``{{ partition_date | ds }}``. This covers the ``StartOf*Mapper`` family +(which decode the key directly), ``IdentityMapper`` (which carries the +producer's ``partition_date`` through), and composite mappers — +``RollupMapper``, ``ChainMapper`` and ``FanOutMapper`` — whose effective +child mapper is temporal (they delegate the anchor to that child). +Mappers whose key carries no temporal meaning (``ProductMapper``, +``AllowedKeyMapper`` and custom mappers that do not implement +``to_partition_date``) leave ``partition_date`` ``None`` even when the +resulting key is date-shaped, so those consumers should keep parsing +``partition_key``. 
You can also trigger a DagRun manually with a partition key (for example, through the Trigger Dag window in the UI, or through the REST API by diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index a69a8aa205825..d2cfde623523f 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``9ff64e1c35d3`` (head) | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes on dag_run.created_dag_version_id and | +| ``d2f4e1b3c5a7`` (head) | ``9ff64e1c35d3`` | ``3.3.0`` | Add partition_date to asset_partition_dag_run. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``9ff64e1c35d3`` | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes on dag_run.created_dag_version_id and | | | | | task_instance.dag_version_id. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``dd5f3a8e2b91`` | ``c20871fbf23a`` | ``3.3.0`` | Add rollup_fingerprint to AssetPartitionDagRun and index | diff --git a/airflow-core/docs/templates-ref.rst b/airflow-core/docs/templates-ref.rst index b1609ff0c943e..aeb907dbe470f 100644 --- a/airflow-core/docs/templates-ref.rst +++ b/airflow-core/docs/templates-ref.rst @@ -87,6 +87,9 @@ Variable Type Description | is enabled in ``airflow.cfg``. ``{{ partition_key }}`` str | None | The partition key from the current :class:`~airflow.models.dagrun.DagRun`. | Returns ``None`` if no partition key was set. Added in version 3.3.0. 
+``{{ partition_date }}`` datetime | None | The partition datetime from the current :class:`~airflow.models.dagrun.DagRun`. + | Use ``{{ partition_date | ds }}`` and related filters for formatting. + | Returns ``None`` if no partition date was set. Added in version 3.3.0. ``{{ var.value }}`` Airflow variables. See `Airflow Variables in Templates`_ below. ``{{ var.json }}`` Airflow variables. See `Airflow Variables in Templates`_ below. ``{{ conn }}`` Airflow connections. See `Airflow Connections in Templates`_ below. diff --git a/airflow-core/newsfragments/67285.feature.rst b/airflow-core/newsfragments/67285.feature.rst new file mode 100644 index 0000000000000..2c4370d5ff63a --- /dev/null +++ b/airflow-core/newsfragments/67285.feature.rst @@ -0,0 +1 @@ +Propagate ``partition_date`` from producer DagRuns to consumers of partitioned assets, so date-shaped partitions are available in consumer task templates. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 033437e2ad98b..0a530578644d7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -119,6 +119,7 @@ class DAGRunResponse(BaseModel): bundle_version: str | None dag_display_name: str = Field(validation_alias=AliasPath("dag_model", "dag_display_name")) partition_key: str | None + partition_date: datetime | None class DAGRunCollectionResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index f82e665c9bea9..639f6fb990461 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -13704,6 +13704,12 @@ components: - type: string - type: 'null' 
title: Partition Key + partition_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Partition Date type: object required: - dag_run_id @@ -13727,6 +13733,7 @@ components: - bundle_version - dag_display_name - partition_key + - partition_date title: DAGRunResponse description: Dag Run serializer for responses. DAGRunsBatchBody: diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index fe7f5a5ce050d..bf20bcbc30c55 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -341,6 +341,7 @@ class DagRun(StrictBaseModel): triggering_user_name: str | None = None consumed_asset_events: list[AssetEventDagRunReference] partition_key: str | None + partition_date: UtcDateTime | None = None note: str | None = None team_name: str | None = None diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 332ddb28704ed..dc7035d31e3c9 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -44,6 +44,7 @@ AddAssetsByAliasEndpoint, AddAwaitingInputStatePayload, AddConnectionTestEndpoint, + AddPartitionDateField, AddRetryPolicyFields, AddTaskAndAssetStateStoreEndpoints, AddTaskInstanceQueueField, @@ -63,6 +64,7 @@ AddTeamNameField, AddTaskAndAssetStateStoreEndpoints, AddAssetsByAliasEndpoint, + AddPartitionDateField, ), Version( "2026-04-06", diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py index cbd801c0a9b0b..e89e2ed04cc5d 100644 --- 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py @@ -127,3 +127,17 @@ class AddTaskAndAssetStateStoreEndpoints(VersionChange): endpoint("/store/asset/by-uri/value", ["DELETE"]).didnt_exist, endpoint("/store/asset/by-uri/clear", ["DELETE"]).didnt_exist, ) + + +class AddPartitionDateField(VersionChange): + """Expose the consumer DagRun's partition datetime on the execution API so consumer tasks can template it.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = (schema(DagRun).field("partition_date").didnt_exist,) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def remove_partition_date_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc] + """Strip ``partition_date`` from the nested ``dag_run`` payload for older clients.""" + if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): + response.body["dag_run"].pop("partition_date", None) diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index c8d2edef4f104..567c45a05bbd9 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -49,6 +49,8 @@ from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks if TYPE_CHECKING: + from datetime import datetime + from sqlalchemy.orm.session import Session from airflow.models.dag import DagModel @@ -274,6 +276,7 @@ def register_asset_change( source_alias_names: Collection[str] = (), session: Session, partition_key: str | None = None, + partition_date: datetime | None = None, source_is_api: bool = False, api_user_teams: set[str] | None = None, api_allow_consumer_teams: list[str] | None = None, @@ -394,6 +397,7 @@ def register_asset_change( source_map_index=asset_event.source_map_index, source_aliases=[aam.to_serialized() for aam in asset_alias_models], 
partition_key=partition_key, + partition_date=partition_date, ) ) @@ -440,6 +444,7 @@ def register_asset_change( asset_id=asset_model.id, dags_to_queue=dags_to_queue, partition_key=partition_key, + partition_date=partition_date, event=asset_event, task_instance=task_instance, session=session, @@ -485,6 +490,7 @@ def _queue_dagruns( asset_id: int, dags_to_queue: set[DagModel], partition_key: str | None, + partition_date: datetime | None, event: AssetEvent, task_instance: TaskInstance | None, session: Session, @@ -499,6 +505,7 @@ def _queue_dagruns( partition_dags=partition_dags, event=event, partition_key=partition_key, + partition_date=partition_date, task_instance=task_instance, session=session, ) @@ -527,6 +534,7 @@ def _queue_partitioned_dags( partition_dags: Iterable[DagModel], event: AssetEvent, partition_key: str | None, + partition_date: datetime | None, task_instance: TaskInstance | None, session: Session, ) -> None: @@ -574,9 +582,9 @@ def _queue_partitioned_dags( if (asset_model := session.scalar(select(AssetModel).where(AssetModel.id == asset_id))) is None: raise RuntimeError(f"Could not find asset for asset_id={asset_id}") + mapper = timetable.get_partition_mapper(name=asset_model.name, uri=asset_model.uri) try: # We'll need to catch every possible exception happen when mapping partition_key. - mapper = timetable.get_partition_mapper(name=asset_model.name, uri=asset_model.uri) target_key = mapper.to_downstream(partition_key) except Exception as err: log.exception( @@ -643,9 +651,18 @@ def _queue_partitioned_dags( ) continue + # The producer's partition_date (threaded in from its DagRun via + # register_asset_change) is carried onto the APDR only by mappers that + # opt in. IdentityMapper does, since its key carries no temporal meaning + # for the scheduler to re-derive at run creation; temporal and composite + # mappers return None here and are resolved from the key by the scheduler + # via PartitionMapper.to_partition_date. 
+ target_partition_date: datetime | None = mapper.carry_partition_date(partition_date) + for target_key in target_keys: apdr = cls._get_or_create_apdr( target_key=target_key, + target_partition_date=target_partition_date, target_dag=target_dag, rollup_fingerprint=fingerprint, asset_id=asset_id, @@ -666,6 +683,7 @@ def _get_or_create_apdr( cls, *, target_key: str, + target_partition_date: datetime | None, target_dag: DagModel, rollup_fingerprint: dict, asset_id: int, @@ -683,6 +701,20 @@ def _get_or_create_apdr( ``rollup_fingerprint`` is the serialized mapper / window definition for all partitioned assets in the timetable at creation time; the scheduler discards APDRs whose stamp no longer matches the current timetable's fingerprint (mapper / window may have changed). + + Reconciling the carried ``partition_date`` on an existing pending APDR is best-effort: + a partitioned consumer's feeding assets are expected to agree on the partition's + datetime. The carry only matters for ``IdentityMapper`` (whose key the scheduler + cannot decode); temporal/composite feeds re-derive the date from the key at run + creation regardless of what is stored here. Within that contract: + + - If the APDR carries no date yet (``None`` — created by an event that carried none), + adopt the incoming date when this event carries one. There is nothing to conflict + with, so a later identity event's date is not dropped. + - If the APDR already carries a date and this event carries a **different** non-null + one, the producing assets disagree; picking one would be order-dependent, so the + carried date is suppressed to ``None`` (and re-adoptable by a later event). + - Otherwise (the dates agree, or this event carries none) the existing value is kept. 
""" with _lock_asset_model(session=session, asset_id=asset_id): latest_apdr: AssetPartitionDagRun | None = session.scalar( @@ -695,6 +727,29 @@ def _get_or_create_apdr( .limit(1) ) if latest_apdr and latest_apdr.created_dag_run_id is None: + existing_partition_date = latest_apdr.partition_date + if existing_partition_date is None: + # No carried date yet; adopt the incoming one if present (no conflict + # to resolve). Keeps a later identity event's date from being dropped. + if target_partition_date is not None: + latest_apdr.partition_date = target_partition_date + session.flush() + elif target_partition_date is not None and existing_partition_date != target_partition_date: + # Two contributing events carry conflicting partition_dates for the same + # (target_key, target_dag). Choosing one would be order-dependent, so + # suppress: the consumer DagRun gets partition_date=None rather than a + # wrong, unstable value. + log.warning( + "Conflicting partition_date carried for the same target key; " + "suppressing it so the consumer DagRun's partition_date is None. 
" + "The producing assets likely disagree on the partition's datetime.", + target_dag_id=target_dag.dag_id, + target_key=target_key, + existing_partition_date=existing_partition_date, + incoming_partition_date=target_partition_date, + ) + latest_apdr.partition_date = None + session.flush() cls.logger().debug( "Existing APDR found for key %s dag_id %s", target_key, @@ -707,6 +762,7 @@ def _get_or_create_apdr( target_dag_id=target_dag.dag_id, created_dag_run_id=None, partition_key=target_key, + partition_date=target_partition_date, rollup_fingerprint=rollup_fingerprint, ) session.add(apdr) diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py index baba6ced1fcc8..7c03d3f8f83f3 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_partition.py +++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py @@ -106,7 +106,7 @@ def combine_player_stats(dag_run=None): """Merge the aligned hourly partitions into a combined dataset.""" if TYPE_CHECKING: assert dag_run - print(dag_run.partition_key) + print(dag_run.partition_key, dag_run.partition_date) combine_player_stats() diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 93b155f2257ac..fc2cc2d8418c4 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2044,13 +2044,15 @@ def _resolve_partition_date( asset_infos: Iterable[tuple[str, str]], partition_key: str, dag_id: str, + carried_partition_date: datetime | None, ) -> datetime | None: """ - Return the temporal anchor (period-start datetime) for *partition_key*. + Return the ``partition_date`` the consumer Dag run should be created with. - Resolves the temporal anchor (period-start datetime) for *partition_key* - across *asset_infos* — the ``(name, uri)`` pairs of the upstream assets - that contributed to it. 
Each upstream mapper resolves the key via + The temporal anchor (period-start datetime) is resolved for + *partition_key* across *asset_infos* — the ``(name, uri)`` pairs of the + upstream assets that contributed to it. Each upstream mapper resolves the + key via :meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`: temporal mappers decode the key, composite mappers delegate to their child, and non-temporal mappers (e.g. @@ -2059,16 +2061,19 @@ def _resolve_partition_date( A partitioned consumer has a single partition identity, so every temporal mapper feeding it must resolve the same key to the same instant. Anchors are compared by instant (timezone-aware), so equivalent moments collapse - to one. When the temporal mappers agree, that anchor is returned; when - they disagree — a misconfiguration, e.g. assets mapping the same key under - different timezones — ``partition_date`` is left unset and a warning is - logged rather than silently picking one by scan order. Returns ``None`` if - no mapper is temporal. - - A failure in any mapper aborts the whole resolution and returns ``None`` - (logged) — anchors accumulated from earlier mappers are discarded rather - than used as a partial result, since a partial set could hide a conflict. - A broken mapper must not crash the scheduler tick. + to one. When the temporal mappers agree, that anchor is returned. + + When no temporal mapper contributes at all — an identity key carries no + temporal meaning and cannot be decoded back into a date — the producer's + source date carried on the APDR at queue time (*carried_partition_date*, + set only for ``IdentityMapper``) is returned instead. + + When temporal mappers were present but produced no usable anchor — they + disagreed (a misconfiguration, e.g. assets mapping the same key under + different timezones) or one raised — the conflict/error is logged and + ``None`` is returned. 
The carried date is deliberately *not* substituted + here: stamping it would mask the logged suppression. A broken mapper must + not crash the scheduler tick. """ anchors: set[datetime] = set() try: @@ -2086,7 +2091,12 @@ def _resolve_partition_date( return None if not anchors: - return None + # No temporal mapper contributed an anchor (e.g. an all-IdentityMapper feed), + # so fall back to the date carried on the APDR. A partitioned consumer's feeding + # assets are expected to agree on the partition's datetime; when a temporal mapper + # *does* resolve an anchor it takes precedence over the carried identity date, + # since the key is the authoritative source the scheduler can re-derive. + return carried_partition_date if len(anchors) > 1: self.log.warning( "Upstream partition mappers resolved conflicting partition_date values for the same " @@ -2288,6 +2298,7 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st asset_infos=asset_info_per_apdr[apdr.id].values(), partition_key=apdr.partition_key, dag_id=apdr.target_dag_id, + carried_partition_date=apdr.partition_date, ) dag_run = dag.create_dagrun( run_id=DagRun.generate_run_id( diff --git a/airflow-core/src/airflow/listeners/types.py b/airflow-core/src/airflow/listeners/types.py index 120b8ef503a6e..fa9ebdafaca0f 100644 --- a/airflow-core/src/airflow/listeners/types.py +++ b/airflow-core/src/airflow/listeners/types.py @@ -23,6 +23,8 @@ import attrs if TYPE_CHECKING: + from datetime import datetime + from pydantic import JsonValue from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias @@ -40,3 +42,4 @@ class AssetEvent: source_map_index: int | None source_aliases: list[SerializedAssetAlias] partition_key: str | None + partition_date: datetime | None = None diff --git a/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py 
b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py new file mode 100644 index 0000000000000..ba483eb88e037 --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add partition_date to asset_partition_dag_run. + +Stores the producer's ``partition_date`` carried onto the APDR at queue time; +the scheduler uses it for the consumer DagRun's ``partition_date`` only when no +upstream partition mapper can resolve a date from the ``partition_key``. 
+ +Revision ID: d2f4e1b3c5a7 +Revises: 9ff64e1c35d3 +Create Date: 2026-05-21 09:00:00.000000 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.utils.sqlalchemy import UtcDateTime + +revision = "d2f4e1b3c5a7" +down_revision = "9ff64e1c35d3" +branch_labels = None +depends_on = None +airflow_version = "3.3.0" + + +def upgrade(): + """Add partition_date column to asset_partition_dag_run.""" + with op.batch_alter_table("asset_partition_dag_run", schema=None) as batch_op: + batch_op.add_column(sa.Column("partition_date", UtcDateTime, nullable=True)) + + +def downgrade(): + """Remove partition_date column from asset_partition_dag_run.""" + with op.batch_alter_table("asset_partition_dag_run", schema=None) as batch_op: + batch_op.drop_column("partition_date") diff --git a/airflow-core/src/airflow/models/asset.py b/airflow-core/src/airflow/models/asset.py index 34e4ffaec3e38..60256f9cd8852 100644 --- a/airflow-core/src/airflow/models/asset.py +++ b/airflow-core/src/airflow/models/asset.py @@ -921,6 +921,7 @@ class AssetPartitionDagRun(Base): target_dag_id: Mapped[str] = mapped_column(StringID(), nullable=False) created_dag_run_id: Mapped[int | None] = mapped_column(Integer(), nullable=True) partition_key: Mapped[str] = mapped_column(StringID(), nullable=False) + partition_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True) # Serialized snapshot of the rollup definition (mapper + window for every # partitioned asset in the timetable) at the time this APDR was created. 
# The scheduler discards APDRs whose stored fingerprint no longer matches diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 348d6ed9813e4..47de1190b3775 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -433,6 +433,7 @@ def dag_run_data(self) -> DRDataModel: conf=self.conf, consumed_asset_events=[], partition_key=self.partition_key, + partition_date=self.partition_date, ) @property @@ -1099,6 +1100,8 @@ def _emit_dagrun_span(self, state: DagRunState): attributes["airflow.dag_run.logical_date"] = str(self.logical_date) if self.partition_key: attributes["airflow.dag_run.partition_key"] = str(self.partition_key) + if self.partition_date: + attributes["airflow.dag_run.partition_date"] = self.partition_date.isoformat() # TODO: make the empty parent context optional. Default should be to # nest the dag run span under the currently active parent span (by diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 5388b4b6b488b..b39c9f3b322d6 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1554,6 +1554,7 @@ def register_asset_changes_in_db( OutletEventPayload(extra=outlet_event["extra"], partition_key=partition_key) ) dag_run_partition_key = ti.dag_run.partition_key + dag_run_partition_date = ti.dag_run.partition_date asset_keys = { SerializedAssetUniqueKey(o.name, o.uri) @@ -1589,6 +1590,7 @@ def _register(am: AssetModel, key: SerializedAssetUniqueKey) -> None: asset=am, extra=None, partition_key=dag_run_partition_key, + partition_date=dag_run_partition_date, session=session, ) return @@ -1596,11 +1598,26 @@ def _register(am: AssetModel, key: SerializedAssetUniqueKey) -> None: effective_pk = ( payload.partition_key if payload.partition_key is not None else dag_run_partition_key ) + # Carry partition_date only when the effective key 
matches the + # DagRun's — the run-level date refers to the run-level key and + # would mis-label an event emitted for a different partition. + if effective_pk == dag_run_partition_key: + payload_partition_date = dag_run_partition_date + else: + payload_partition_date = None + if dag_run_partition_date is not None: + ti.log.debug( + "Task-emitted partition_key %r differs from DagRun partition_key %r; " + "consumer partition_date will be None.", + payload.partition_key, + dag_run_partition_key, + ) asset_manager.register_asset_change( task_instance=ti, asset=am, extra=payload.extra, partition_key=effective_pk, + partition_date=payload_partition_date, session=session, ) @@ -1684,6 +1701,7 @@ def _asset_event_extras_from_aliases() -> dict[tuple[SerializedAssetUniqueKey, s source_alias_names=event_aliase_names, extra=asset_event_extra, partition_key=dag_run_partition_key, + partition_date=dag_run_partition_date, session=session, ) if event is None: @@ -1696,6 +1714,7 @@ def _asset_event_extras_from_aliases() -> dict[tuple[SerializedAssetUniqueKey, s source_alias_names=event_aliase_names, extra=asset_event_extra, partition_key=dag_run_partition_key, + partition_date=dag_run_partition_date, session=session, ) diff --git a/airflow-core/src/airflow/partition_mappers/base.py b/airflow-core/src/airflow/partition_mappers/base.py index 0be82d8f6c5bf..f9d05b6d8d3c1 100644 --- a/airflow-core/src/airflow/partition_mappers/base.py +++ b/airflow-core/src/airflow/partition_mappers/base.py @@ -117,6 +117,21 @@ def to_partition_date(self, downstream_key: str) -> datetime | None: """ return None + def carry_partition_date(self, source_partition_date: datetime | None) -> datetime | None: + """ + Return the producer's ``partition_date`` to carry onto the consumer APDR. + + Captured at queue time as an asset event arrives, *source_partition_date* + is the producing run's ``partition_date``. 
The base implementation returns + ``None``: for most mappers the consumer's date is derived from its own + downstream key by :meth:`to_partition_date` at run creation, not carried + from the producer. + :class:`~airflow.partition_mappers.identity.IdentityMapper` overrides to + pass it through, since the consumer's key equals the producer's and the + key carries no temporal meaning to decode. + """ + return None + def serialize(self) -> dict[str, Any]: if self.max_downstream_keys is None: return {} diff --git a/airflow-core/src/airflow/partition_mappers/identity.py b/airflow-core/src/airflow/partition_mappers/identity.py index e0b2e25b1ae47..c538747e1835f 100644 --- a/airflow-core/src/airflow/partition_mappers/identity.py +++ b/airflow-core/src/airflow/partition_mappers/identity.py @@ -16,11 +16,22 @@ # under the License. from __future__ import annotations +from typing import TYPE_CHECKING + from airflow.partition_mappers.base import PartitionMapper +if TYPE_CHECKING: + from datetime import datetime + class IdentityMapper(PartitionMapper): """Partition mapper that does not change the key.""" def to_downstream(self, key: str) -> str: return key + + def carry_partition_date(self, source_partition_date: datetime | None) -> datetime | None: + # Identity passthrough: the consumer's key equals the producer's, so the + # producer's partition_date is the consumer's. to_partition_date cannot + # recover it (the key carries no temporal meaning), so it is carried here. 
+ return source_partition_date diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 413dbef3d7e42..1d5cfc5d86be7 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3473,10 +3473,22 @@ export const $DAGRunResponse = { } ], title: 'Partition Key' + }, + partition_date: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Partition Date' } }, type: 'object', - required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at', 'start_date', 'end_date', 'duration', 'data_interval_start', 'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type', 'state', 'triggered_by', 'triggering_user_name', 'conf', 'note', 'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key'], + required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at', 'start_date', 'end_date', 'duration', 'data_interval_start', 'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type', 'state', 'triggered_by', 'triggering_user_name', 'conf', 'note', 'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key', 'partition_date'], title: 'DAGRunResponse', description: 'Dag Run serializer for responses.' 
} as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 31f716d85f885..f8cbb1fbc1875 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -917,6 +917,7 @@ export type DAGRunResponse = { bundle_version: string | null; dag_display_name: string; partition_key: string | null; + partition_date: string | null; }; /** diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 2155ca50d33b3..f24f3636132ae 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -116,7 +116,7 @@ class MappedClassProtocol(Protocol): "3.1.0": "cc92b33c6709", "3.1.8": "509b94a1042d", "3.2.0": "1d6611b6ab7c", - "3.3.0": "9ff64e1c35d3", + "3.3.0": "d2f4e1b3c5a7", } # Prefix used to identify tables holding data moved during migration. diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index 89957e179c2e2..b65036b72d9b3 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -1543,6 +1543,7 @@ def test_should_respond_200(self, test_client): "dag_versions": mock.ANY, "logical_date": None, "partition_key": None, + "partition_date": None, "queued_at": mock.ANY, "run_after": mock.ANY, "start_date": None, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index d953e950dea6a..ff14aaa8980f5 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -167,6 +167,9 @@ def setup(request, dag_maker, 
*, session=None): # The value uses the ProductMapper default delimiter (|) to form a composite key so we can # verify that the filter treats | as a literal character, not an OR separator. dag_run1.partition_key = "2026-01-01|us" + # Set a real partition_date so the GET/list responses exercise the serialized + # (non-None) partition_date path, not just the None case. + dag_run1.partition_date = datetime(2026, 1, 1, tzinfo=timezone.utc) dag_run1.note = (DAG1_RUN1_NOTE, "not_test") # Set end_date for testing duration filter dag_run1.end_date = dag_run1.start_date + timedelta(seconds=101) @@ -317,6 +320,9 @@ def get_dag_run_dict(run: DagRun): "note": run.note, "dag_versions": get_dag_versions_dict(run.dag_versions), "partition_key": run.partition_key, + "partition_date": from_datetime_to_zulu_without_ms(run.partition_date) + if run.partition_date + else None, } @@ -2371,6 +2377,7 @@ def test_should_respond_200( "triggered_by": "rest_api", "triggering_user_name": "test", "partition_key": None, + "partition_date": None, } assert response.json() == expected_response_json @@ -2601,6 +2608,7 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client): "conf": {}, "note": note, "partition_key": None, + "partition_date": None, } assert response_2.status_code == 409 @@ -2690,6 +2698,7 @@ def test_should_respond_200_with_null_logical_date(self, test_client): "conf": {}, "note": None, "partition_key": None, + "partition_date": None, } @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index fbe7cc56e4a08..5bd6d6b36e373 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -452,6 +452,7 @@ def test_get_state(self, client, session, dag_maker): 
"end_date": "2025-12-13T00:00:00Z", "logical_date": None, "partition_key": None, + "partition_date": None, "run_after": "2025-12-13T00:00:00Z", "run_id": "previous", "run_type": "manual", diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index af44599776107..90325bb9c8542 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -236,6 +236,7 @@ def test_ti_run_state_to_running( "triggering_user_name": None, "consumed_asset_events": [], "partition_key": None, + "partition_date": None, "note": None, "team_name": None, }, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py new file mode 100644 index 0000000000000..b2cc016be84d7 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import pytest + +from airflow._shared.timezones import timezone +from airflow.utils.state import DagRunState, State + +from tests_common.test_utils.db import clear_db_runs + +pytestmark = pytest.mark.db_test + +TIMESTAMP_STR = "2024-09-30T12:00:00Z" +TIMESTAMP = timezone.parse(TIMESTAMP_STR) +PARTITION_DATE = timezone.parse("2026-05-20T01:00:00") + +RUN_PATCH_BODY = { + "state": "running", + "hostname": "h", + "unixname": "u", + "pid": 1, + "start_date": TIMESTAMP_STR, +} + + +@pytest.fixture +def old_ver_client(client): + """Execution API version immediately before ``partition_date`` was added.""" + client.headers["Airflow-API-Version"] = "2026-06-16" + return client + + +class TestPartitionDateFieldBackwardCompat: + @pytest.fixture(autouse=True) + def _freeze_time(self, time_machine): + time_machine.move_to(TIMESTAMP_STR, tick=False) + + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + def test_old_version_strips_partition_date_from_dag_run( + self, old_ver_client, session, create_task_instance + ): + ti = create_task_instance( + task_id="test_partition_date_downgrade", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=TIMESTAMP, + ) + ti.dag_run.partition_key = "2026-05-20" + ti.dag_run.partition_date = PARTITION_DATE + session.commit() + + response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY) + assert response.status_code == 200 + dag_run = response.json()["dag_run"] + assert dag_run["partition_key"] == "2026-05-20" + assert "partition_date" not in dag_run + + def test_head_version_includes_partition_date_field(self, client, session, create_task_instance): + ti = create_task_instance( + task_id="test_partition_date_head", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=TIMESTAMP, + ) + ti.dag_run.partition_key = "2026-05-20" + ti.dag_run.partition_date = 
PARTITION_DATE + session.commit() + + response = client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY) + assert response.status_code == 200 + dag_run = response.json()["dag_run"] + assert dag_run["partition_key"] == "2026-05-20" + assert dag_run["partition_date"] == PARTITION_DATE.isoformat().replace("+00:00", "Z") diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 40d9401b9ffc0..59368c8b56af3 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -30,6 +30,7 @@ from airflow import settings from airflow._shared.observability.metrics.base_stats_logger import StatsLogger +from airflow._shared.timezones import timezone from airflow.assets.manager import AssetManager from airflow.models.asset import ( AssetAliasModel, @@ -260,6 +261,7 @@ def _get_or_create_apdr(): try: return AssetManager._get_or_create_apdr( target_key="test_partition_key", + target_partition_date=None, target_dag=testing_dag, rollup_fingerprint=rollup_fingerprint, asset_id=asm.id, @@ -282,6 +284,122 @@ def _get_or_create_apdr(): assert len(set(ids)) == 1 assert session.scalar(select(func.count()).select_from(AssetPartitionDagRun)) == 1 + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_get_or_create_apdr_suppresses_conflicting_partition_date(self, session): + """Two events resolving the same target key to different dates → suppress to None. + + Rather than an order-dependent first-event-wins, conflicting carried dates produce a + deterministic ``None`` so the consumer DagRun is not stamped with a wrong, unstable date. 
+ """ + asm = AssetModel(uri="test://asset1/", name="partition_asset", group="asset") + testing_dag = DagModel(dag_id="testing_dag_pd_conflict", is_stale=False, bundle_name="testing") + session.add_all([asm, testing_dag]) + session.commit() + fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": {}}} + + first = AssetManager._get_or_create_apdr( + target_key="2026-05-20", + target_partition_date=timezone.parse("2026-05-20T00:00:00"), + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + assert first.partition_date == timezone.parse("2026-05-20T00:00:00") + + # A second contributing event resolves the same key to a DIFFERENT date. + second = AssetManager._get_or_create_apdr( + target_key="2026-05-20", + target_partition_date=timezone.parse("2026-05-21T00:00:00"), + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + assert second.id == first.id # same pending APDR + assert second.partition_date is None # conflict suppressed, deterministic + + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_get_or_create_apdr_keeps_agreeing_partition_date(self, session): + """A later event carrying the same (or no) date does not trip the conflict suppression.""" + asm = AssetModel(uri="test://asset1/", name="partition_asset", group="asset") + testing_dag = DagModel(dag_id="testing_dag_pd_agree", is_stale=False, bundle_name="testing") + session.add_all([asm, testing_dag]) + session.commit() + fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": {}}} + source_date = timezone.parse("2026-05-20T00:00:00") + + kwargs = dict( + target_key="2026-05-20", + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + first = AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs) + # Same date agrees → kept. 
+ same = AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs) + assert same.id == first.id + assert same.partition_date == source_date + # A None-carrying event (e.g. a temporal mapper, resolved by the scheduler) is not a + # conflict → the existing date is kept. + with_none = AssetManager._get_or_create_apdr(target_partition_date=None, **kwargs) + assert with_none.id == first.id + assert with_none.partition_date == source_date + + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_get_or_create_apdr_adopts_date_when_existing_is_none(self, session): + """An APDR created with no date adopts a later event's carried date (not dropped).""" + asm = AssetModel(uri="test://asset1/", name="partition_asset", group="asset") + testing_dag = DagModel(dag_id="testing_dag_pd_adopt", is_stale=False, bundle_name="testing") + session.add_all([asm, testing_dag]) + session.commit() + fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": {}}} + source_date = timezone.parse("2026-05-20T00:00:00") + + kwargs = dict( + target_key="2026-05-20", + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + # First event carries no date (e.g. producer had no partition_date). + first = AssetManager._get_or_create_apdr(target_partition_date=None, **kwargs) + assert first.partition_date is None + # A later identity event carries a real date → adopted, not silently dropped. 
+ adopted = AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs) + assert adopted.id == first.id + assert adopted.partition_date == source_date + + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_get_or_create_apdr_recovers_after_conflict(self, session): + """Once a conflict has suppressed the date to None, a later event re-adopts a date.""" + asm = AssetModel(uri="test://asset1/", name="partition_asset", group="asset") + testing_dag = DagModel(dag_id="testing_dag_pd_recover", is_stale=False, bundle_name="testing") + session.add_all([asm, testing_dag]) + session.commit() + fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": {}}} + date_1 = timezone.parse("2026-05-20T00:00:00") + date_2 = timezone.parse("2026-05-21T00:00:00") + + kwargs = dict( + target_key="2026-05-20", + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + first = AssetManager._get_or_create_apdr(target_partition_date=date_1, **kwargs) + assert first.partition_date == date_1 + # Conflicting date suppresses to None. + conflicted = AssetManager._get_or_create_apdr(target_partition_date=date_2, **kwargs) + assert conflicted.partition_date is None + # A subsequent event re-adopts (suppression is not permanently sticky). 
+ recovered = AssetManager._get_or_create_apdr(target_partition_date=date_2, **kwargs) + assert recovered.id == first.id + assert recovered.partition_date == date_2 + @pytest.mark.need_serialized_dag @pytest.mark.usefixtures("testing_dag_bundle") def test_queue_partitioned_dags_stamps_rollup_fingerprint(self, session, dag_maker): diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index b312f9fd1b0a8..cbe2b0ca155e1 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -10250,6 +10250,7 @@ def _produce_and_register_asset_event( session: Session, dag_maker: DagMaker, expected_partition_key: str | None = None, + partition_date: datetime.datetime | None = None, ) -> AssetPartitionDagRun: if expected_partition_key is None: expected_partition_key = partition_key @@ -10257,7 +10258,11 @@ def _produce_and_register_asset_event( with dag_maker(dag_id=dag_id, schedule=PartitionAtRuntime(), session=session) as dag: EmptyOperator(task_id="hi", outlets=[asset]) - dr = dag_maker.create_dagrun(partition_key=partition_key, session=session) + dr = dag_maker.create_dagrun( + partition_key=partition_key, + partition_date=partition_date, + session=session, + ) [ti] = dr.get_task_instances(session=session) session.commit() @@ -10479,6 +10484,289 @@ def test_partitioned_dag_run_with_customized_mapper( assert asset_event.source_run_id == "test" +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +def test_consumer_dag_run_partition_date_identity_passthrough(dag_maker: DagMaker, session: Session): + """IdentityMapper can't reconstruct a date from its key, so the scheduler resolver falls + back to the producer's source date carried on the APDR and stamps it on the consumer DagRun. 
+ + Temporal and composite mappers are resolved by the scheduler via to_partition_date (covered + by the partition_mapper and resolver tests); this exercises the IdentityMapper carry, which + is the one case the scheduler cannot resolve from the key alone. + """ + asset_1 = Asset(name="asset-1") + source_partition_date = pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC") + + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="2026-05-20T01:00:00", + partition_date=source_partition_date, + session=session, + dag_maker=dag_maker, + expected_partition_key="2026-05-20T01:00:00", + ) + partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + assert partition_dags == {"asset-event-consumer"} + + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "2026-05-20T01:00:00" + assert dag_run.partition_date == source_partition_date + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +@mock.patch.object(SchedulerJobRunner, "_resolve_partition_date", autospec=True, return_value=None) +def test_consumer_dag_run_partition_date_not_masked_when_resolver_suppresses( + mock_resolve, dag_maker: DagMaker, session: Session +): + """A carried IdentityMapper date must not mask a resolver suppression. 
+ + When temporal mappers feeding the same APDR conflict (or one raises), the resolver + deliberately returns None and logs it; the carried date is only ever applied inside the + resolver, never at the call site. Here the APDR carries a date (IdentityMapper) but the + resolver returns None, and the consumer DagRun's partition_date must stay None — a + regression re-adding a call-site fallback to ``apdr.partition_date`` would fail this. + """ + asset_1 = Asset(name="asset-1") + source_partition_date = pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC") + + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="2026-05-20T01:00:00", + partition_date=source_partition_date, + session=session, + dag_maker=dag_maker, + expected_partition_key="2026-05-20T01:00:00", + ) + session.refresh(apdr) + # The IdentityMapper carry is stored on the APDR... + assert apdr.partition_date == source_partition_date + + runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "2026-05-20T01:00:00" + # ...but the resolver suppressed a date, so the call site must NOT substitute the carry. 
+ assert dag_run.partition_date is None + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +def test_consumer_dag_run_partition_date_none_for_non_temporal_mapper( + dag_maker: DagMaker, + session: Session, + custom_partition_mapper_patch: Callable[[], ExitStack], +): + """For mappers that aren't temporal/identity, the consumer DagRun's partition_date stays None.""" + asset_1 = Asset(name="asset-1") + + with custom_partition_mapper_patch(): + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=Key1Mapper(), # type: ignore[arg-type] + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + with custom_partition_mapper_patch(): + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="this-is-not-key-1-before-mapped", + partition_date=pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC"), + session=session, + dag_maker=dag_maker, + expected_partition_key="key-1", + ) + runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "key-1" + assert dag_run.partition_date is None + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +def test_consumer_dag_run_partition_date_is_none_when_source_has_no_date( + dag_maker: DagMaker, session: Session +): + """When the producer DagRun has no partition_date, IdentityMapper passes None through.""" + asset_1 = Asset(name="asset-1") + + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + 
default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="2026-05-20T01:00:00", + partition_date=None, + session=session, + dag_maker=dag_maker, + expected_partition_key="2026-05-20T01:00:00", + ) + runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "2026-05-20T01:00:00" + assert dag_run.partition_date is None + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +def test_consumer_dag_run_partition_date_is_none_when_task_key_diverges( + dag_maker: DagMaker, session: Session +): + """A task-emitted partition_key differing from the DagRun's drops the source date. + + The producer DagRun carries a partition_date, but the task emits an outlet event with a + different partition_key. The run-level date refers to the run-level key, so it must not be + carried onto the divergent partition: the APDR (and the consumer DagRun created from it) keep + partition_date None even though the producer run had one. 
+ """ + asset_1 = Asset(name="asset-1") + + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + with dag_maker( + dag_id="asset-event-producer", + schedule=PartitionAtRuntime(), + session=session, + ) as dag: + EmptyOperator(task_id="hi", outlets=[asset_1]) + + dr = dag_maker.create_dagrun( + partition_key="scheduler-key", + partition_date=pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC"), + session=session, + ) + [ti] = dr.get_task_instances(session=session) + session.commit() + + serialized_outlets = dag.get_task("hi").outlets + TaskInstance.register_asset_changes_in_db( + ti=ti, + task_outlets=[o.asprofile() for o in serialized_outlets], + outlet_events=[ + { + "dest_asset_key": {"name": "asset-1", "uri": "asset-1"}, + "extra": {}, + "partition_key": "task-key", + }, + ], + session=session, + ) + session.commit() + + event = session.scalar( + select(AssetEvent).where( + AssetEvent.source_dag_id == dag.dag_id, + AssetEvent.source_run_id == dr.run_id, + ) + ) + assert event is not None + assert event.partition_key == "task-key" + + apdr = session.scalar( + select(AssetPartitionDagRun) + .join( + PartitionedAssetKeyLog, + PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id, + ) + .where(PartitionedAssetKeyLog.asset_event_id == event.id) + ) + assert apdr is not None + # Divergent key → the threaded source date is dropped to None at APDR creation. 
+ assert apdr.partition_key == "task-key" + assert apdr.partition_date is None + + runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "task-key" + assert dag_run.partition_date is None + + @pytest.mark.need_serialized_dag @pytest.mark.usefixtures("clear_asset_partition_rows") def test_consumer_dag_listen_to_two_partitioned_asset( @@ -12265,25 +12553,39 @@ def _make_runner() -> SchedulerJobRunner: ) +_CARRIED_DATE = datetime.datetime(2026, 5, 20, 1, 0, 0, tzinfo=datetime.timezone.utc) + + @pytest.mark.parametrize( - ("mappers", "partition_key", "expected"), + ("mappers", "partition_key", "carried_partition_date", "expected"), [ - # Non-temporal mapper → no anchor. - pytest.param([CoreIdentityMapper()], "some-key", None, id="non-temporal-none"), + # Non-temporal mapper, nothing carried → None. + pytest.param([CoreIdentityMapper()], "some-key", None, None, id="non-temporal-none"), + # Non-temporal mapper with a carried producer date (IdentityMapper) → the carry. + pytest.param( + [CoreIdentityMapper()], + "some-key", + _CARRIED_DATE, + _CARRIED_DATE, + id="non-temporal-returns-carried-date", + ), # StartOfDayMapper(NY): "2024-03-15" → NY midnight = 04:00 UTC (EDT, DST since 2024-03-10), # localised with the mapper's own timezone rather than the global default. pytest.param( [CoreStartOfDayMapper(timezone="America/New_York")], "2024-03-15", + None, datetime.datetime(2024, 3, 15, 4, 0, 0, tzinfo=datetime.timezone.utc), id="non-utc-uses-mapper-timezone", ), - # Key cannot be decoded by the mapper's format → caught → None (no raise). 
- pytest.param([CoreStartOfDayMapper()], "not-a-date", None, id="decode-failure-none"), + # Key cannot be decoded by the mapper's format → caught → None, and the carried + # date is NOT substituted (the error is logged; masking it would hide that). + pytest.param([CoreStartOfDayMapper()], "not-a-date", _CARRIED_DATE, None, id="decode-failure-none"), # FanOutMapper unwraps to its downstream_mapper (daily), which owns the per-day key. pytest.param( [CoreFanOutMapper(upstream_mapper=CoreStartOfWeekMapper(), window=CoreWeekWindow())], "2024-01-16", + None, datetime.datetime(2024, 1, 16, 0, 0, 0, tzinfo=datetime.timezone.utc), id="fanout-uses-downstream-mapper", ), @@ -12291,13 +12593,16 @@ def _make_runner() -> SchedulerJobRunner: pytest.param( [CoreStartOfDayMapper(), CoreStartOfDayMapper()], "2024-03-15", + None, datetime.datetime(2024, 3, 15, 0, 0, 0, tzinfo=datetime.timezone.utc), id="agreeing-mappers-anchor", ), - # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct instants → None. + # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct instants → None, + # and the carried date is NOT substituted (it would mask the logged conflict). pytest.param( [CoreStartOfDayMapper(timezone="UTC"), CoreStartOfDayMapper(timezone="America/New_York")], "2024-03-15", + _CARRIED_DATE, None, id="conflicting-mappers-none", ), @@ -12306,15 +12611,19 @@ def _make_runner() -> SchedulerJobRunner: pytest.param( [CoreStartOfDayMapper(), CoreStartOfHourMapper()], "2024-03-15", + _CARRIED_DATE, None, id="one-failing-mapper-aborts", ), ], ) -def test_resolve_partition_date(mappers, partition_key, expected): +def test_resolve_partition_date(mappers, partition_key, carried_partition_date, expected): """_resolve_partition_date over mapper compositions: temporal / fan-out / agree / conflict / failure. - The mappers are consumed one per upstream asset, so ``asset_infos`` is sized to ``mappers``. 
+ The carried date (the producer's source date stamped on the APDR, set only for IdentityMapper) + is returned only when no temporal mapper contributes an anchor. On a conflict or a mapper + error the result is None — the carry must not mask the logged suppression. The mappers are + consumed one per upstream asset, so ``asset_infos`` is sized to ``mappers``. """ runner = _make_runner() timetable = mock.MagicMock() @@ -12326,5 +12635,6 @@ def test_resolve_partition_date(mappers, partition_key, expected): asset_infos=asset_infos, partition_key=partition_key, dag_id="test-dag", + carried_partition_date=carried_partition_date, ) assert result == expected diff --git a/airflow-core/tests/unit/partition_mappers/test_base.py b/airflow-core/tests/unit/partition_mappers/test_base.py index e7b0ba02f07f6..ba48057cc12f2 100644 --- a/airflow-core/tests/unit/partition_mappers/test_base.py +++ b/airflow-core/tests/unit/partition_mappers/test_base.py @@ -17,6 +17,7 @@ from __future__ import annotations import re +from datetime import datetime, timezone import pytest @@ -29,6 +30,17 @@ from airflow.serialization.enums import Encoding +class TestCarryPartitionDate: + def test_base_returns_none_by_default(self): + """Non-identity mappers don't carry the producer's date; it's derived from the key instead.""" + dt = datetime(2026, 5, 20, 1, 0, 0, tzinfo=timezone.utc) + assert StartOfDayMapper().carry_partition_date(dt) is None + assert ( + RollupMapper(upstream_mapper=StartOfDayMapper(), window=DayWindow()).carry_partition_date(dt) + is None + ) + + class TestPartitionMapperInitSubclass: """Verify that __init_subclass__ enforces the decode/encode pair contract.""" diff --git a/airflow-core/tests/unit/partition_mappers/test_identity.py b/airflow-core/tests/unit/partition_mappers/test_identity.py index 9a97e583db562..b992e76430e27 100644 --- a/airflow-core/tests/unit/partition_mappers/test_identity.py +++ b/airflow-core/tests/unit/partition_mappers/test_identity.py @@ -16,6 +16,8 @@ # 
under the License. from __future__ import annotations +from datetime import datetime, timezone + from airflow.partition_mappers.identity import IdentityMapper from airflow.serialization.decoders import decode_partition_mapper from airflow.serialization.encoders import encode_partition_mapper @@ -27,6 +29,13 @@ def test_to_downstream(self): pm = IdentityMapper() assert pm.to_downstream("key") == "key" + def test_carry_partition_date_passes_source_through(self): + """IdentityMapper carries the producer's date through (its key can't reconstruct one).""" + pm = IdentityMapper() + dt = datetime(2026, 5, 20, 1, 0, 0, tzinfo=timezone.utc) + assert pm.carry_partition_date(dt) == dt + assert pm.carry_partition_date(None) is None + def test_serialize(self): pm = IdentityMapper() assert pm.serialize() == {} diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 779bb42319193..cb9a1ca15a3cc 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1774,6 +1774,7 @@ class DAGRunResponse(BaseModel): bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None dag_display_name: Annotated[str, Field(title="Dag Display Name")] partition_key: Annotated[str | None, Field(title="Partition Key")] = None + partition_date: Annotated[datetime | None, Field(title="Partition Date")] = None class DAGRunsBatchBody(BaseModel): diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index fb058b924b6ca..8698ddc403b60 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -470,6 +470,8 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): "prev_execution_date", "prev_execution_date_success", } + if 
AIRFLOW_V_3_3_PLUS: + PENDULUM_SERIALIZABLE_CONTEXT_KEYS.add("partition_date") AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = { "macros", diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 812ca4d041833..bc03569386d14 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -775,6 +775,7 @@ class DagRun(BaseModel): triggering_user_name: Annotated[str | None, Field(title="Triggering User Name")] = None consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")] partition_key: Annotated[str | None, Field(title="Partition Key")] = None + partition_date: Annotated[AwareDatetime | None, Field(title="Partition Date")] = None note: Annotated[str | None, Field(title="Note")] = None team_name: Annotated[str | None, Field(title="Team Name")] = None diff --git a/task-sdk/src/airflow/sdk/definitions/context.py b/task-sdk/src/airflow/sdk/definitions/context.py index 462af80aeca4a..9d36203c2ebbf 100644 --- a/task-sdk/src/airflow/sdk/definitions/context.py +++ b/task-sdk/src/airflow/sdk/definitions/context.py @@ -64,6 +64,7 @@ class Context(TypedDict, total=False): outlets: list params: dict[str, Any] partition_key: NotRequired[str | None] + partition_date: NotRequired[DateTime | None] prev_data_interval_start_success: NotRequired[DateTime | None] prev_data_interval_end_success: NotRequired[DateTime | None] prev_start_date_success: NotRequired[DateTime | None] diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json index 4807d9f53b353..b75957870d31b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json +++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json @@ -1277,6 +1277,19 @@ "default": null, "title": "Partition Key" }, + "partition_date": { + "anyOf": [ + { + "format": "date-time", + "type": 
"string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Partition Date" + }, "note": { "anyOf": [ { @@ -4640,6 +4653,19 @@ ], "title": "Partition Key" }, + "partition_date": { + "anyOf": [ + { + "format": "date-time", + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Partition Date" + }, "note": { "anyOf": [ { diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 7eb52038ce9e3..a39e2f91b53de 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -334,6 +334,7 @@ def get_template_context(self) -> Context: # TODO: Assess if we need to pass these through timezone.coerce_datetime "dag_run": dag_run, # type: ignore[typeddict-item] # Removable after #46522 "partition_key": dag_run.partition_key, + "partition_date": coerce_datetime(dag_run.partition_date), "triggering_asset_events": TriggeringAssetEventsAccessor.build( AssetEventDagRunReferenceResult.from_asset_event_dag_run_reference(event) for event in dag_run.consumed_asset_events diff --git a/task-sdk/src/airflow/sdk/types.py b/task-sdk/src/airflow/sdk/types.py index 029af34496f07..efa3c9f6d9426 100644 --- a/task-sdk/src/airflow/sdk/types.py +++ b/task-sdk/src/airflow/sdk/types.py @@ -123,6 +123,7 @@ class DagRunProtocol(Protocol): triggering_user_name: str | None consumed_asset_events: list[AssetEventDagRunReference] partition_key: str | None + partition_date: AwareDatetime | None note: str | None diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 4893258195446..bd50711eceaa8 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2397,6 +2397,7 @@ class RequestTestCase: "run_id": "prev_run", "logical_date": timezone.parse("2024-01-14T12:00:00Z"), 
"partition_key": None, + "partition_date": None, "run_type": "scheduled", "start_date": timezone.parse("2024-01-15T12:00:00Z"), "run_after": timezone.parse("2024-01-15T12:00:00Z"), @@ -2450,6 +2451,7 @@ class RequestTestCase: "run_id": "prev_run", "logical_date": timezone.parse("2024-01-14T12:00:00Z"), "partition_key": None, + "partition_date": None, "run_type": "scheduled", "start_date": timezone.parse("2024-01-15T12:00:00Z"), "run_after": timezone.parse("2024-01-15T12:00:00Z"), diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index be5ea4165adbf..c3c9b4295b42b 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -2001,6 +2001,7 @@ def test_get_context_with_ti_context_from_server(self, create_runtime_ti, mock_s "ts_nodash": "20241201T010000", "ts_nodash_with_tz": "20241201T010000+0000", "partition_key": dr.partition_key, + "partition_date": dr.partition_date, } def test_partition_key_in_context(self, create_runtime_ti, mock_supervisor_comms): @@ -2027,6 +2028,37 @@ def test_partition_key_in_context(self, create_runtime_ti, mock_supervisor_comms context = runtime_ti.get_template_context() assert context["partition_key"] == "some-partition" + def test_partition_date_in_context(self, create_runtime_ti, mock_supervisor_comms): + """Test that partition_date from dag_run is exposed in the template context.""" + task = BaseOperator(task_id="hello") + runtime_ti = create_runtime_ti(task=task, dag_id="basic_task") + + dr = runtime_ti._ti_context_from_server.dag_run + + mock_supervisor_comms.send.return_value = PrevSuccessfulDagRunResult( + data_interval_end=dr.logical_date - timedelta(hours=1), + data_interval_start=dr.logical_date - timedelta(hours=2), + start_date=dr.start_date - timedelta(hours=1), + end_date=dr.start_date, + ) + + context = runtime_ti.get_template_context() + + # Default: 
partition_date is None + assert context["partition_date"] is None + + # Set partition_date on dag_run and verify it surfaces in context + partition_date = timezone.datetime(2026, 5, 20, 1, 0, 0) + dr.partition_date = partition_date + context = runtime_ti.get_template_context() + assert context["partition_date"] == partition_date + + # A naive datetime is coerced to tz-aware so the Jinja `| ds` / `| ts` filters + # always receive a timezone-aware value. + dr.partition_date = datetime(2026, 5, 20, 1, 0, 0) + context = runtime_ti.get_template_context() + assert context["partition_date"].tzinfo is not None + def test_lazy_loading_not_triggered_until_accessed(self, create_runtime_ti, mock_supervisor_comms): """Ensure lazy-loaded attributes are not resolved until accessed.""" task = BaseOperator(task_id="hello")