diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index 994cae815d397..d039fbfd1ffd3 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -631,7 +631,19 @@ partition match can be produced, so the downstream Dag is not triggered for that key. Inside partitioned Dag runs, access the resolved partition through -``dag_run.partition_key``. +``dag_run.partition_key``. When the consumer's partition mapper can +resolve the key to a ``datetime``, that value is also available as +``dag_run.partition_date``, so templates can use +``{{ partition_date | ds }}``. This covers the ``StartOf*Mapper`` family +(which decode the key directly), ``IdentityMapper`` (which carries the +producer's ``partition_date`` through), and composite mappers — +``RollupMapper``, ``ChainMapper`` and ``FanOutMapper`` — whose effective +child mapper is temporal (they delegate the anchor to that child). +Mappers whose key carries no temporal meaning (``ProductMapper``, +``AllowedKeyMapper`` and custom mappers that do not implement +``to_partition_date``) leave ``partition_date`` ``None`` even when the +resulting key is date-shaped, so those consumers should keep parsing +``partition_key``. You can also trigger a DagRun manually with a partition key (for example, through the Trigger Dag window in the UI, or through the REST API by diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index a69a8aa205825..d2cfde623523f 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``9ff64e1c35d3`` (head) | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes on dag_run.created_dag_version_id and | +| ``d2f4e1b3c5a7`` (head) | ``9ff64e1c35d3`` | ``3.3.0`` | Add partition_date to asset_partition_dag_run. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``9ff64e1c35d3`` | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes on dag_run.created_dag_version_id and | | | | | task_instance.dag_version_id. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``dd5f3a8e2b91`` | ``c20871fbf23a`` | ``3.3.0`` | Add rollup_fingerprint to AssetPartitionDagRun and index | diff --git a/airflow-core/docs/templates-ref.rst b/airflow-core/docs/templates-ref.rst index b1609ff0c943e..aeb907dbe470f 100644 --- a/airflow-core/docs/templates-ref.rst +++ b/airflow-core/docs/templates-ref.rst @@ -87,6 +87,9 @@ Variable Type Description | is enabled in ``airflow.cfg``. ``{{ partition_key }}`` str | None | The partition key from the current :class:`~airflow.models.dagrun.DagRun`. | Returns ``None`` if no partition key was set. Added in version 3.3.0. +``{{ partition_date }}`` datetime | None | The partition datetime from the current :class:`~airflow.models.dagrun.DagRun`. + | Use ``{{ partition_date | ds }}`` and related filters for formatting. + | Returns ``None`` if no partition date was set. Added in version 3.3.0. ``{{ var.value }}`` Airflow variables. See `Airflow Variables in Templates`_ below. ``{{ var.json }}`` Airflow variables. See `Airflow Variables in Templates`_ below. ``{{ conn }}`` Airflow connections. See `Airflow Connections in Templates`_ below. diff --git a/airflow-core/newsfragments/67285.feature.rst b/airflow-core/newsfragments/67285.feature.rst new file mode 100644 index 0000000000000..2c4370d5ff63a --- /dev/null +++ b/airflow-core/newsfragments/67285.feature.rst @@ -0,0 +1 @@ +Propagate ``partition_date`` from producer DagRuns to consumers of partitioned assets, so date-shaped partitions are available in consumer task templates. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 033437e2ad98b..0a530578644d7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -119,6 +119,7 @@ class DAGRunResponse(BaseModel): bundle_version: str | None dag_display_name: str = Field(validation_alias=AliasPath("dag_model", "dag_display_name")) partition_key: str | None + partition_date: datetime | None class DAGRunCollectionResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index f82e665c9bea9..639f6fb990461 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -13704,6 +13704,12 @@ components: - type: string - type: 'null' title: Partition Key + partition_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Partition Date type: object required: - dag_run_id @@ -13727,6 +13733,7 @@ components: - bundle_version - dag_display_name - partition_key + - partition_date title: DAGRunResponse description: Dag Run serializer for responses. DAGRunsBatchBody: diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index fe7f5a5ce050d..bf20bcbc30c55 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -341,6 +341,7 @@ class DagRun(StrictBaseModel): triggering_user_name: str | None = None consumed_asset_events: list[AssetEventDagRunReference] partition_key: str | None + partition_date: UtcDateTime | None = None note: str | None = None team_name: str | None = None diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 332ddb28704ed..dc7035d31e3c9 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -44,6 +44,7 @@ AddAssetsByAliasEndpoint, AddAwaitingInputStatePayload, AddConnectionTestEndpoint, + AddPartitionDateField, AddRetryPolicyFields, AddTaskAndAssetStateStoreEndpoints, AddTaskInstanceQueueField, @@ -63,6 +64,7 @@ AddTeamNameField, AddTaskAndAssetStateStoreEndpoints, AddAssetsByAliasEndpoint, + AddPartitionDateField, ), Version( "2026-04-06", diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py index cbd801c0a9b0b..e89e2ed04cc5d 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py @@ -127,3 +127,17 @@ class AddTaskAndAssetStateStoreEndpoints(VersionChange): endpoint("/store/asset/by-uri/value", ["DELETE"]).didnt_exist, endpoint("/store/asset/by-uri/clear", ["DELETE"]).didnt_exist, ) + + +class AddPartitionDateField(VersionChange): + """Expose the consumer DagRun's partition datetime on the execution API so consumer tasks can template it.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = (schema(DagRun).field("partition_date").didnt_exist,) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def remove_partition_date_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc] + """Strip ``partition_date`` from the nested ``dag_run`` payload for older clients.""" + if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): + response.body["dag_run"].pop("partition_date", None) diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index c8d2edef4f104..567c45a05bbd9 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -49,6 +49,8 @@ from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks if TYPE_CHECKING: + from datetime import datetime + from sqlalchemy.orm.session import Session from airflow.models.dag import DagModel @@ -274,6 +276,7 @@ def register_asset_change( source_alias_names: Collection[str] = (), session: Session, partition_key: str | None = None, + partition_date: datetime | None = None, source_is_api: bool = False, api_user_teams: set[str] | None = None, api_allow_consumer_teams: list[str] | None = None, @@ -394,6 +397,7 @@ def register_asset_change( source_map_index=asset_event.source_map_index, source_aliases=[aam.to_serialized() for aam in asset_alias_models], partition_key=partition_key, + partition_date=partition_date, ) ) @@ -440,6 +444,7 @@ def register_asset_change( asset_id=asset_model.id, dags_to_queue=dags_to_queue, partition_key=partition_key, + partition_date=partition_date, event=asset_event, task_instance=task_instance, session=session, @@ -485,6 +490,7 @@ def _queue_dagruns( asset_id: int, dags_to_queue: set[DagModel], partition_key: str | None, + partition_date: datetime | None, event: AssetEvent, task_instance: TaskInstance | None, session: Session, @@ -499,6 +505,7 @@ def _queue_dagruns( partition_dags=partition_dags, event=event, partition_key=partition_key, + partition_date=partition_date, task_instance=task_instance, session=session, ) @@ -527,6 +534,7 @@ def _queue_partitioned_dags( partition_dags: Iterable[DagModel], event: AssetEvent, partition_key: str | None, + partition_date: datetime | None, task_instance: TaskInstance | None, session: Session, ) -> None: @@ -574,9 +582,9 @@ def _queue_partitioned_dags( if (asset_model := session.scalar(select(AssetModel).where(AssetModel.id == asset_id))) is None: raise RuntimeError(f"Could not find asset for asset_id={asset_id}") + mapper = timetable.get_partition_mapper(name=asset_model.name, uri=asset_model.uri) try: # We'll need to catch every possible exception happen when mapping partition_key. - mapper = timetable.get_partition_mapper(name=asset_model.name, uri=asset_model.uri) target_key = mapper.to_downstream(partition_key) except Exception as err: log.exception( @@ -643,9 +651,18 @@ def _queue_partitioned_dags( ) continue + # The producer's partition_date (threaded in from its DagRun via + # register_asset_change) is carried onto the APDR only by mappers that + # opt in. IdentityMapper does, since its key carries no temporal meaning + # for the scheduler to re-derive at run creation; temporal and composite + # mappers return None here and are resolved from the key by the scheduler + # via PartitionMapper.to_partition_date. + target_partition_date: datetime | None = mapper.carry_partition_date(partition_date) + for target_key in target_keys: apdr = cls._get_or_create_apdr( target_key=target_key, + target_partition_date=target_partition_date, target_dag=target_dag, rollup_fingerprint=fingerprint, asset_id=asset_id, @@ -666,6 +683,7 @@ def _get_or_create_apdr( cls, *, target_key: str, + target_partition_date: datetime | None, target_dag: DagModel, rollup_fingerprint: dict, asset_id: int, @@ -683,6 +701,20 @@ def _get_or_create_apdr( ``rollup_fingerprint`` is the serialized mapper / window definition for all partitioned assets in the timetable at creation time; the scheduler discards APDRs whose stamp no longer matches the current timetable's fingerprint (mapper / window may have changed). + + Reconciling the carried ``partition_date`` on an existing pending APDR is best-effort: + a partitioned consumer's feeding assets are expected to agree on the partition's + datetime. The carry only matters for ``IdentityMapper`` (whose key the scheduler + cannot decode); temporal/composite feeds re-derive the date from the key at run + creation regardless of what is stored here. Within that contract: + + - If the APDR carries no date yet (``None`` — created by an event that carried none), + adopt the incoming date when this event carries one. There is nothing to conflict + with, so a later identity event's date is not dropped. + - If the APDR already carries a date and this event carries a **different** non-null + one, the producing assets disagree; picking one would be order-dependent, so the + carried date is suppressed to ``None`` (and re-adoptable by a later event). + - Otherwise (the dates agree, or this event carries none) the existing value is kept. """ with _lock_asset_model(session=session, asset_id=asset_id): latest_apdr: AssetPartitionDagRun | None = session.scalar( @@ -695,6 +727,29 @@ def _get_or_create_apdr( .limit(1) ) if latest_apdr and latest_apdr.created_dag_run_id is None: + existing_partition_date = latest_apdr.partition_date + if existing_partition_date is None: + # No carried date yet; adopt the incoming one if present (no conflict + # to resolve). Keeps a later identity event's date from being dropped. + if target_partition_date is not None: + latest_apdr.partition_date = target_partition_date + session.flush() + elif target_partition_date is not None and existing_partition_date != target_partition_date: + # Two contributing events carry conflicting partition_dates for the same + # (target_key, target_dag). Choosing one would be order-dependent, so + # suppress: the consumer DagRun gets partition_date=None rather than a + # wrong, unstable value. + log.warning( + "Conflicting partition_date carried for the same target key; " + "suppressing it so the consumer DagRun's partition_date is None. " + "The producing assets likely disagree on the partition's datetime.", + target_dag_id=target_dag.dag_id, + target_key=target_key, + existing_partition_date=existing_partition_date, + incoming_partition_date=target_partition_date, + ) + latest_apdr.partition_date = None + session.flush() cls.logger().debug( "Existing APDR found for key %s dag_id %s", target_key, @@ -707,6 +762,7 @@ def _get_or_create_apdr( target_dag_id=target_dag.dag_id, created_dag_run_id=None, partition_key=target_key, + partition_date=target_partition_date, rollup_fingerprint=rollup_fingerprint, ) session.add(apdr) diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py index baba6ced1fcc8..7c03d3f8f83f3 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_partition.py +++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py @@ -106,7 +106,7 @@ def combine_player_stats(dag_run=None): """Merge the aligned hourly partitions into a combined dataset.""" if TYPE_CHECKING: assert dag_run - print(dag_run.partition_key) + print(dag_run.partition_key, dag_run.partition_date) combine_player_stats() diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 93b155f2257ac..fc2cc2d8418c4 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2044,13 +2044,15 @@ def _resolve_partition_date( asset_infos: Iterable[tuple[str, str]], partition_key: str, dag_id: str, + carried_partition_date: datetime | None, ) -> datetime | None: """ - Return the temporal anchor (period-start datetime) for *partition_key*. + Return the ``partition_date`` the consumer Dag run should be created with. - Resolves the temporal anchor (period-start datetime) for *partition_key* - across *asset_infos* — the ``(name, uri)`` pairs of the upstream assets - that contributed to it. Each upstream mapper resolves the key via + The temporal anchor (period-start datetime) is resolved for + *partition_key* across *asset_infos* — the ``(name, uri)`` pairs of the + upstream assets that contributed to it. Each upstream mapper resolves the + key via :meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`: temporal mappers decode the key, composite mappers delegate to their child, and non-temporal mappers (e.g. @@ -2059,16 +2061,19 @@ def _resolve_partition_date( A partitioned consumer has a single partition identity, so every temporal mapper feeding it must resolve the same key to the same instant. Anchors are compared by instant (timezone-aware), so equivalent moments collapse - to one. When the temporal mappers agree, that anchor is returned; when - they disagree — a misconfiguration, e.g. assets mapping the same key under - different timezones — ``partition_date`` is left unset and a warning is - logged rather than silently picking one by scan order. Returns ``None`` if - no mapper is temporal. - - A failure in any mapper aborts the whole resolution and returns ``None`` - (logged) — anchors accumulated from earlier mappers are discarded rather - than used as a partial result, since a partial set could hide a conflict. - A broken mapper must not crash the scheduler tick. + to one. When the temporal mappers agree, that anchor is returned. + + When no temporal mapper contributes at all — an identity key carries no + temporal meaning and cannot be decoded back into a date — the producer's + source date carried on the APDR at queue time (*carried_partition_date*, + set only for ``IdentityMapper``) is returned instead. + + When temporal mappers were present but produced no usable anchor — they + disagreed (a misconfiguration, e.g. assets mapping the same key under + different timezones) or one raised — the conflict/error is logged and + ``None`` is returned. The carried date is deliberately *not* substituted + here: stamping it would mask the logged suppression. A broken mapper must + not crash the scheduler tick. """ anchors: set[datetime] = set() try: @@ -2086,7 +2091,12 @@ def _resolve_partition_date( return None if not anchors: - return None + # No temporal mapper contributed an anchor (e.g. an all-IdentityMapper feed), + # so fall back to the date carried on the APDR. A partitioned consumer's feeding + # assets are expected to agree on the partition's datetime; when a temporal mapper + # *does* resolve an anchor it takes precedence over the carried identity date, + # since the key is the authoritative source the scheduler can re-derive. + return carried_partition_date if len(anchors) > 1: self.log.warning( "Upstream partition mappers resolved conflicting partition_date values for the same " @@ -2288,6 +2298,7 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st asset_infos=asset_info_per_apdr[apdr.id].values(), partition_key=apdr.partition_key, dag_id=apdr.target_dag_id, + carried_partition_date=apdr.partition_date, ) dag_run = dag.create_dagrun( run_id=DagRun.generate_run_id( diff --git a/airflow-core/src/airflow/listeners/types.py b/airflow-core/src/airflow/listeners/types.py index 120b8ef503a6e..fa9ebdafaca0f 100644 --- a/airflow-core/src/airflow/listeners/types.py +++ b/airflow-core/src/airflow/listeners/types.py @@ -23,6 +23,8 @@ import attrs if TYPE_CHECKING: + from datetime import datetime + from pydantic import JsonValue from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias @@ -40,3 +42,4 @@ class AssetEvent: source_map_index: int | None source_aliases: list[SerializedAssetAlias] partition_key: str | None + partition_date: datetime | None = None diff --git a/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py new file mode 100644 index 0000000000000..ba483eb88e037 --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add partition_date to asset_partition_dag_run. + +The target datetime is frozen at APDR creation time so the consumer DagRun's +``partition_date`` is consistent with the partition mapper that produced its +``partition_key``. + +Revision ID: d2f4e1b3c5a7 +Revises: 9ff64e1c35d3 +Create Date: 2026-05-21 09:00:00.000000 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.utils.sqlalchemy import UtcDateTime + +revision = "d2f4e1b3c5a7" +down_revision = "9ff64e1c35d3" +branch_labels = None +depends_on = None +airflow_version = "3.3.0" + + +def upgrade(): + """Add partition_date column to asset_partition_dag_run.""" + with op.batch_alter_table("asset_partition_dag_run", schema=None) as batch_op: + batch_op.add_column(sa.Column("partition_date", UtcDateTime, nullable=True)) + + +def downgrade(): + """Remove partition_date column from asset_partition_dag_run.""" + with op.batch_alter_table("asset_partition_dag_run", schema=None) as batch_op: + batch_op.drop_column("partition_date") diff --git a/airflow-core/src/airflow/models/asset.py b/airflow-core/src/airflow/models/asset.py index 34e4ffaec3e38..60256f9cd8852 100644 --- a/airflow-core/src/airflow/models/asset.py +++ b/airflow-core/src/airflow/models/asset.py @@ -921,6 +921,7 @@ class AssetPartitionDagRun(Base): target_dag_id: Mapped[str] = mapped_column(StringID(), nullable=False) created_dag_run_id: Mapped[int | None] = mapped_column(Integer(), nullable=True) partition_key: Mapped[str] = mapped_column(StringID(), nullable=False) + partition_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True) # Serialized snapshot of the rollup definition (mapper + window for every # partitioned asset in the timetable) at the time this APDR was created. # The scheduler discards APDRs whose stored fingerprint no longer matches diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 348d6ed9813e4..47de1190b3775 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -433,6 +433,7 @@ def dag_run_data(self) -> DRDataModel: conf=self.conf, consumed_asset_events=[], partition_key=self.partition_key, + partition_date=self.partition_date, ) @property @@ -1099,6 +1100,8 @@ def _emit_dagrun_span(self, state: DagRunState): attributes["airflow.dag_run.logical_date"] = str(self.logical_date) if self.partition_key: attributes["airflow.dag_run.partition_key"] = str(self.partition_key) + if self.partition_date: + attributes["airflow.dag_run.partition_date"] = self.partition_date.isoformat() # TODO: make the empty parent context optional. Default should be to # nest the dag run span under the currently active parent span (by diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 5388b4b6b488b..b39c9f3b322d6 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1554,6 +1554,7 @@ def register_asset_changes_in_db( OutletEventPayload(extra=outlet_event["extra"], partition_key=partition_key) ) dag_run_partition_key = ti.dag_run.partition_key + dag_run_partition_date = ti.dag_run.partition_date asset_keys = { SerializedAssetUniqueKey(o.name, o.uri) @@ -1589,6 +1590,7 @@ def _register(am: AssetModel, key: SerializedAssetUniqueKey) -> None: asset=am, extra=None, partition_key=dag_run_partition_key, + partition_date=dag_run_partition_date, session=session, ) return @@ -1596,11 +1598,26 @@ def _register(am: AssetModel, key: SerializedAssetUniqueKey) -> None: effective_pk = ( payload.partition_key if payload.partition_key is not None else dag_run_partition_key ) + # Carry partition_date only when the effective key matches the + # DagRun's — the run-level date refers to the run-level key and + # would mis-label an event emitted for a different partition. + if effective_pk == dag_run_partition_key: + payload_partition_date = dag_run_partition_date + else: + payload_partition_date = None + if dag_run_partition_date is not None: + ti.log.debug( + "Task-emitted partition_key %r differs from DagRun partition_key %r; " + "consumer partition_date will be None.", + payload.partition_key, + dag_run_partition_key, + ) asset_manager.register_asset_change( task_instance=ti, asset=am, extra=payload.extra, partition_key=effective_pk, + partition_date=payload_partition_date, session=session, ) @@ -1684,6 +1701,7 @@ def _asset_event_extras_from_aliases() -> dict[tuple[SerializedAssetUniqueKey, s source_alias_names=event_aliase_names, extra=asset_event_extra, partition_key=dag_run_partition_key, + partition_date=dag_run_partition_date, session=session, ) if event is None: @@ -1696,6 +1714,7 @@ def _asset_event_extras_from_aliases() -> dict[tuple[SerializedAssetUniqueKey, s source_alias_names=event_aliase_names, extra=asset_event_extra, partition_key=dag_run_partition_key, + partition_date=dag_run_partition_date, session=session, ) diff --git a/airflow-core/src/airflow/partition_mappers/base.py b/airflow-core/src/airflow/partition_mappers/base.py index 0be82d8f6c5bf..f9d05b6d8d3c1 100644 --- a/airflow-core/src/airflow/partition_mappers/base.py +++ b/airflow-core/src/airflow/partition_mappers/base.py @@ -117,6 +117,21 @@ def to_partition_date(self, downstream_key: str) -> datetime | None: """ return None + def carry_partition_date(self, source_partition_date: datetime | None) -> datetime | None: + """ + Return the producer's ``partition_date`` to carry onto the consumer APDR. + + Captured at queue time as an asset event arrives, *source_partition_date* + is the producing run's ``partition_date``. The base implementation returns + ``None``: for most mappers the consumer's date is derived from its own + downstream key by :meth:`to_partition_date` at run creation, not carried + from the producer. + :class:`~airflow.partition_mappers.identity.IdentityMapper` overrides to + pass it through, since the consumer's key equals the producer's and the + key carries no temporal meaning to decode. + """ + return None + def serialize(self) -> dict[str, Any]: if self.max_downstream_keys is None: return {} diff --git a/airflow-core/src/airflow/partition_mappers/identity.py b/airflow-core/src/airflow/partition_mappers/identity.py index e0b2e25b1ae47..c538747e1835f 100644 --- a/airflow-core/src/airflow/partition_mappers/identity.py +++ b/airflow-core/src/airflow/partition_mappers/identity.py @@ -16,11 +16,22 @@ # under the License. from __future__ import annotations +from typing import TYPE_CHECKING + from airflow.partition_mappers.base import PartitionMapper +if TYPE_CHECKING: + from datetime import datetime + class IdentityMapper(PartitionMapper): """Partition mapper that does not change the key.""" def to_downstream(self, key: str) -> str: return key + + def carry_partition_date(self, source_partition_date: datetime | None) -> datetime | None: + # Identity passthrough: the consumer's key equals the producer's, so the + # producer's partition_date is the consumer's. to_partition_date cannot + # recover it (the key carries no temporal meaning), so it is carried here. + return source_partition_date diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 413dbef3d7e42..1d5cfc5d86be7 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3473,10 +3473,22 @@ export const $DAGRunResponse = { } ], title: 'Partition Key' + }, + partition_date: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Partition Date' } }, type: 'object', - required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at', 'start_date', 'end_date', 'duration', 'data_interval_start', 'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type', 'state', 'triggered_by', 'triggering_user_name', 'conf', 'note', 'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key'], + required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at', 'start_date', 'end_date', 'duration', 'data_interval_start', 'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type', 'state', 'triggered_by', 'triggering_user_name', 'conf', 'note', 'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key', 'partition_date'], title: 'DAGRunResponse', description: 'Dag Run serializer for responses.' } as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 31f716d85f885..f8cbb1fbc1875 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -917,6 +917,7 @@ export type DAGRunResponse = { bundle_version: string | null; dag_display_name: string; partition_key: string | null; + partition_date: string | null; }; /** diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 2155ca50d33b3..f24f3636132ae 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -116,7 +116,7 @@ class MappedClassProtocol(Protocol): "3.1.0": "cc92b33c6709", "3.1.8": "509b94a1042d", "3.2.0": "1d6611b6ab7c", - "3.3.0": "9ff64e1c35d3", + "3.3.0": "d2f4e1b3c5a7", } # Prefix used to identify tables holding data moved during migration. diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index 89957e179c2e2..b65036b72d9b3 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -1543,6 +1543,7 @@ def test_should_respond_200(self, test_client): "dag_versions": mock.ANY, "logical_date": None, "partition_key": None, + "partition_date": None, "queued_at": mock.ANY, "run_after": mock.ANY, "start_date": None, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index d953e950dea6a..ff14aaa8980f5 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -167,6 +167,9 @@ def setup(request, dag_maker, *, session=None): # The value uses the ProductMapper default delimiter (|) to form a composite key so we can # verify that the filter treats | as a literal character, not an OR separator. dag_run1.partition_key = "2026-01-01|us" + # Set a real partition_date so the GET/list responses exercise the serialized + # (non-None) partition_date path, not just the None case. + dag_run1.partition_date = datetime(2026, 1, 1, tzinfo=timezone.utc) dag_run1.note = (DAG1_RUN1_NOTE, "not_test") # Set end_date for testing duration filter dag_run1.end_date = dag_run1.start_date + timedelta(seconds=101) @@ -317,6 +320,9 @@ def get_dag_run_dict(run: DagRun): "note": run.note, "dag_versions": get_dag_versions_dict(run.dag_versions), "partition_key": run.partition_key, + "partition_date": from_datetime_to_zulu_without_ms(run.partition_date) + if run.partition_date + else None, } @@ -2371,6 +2377,7 @@ def test_should_respond_200( "triggered_by": "rest_api", "triggering_user_name": "test", "partition_key": None, + "partition_date": None, } assert response.json() == expected_response_json @@ -2601,6 +2608,7 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client): "conf": {}, "note": note, "partition_key": None, + "partition_date": None, } assert response_2.status_code == 409 @@ -2690,6 +2698,7 @@ def test_should_respond_200_with_null_logical_date(self, test_client): "conf": {}, "note": None, "partition_key": None, + "partition_date": None, } @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index fbe7cc56e4a08..5bd6d6b36e373 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -452,6 +452,7 @@ def test_get_state(self, client, session, dag_maker): "end_date": "2025-12-13T00:00:00Z", "logical_date": None, "partition_key": None, + "partition_date": None, "run_after": "2025-12-13T00:00:00Z", "run_id": "previous", "run_type": "manual", diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index af44599776107..90325bb9c8542 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -236,6 +236,7 @@ def test_ti_run_state_to_running( "triggering_user_name": None, "consumed_asset_events": [], "partition_key": None, + "partition_date": None, "note": None, "team_name": None, }, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py new file mode 100644 index 0000000000000..b2cc016be84d7 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import pytest + +from airflow._shared.timezones import timezone +from airflow.utils.state import DagRunState, State + +from tests_common.test_utils.db import clear_db_runs + +pytestmark = pytest.mark.db_test + +TIMESTAMP_STR = "2024-09-30T12:00:00Z" +TIMESTAMP = timezone.parse(TIMESTAMP_STR) +PARTITION_DATE = timezone.parse("2026-05-20T01:00:00") + +RUN_PATCH_BODY = { + "state": "running", + "hostname": "h", + "unixname": "u", + "pid": 1, + "start_date": TIMESTAMP_STR, +} + + +@pytest.fixture +def old_ver_client(client): + """Execution API version immediately before ``partition_date`` was added.""" + client.headers["Airflow-API-Version"] = "2026-06-16" + return client + + +class TestPartitionDateFieldBackwardCompat: + @pytest.fixture(autouse=True) + def _freeze_time(self, time_machine): + time_machine.move_to(TIMESTAMP_STR, tick=False) + + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + def test_old_version_strips_partition_date_from_dag_run( + self, old_ver_client, session, create_task_instance + ): + ti = create_task_instance( + task_id="test_partition_date_downgrade", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=TIMESTAMP, + ) + ti.dag_run.partition_key = "2026-05-20" + ti.dag_run.partition_date = PARTITION_DATE + session.commit() + + response = old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY) + assert response.status_code == 200 + dag_run = response.json()["dag_run"] + assert dag_run["partition_key"] == "2026-05-20" + assert "partition_date" not in dag_run + + def test_head_version_includes_partition_date_field(self, client, session, create_task_instance): + ti = create_task_instance( + task_id="test_partition_date_head", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=TIMESTAMP, + ) + ti.dag_run.partition_key = "2026-05-20" + ti.dag_run.partition_date = PARTITION_DATE + session.commit() + + response = client.patch(f"/execution/task-instances/{ti.id}/run", json=RUN_PATCH_BODY) + assert response.status_code == 200 + dag_run = response.json()["dag_run"] + assert dag_run["partition_key"] == "2026-05-20" + assert dag_run["partition_date"] == PARTITION_DATE.isoformat().replace("+00:00", "Z") diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 40d9401b9ffc0..59368c8b56af3 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -30,6 +30,7 @@ from airflow import settings from airflow._shared.observability.metrics.base_stats_logger import StatsLogger +from airflow._shared.timezones import timezone from airflow.assets.manager import AssetManager from airflow.models.asset import ( AssetAliasModel, @@ -260,6 +261,7 @@ def _get_or_create_apdr(): try: return AssetManager._get_or_create_apdr( target_key="test_partition_key", + target_partition_date=None, target_dag=testing_dag, rollup_fingerprint=rollup_fingerprint, asset_id=asm.id, @@ -282,6 +284,122 @@ def _get_or_create_apdr(): assert len(set(ids)) == 1 assert session.scalar(select(func.count()).select_from(AssetPartitionDagRun)) == 1 + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_get_or_create_apdr_suppresses_conflicting_partition_date(self, session): + """Two events resolving the same target key to different dates → suppress to None. + + Rather than an order-dependent first-event-wins, conflicting carried dates produce a + deterministic ``None`` so the consumer DagRun is not stamped with a wrong, unstable date. + """ + asm = AssetModel(uri="test://asset1/", name="partition_asset", group="asset") + testing_dag = DagModel(dag_id="testing_dag_pd_conflict", is_stale=False, bundle_name="testing") + session.add_all([asm, testing_dag]) + session.commit() + fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": {}}} + + first = AssetManager._get_or_create_apdr( + target_key="2026-05-20", + target_partition_date=timezone.parse("2026-05-20T00:00:00"), + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + assert first.partition_date == timezone.parse("2026-05-20T00:00:00") + + # A second contributing event resolves the same key to a DIFFERENT date. + second = AssetManager._get_or_create_apdr( + target_key="2026-05-20", + target_partition_date=timezone.parse("2026-05-21T00:00:00"), + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + assert second.id == first.id # same pending APDR + assert second.partition_date is None # conflict suppressed, deterministic + + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_get_or_create_apdr_keeps_agreeing_partition_date(self, session): + """A later event carrying the same (or no) date does not trip the conflict suppression.""" + asm = AssetModel(uri="test://asset1/", name="partition_asset", group="asset") + testing_dag = DagModel(dag_id="testing_dag_pd_agree", is_stale=False, bundle_name="testing") + session.add_all([asm, testing_dag]) + session.commit() + fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": {}}} + source_date = timezone.parse("2026-05-20T00:00:00") + + kwargs = dict( + target_key="2026-05-20", + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + first = AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs) + # Same date agrees → kept. + same = AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs) + assert same.id == first.id + assert same.partition_date == source_date + # A None-carrying event (e.g. a temporal mapper, resolved by the scheduler) is not a + # conflict → the existing date is kept. + with_none = AssetManager._get_or_create_apdr(target_partition_date=None, **kwargs) + assert with_none.id == first.id + assert with_none.partition_date == source_date + + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_get_or_create_apdr_adopts_date_when_existing_is_none(self, session): + """An APDR created with no date adopts a later event's carried date (not dropped).""" + asm = AssetModel(uri="test://asset1/", name="partition_asset", group="asset") + testing_dag = DagModel(dag_id="testing_dag_pd_adopt", is_stale=False, bundle_name="testing") + session.add_all([asm, testing_dag]) + session.commit() + fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": {}}} + source_date = timezone.parse("2026-05-20T00:00:00") + + kwargs = dict( + target_key="2026-05-20", + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + # First event carries no date (e.g. producer had no partition_date). + first = AssetManager._get_or_create_apdr(target_partition_date=None, **kwargs) + assert first.partition_date is None + # A later identity event carries a real date → adopted, not silently dropped. + adopted = AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs) + assert adopted.id == first.id + assert adopted.partition_date == source_date + + @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle") + def test_get_or_create_apdr_recovers_after_conflict(self, session): + """Once a conflict has suppressed the date to None, a later event re-adopts a date.""" + asm = AssetModel(uri="test://asset1/", name="partition_asset", group="asset") + testing_dag = DagModel(dag_id="testing_dag_pd_recover", is_stale=False, bundle_name="testing") + session.add_all([asm, testing_dag]) + session.commit() + fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": {}}} + date_1 = timezone.parse("2026-05-20T00:00:00") + date_2 = timezone.parse("2026-05-21T00:00:00") + + kwargs = dict( + target_key="2026-05-20", + target_dag=testing_dag, + rollup_fingerprint=fp, + asset_id=asm.id, + session=session, + ) + first = AssetManager._get_or_create_apdr(target_partition_date=date_1, **kwargs) + assert first.partition_date == date_1 + # Conflicting date suppresses to None. + conflicted = AssetManager._get_or_create_apdr(target_partition_date=date_2, **kwargs) + assert conflicted.partition_date is None + # A subsequent event re-adopts (suppression is not permanently sticky). + recovered = AssetManager._get_or_create_apdr(target_partition_date=date_2, **kwargs) + assert recovered.id == first.id + assert recovered.partition_date == date_2 + @pytest.mark.need_serialized_dag @pytest.mark.usefixtures("testing_dag_bundle") def test_queue_partitioned_dags_stamps_rollup_fingerprint(self, session, dag_maker): diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index b312f9fd1b0a8..cbe2b0ca155e1 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -10250,6 +10250,7 @@ def _produce_and_register_asset_event( session: Session, dag_maker: DagMaker, expected_partition_key: str | None = None, + partition_date: datetime.datetime | None = None, ) -> AssetPartitionDagRun: if expected_partition_key is None: expected_partition_key = partition_key @@ -10257,7 +10258,11 @@ def _produce_and_register_asset_event( with dag_maker(dag_id=dag_id, schedule=PartitionAtRuntime(), session=session) as dag: EmptyOperator(task_id="hi", outlets=[asset]) - dr = dag_maker.create_dagrun(partition_key=partition_key, session=session) + dr = dag_maker.create_dagrun( + partition_key=partition_key, + partition_date=partition_date, + session=session, + ) [ti] = dr.get_task_instances(session=session) session.commit() @@ -10479,6 +10484,289 @@ def test_partitioned_dag_run_with_customized_mapper( assert asset_event.source_run_id == "test" +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +def test_consumer_dag_run_partition_date_identity_passthrough(dag_maker: DagMaker, session: Session): + """IdentityMapper can't reconstruct a date from its key, so the scheduler resolver falls + back to the producer's source date carried on the APDR and stamps it on the consumer DagRun. + + Temporal and composite mappers are resolved by the scheduler via to_partition_date (covered + by the partition_mapper and resolver tests); this exercises the IdentityMapper carry, which + is the one case the scheduler cannot resolve from the key alone. + """ + asset_1 = Asset(name="asset-1") + source_partition_date = pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC") + + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="2026-05-20T01:00:00", + partition_date=source_partition_date, + session=session, + dag_maker=dag_maker, + expected_partition_key="2026-05-20T01:00:00", + ) + partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + assert partition_dags == {"asset-event-consumer"} + + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "2026-05-20T01:00:00" + assert dag_run.partition_date == source_partition_date + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +@mock.patch.object(SchedulerJobRunner, "_resolve_partition_date", autospec=True, return_value=None) +def test_consumer_dag_run_partition_date_not_masked_when_resolver_suppresses( + mock_resolve, dag_maker: DagMaker, session: Session +): + """A carried IdentityMapper date must not mask a resolver suppression. + + When temporal mappers feeding the same APDR conflict (or one raises), the resolver + deliberately returns None and logs it; the carried date is only ever applied inside the + resolver, never at the call site. Here the APDR carries a date (IdentityMapper) but the + resolver returns None, and the consumer DagRun's partition_date must stay None — a + regression re-adding a call-site fallback to ``apdr.partition_date`` would fail this. + """ + asset_1 = Asset(name="asset-1") + source_partition_date = pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC") + + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="2026-05-20T01:00:00", + partition_date=source_partition_date, + session=session, + dag_maker=dag_maker, + expected_partition_key="2026-05-20T01:00:00", + ) + session.refresh(apdr) + # The IdentityMapper carry is stored on the APDR... + assert apdr.partition_date == source_partition_date + + runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "2026-05-20T01:00:00" + # ...but the resolver suppressed a date, so the call site must NOT substitute the carry. + assert dag_run.partition_date is None + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +def test_consumer_dag_run_partition_date_none_for_non_temporal_mapper( + dag_maker: DagMaker, + session: Session, + custom_partition_mapper_patch: Callable[[], ExitStack], +): + """For mappers that aren't temporal/identity, the consumer DagRun's partition_date stays None.""" + asset_1 = Asset(name="asset-1") + + with custom_partition_mapper_patch(): + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=Key1Mapper(), # type: ignore[arg-type] + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + with custom_partition_mapper_patch(): + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="this-is-not-key-1-before-mapped", + partition_date=pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC"), + session=session, + dag_maker=dag_maker, + expected_partition_key="key-1", + ) + runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "key-1" + assert dag_run.partition_date is None + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +def test_consumer_dag_run_partition_date_is_none_when_source_has_no_date( + dag_maker: DagMaker, session: Session +): + """When the producer DagRun has no partition_date, IdentityMapper passes None through.""" + asset_1 = Asset(name="asset-1") + + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="2026-05-20T01:00:00", + partition_date=None, + session=session, + dag_maker=dag_maker, + expected_partition_key="2026-05-20T01:00:00", + ) + runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "2026-05-20T01:00:00" + assert dag_run.partition_date is None + + +@pytest.mark.need_serialized_dag +@pytest.mark.usefixtures("clear_asset_partition_rows") +def test_consumer_dag_run_partition_date_is_none_when_task_key_diverges( + dag_maker: DagMaker, session: Session +): + """A task-emitted partition_key differing from the DagRun's drops the source date. + + The producer DagRun carries a partition_date, but the task emits an outlet event with a + different partition_key. The run-level date refers to the run-level key, so it must not be + carried onto the divergent partition: the APDR (and the consumer DagRun created from it) keep + partition_date None even though the producer run had one. + """ + asset_1 = Asset(name="asset-1") + + with dag_maker( + dag_id="asset-event-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + runner = SchedulerJobRunner( + job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)] + ) + + with dag_maker( + dag_id="asset-event-producer", + schedule=PartitionAtRuntime(), + session=session, + ) as dag: + EmptyOperator(task_id="hi", outlets=[asset_1]) + + dr = dag_maker.create_dagrun( + partition_key="scheduler-key", + partition_date=pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC"), + session=session, + ) + [ti] = dr.get_task_instances(session=session) + session.commit() + + serialized_outlets = dag.get_task("hi").outlets + TaskInstance.register_asset_changes_in_db( + ti=ti, + task_outlets=[o.asprofile() for o in serialized_outlets], + outlet_events=[ + { + "dest_asset_key": {"name": "asset-1", "uri": "asset-1"}, + "extra": {}, + "partition_key": "task-key", + }, + ], + session=session, + ) + session.commit() + + event = session.scalar( + select(AssetEvent).where( + AssetEvent.source_dag_id == dag.dag_id, + AssetEvent.source_run_id == dr.run_id, + ) + ) + assert event is not None + assert event.partition_key == "task-key" + + apdr = session.scalar( + select(AssetPartitionDagRun) + .join( + PartitionedAssetKeyLog, + PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id, + ) + .where(PartitionedAssetKeyLog.asset_event_id == event.id) + ) + assert apdr is not None + # Divergent key → the threaded source date is dropped to None at APDR creation. + assert apdr.partition_key == "task-key" + assert apdr.partition_date is None + + runner._create_dagruns_for_partitioned_asset_dags(session=session) + + session.refresh(apdr) + assert apdr.created_dag_run_id is not None + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + assert dag_run.partition_key == "task-key" + assert dag_run.partition_date is None + + @pytest.mark.need_serialized_dag @pytest.mark.usefixtures("clear_asset_partition_rows") def test_consumer_dag_listen_to_two_partitioned_asset( @@ -12265,25 +12553,39 @@ def _make_runner() -> SchedulerJobRunner: ) +_CARRIED_DATE = datetime.datetime(2026, 5, 20, 1, 0, 0, tzinfo=datetime.timezone.utc) + + @pytest.mark.parametrize( - ("mappers", "partition_key", "expected"), + ("mappers", "partition_key", "carried_partition_date", "expected"), [ - # Non-temporal mapper → no anchor. - pytest.param([CoreIdentityMapper()], "some-key", None, id="non-temporal-none"), + # Non-temporal mapper, nothing carried → None. + pytest.param([CoreIdentityMapper()], "some-key", None, None, id="non-temporal-none"), + # Non-temporal mapper with a carried producer date (IdentityMapper) → the carry. + pytest.param( + [CoreIdentityMapper()], + "some-key", + _CARRIED_DATE, + _CARRIED_DATE, + id="non-temporal-returns-carried-date", + ), # StartOfDayMapper(NY): "2024-03-15" → NY midnight = 04:00 UTC (EDT, DST since 2024-03-10), # localised with the mapper's own timezone rather than the global default. pytest.param( [CoreStartOfDayMapper(timezone="America/New_York")], "2024-03-15", + None, datetime.datetime(2024, 3, 15, 4, 0, 0, tzinfo=datetime.timezone.utc), id="non-utc-uses-mapper-timezone", ), - # Key cannot be decoded by the mapper's format → caught → None (no raise). - pytest.param([CoreStartOfDayMapper()], "not-a-date", None, id="decode-failure-none"), + # Key cannot be decoded by the mapper's format → caught → None, and the carried + # date is NOT substituted (the error is logged; masking it would hide that). + pytest.param([CoreStartOfDayMapper()], "not-a-date", _CARRIED_DATE, None, id="decode-failure-none"), # FanOutMapper unwraps to its downstream_mapper (daily), which owns the per-day key. pytest.param( [CoreFanOutMapper(upstream_mapper=CoreStartOfWeekMapper(), window=CoreWeekWindow())], "2024-01-16", + None, datetime.datetime(2024, 1, 16, 0, 0, 0, tzinfo=datetime.timezone.utc), id="fanout-uses-downstream-mapper", ), @@ -12291,13 +12593,16 @@ def _make_runner() -> SchedulerJobRunner: pytest.param( [CoreStartOfDayMapper(), CoreStartOfDayMapper()], "2024-03-15", + None, datetime.datetime(2024, 3, 15, 0, 0, 0, tzinfo=datetime.timezone.utc), id="agreeing-mappers-anchor", ), - # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct instants → None. + # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct instants → None, + # and the carried date is NOT substituted (it would mask the logged conflict). pytest.param( [CoreStartOfDayMapper(timezone="UTC"), CoreStartOfDayMapper(timezone="America/New_York")], "2024-03-15", + _CARRIED_DATE, None, id="conflicting-mappers-none", ), @@ -12306,15 +12611,19 @@ def _make_runner() -> SchedulerJobRunner: pytest.param( [CoreStartOfDayMapper(), CoreStartOfHourMapper()], "2024-03-15", + _CARRIED_DATE, None, id="one-failing-mapper-aborts", ), ], ) -def test_resolve_partition_date(mappers, partition_key, expected): +def test_resolve_partition_date(mappers, partition_key, carried_partition_date, expected): """_resolve_partition_date over mapper compositions: temporal / fan-out / agree / conflict / failure. - The mappers are consumed one per upstream asset, so ``asset_infos`` is sized to ``mappers``. + The carried date (the producer's source date stamped on the APDR, set only for IdentityMapper) + is returned only when no temporal mapper contributes an anchor. On a conflict or a mapper + error the result is None — the carry must not mask the logged suppression. The mappers are + consumed one per upstream asset, so ``asset_infos`` is sized to ``mappers``. """ runner = _make_runner() timetable = mock.MagicMock() @@ -12326,5 +12635,6 @@ def test_resolve_partition_date(mappers, partition_key, expected): asset_infos=asset_infos, partition_key=partition_key, dag_id="test-dag", + carried_partition_date=carried_partition_date, ) assert result == expected diff --git a/airflow-core/tests/unit/partition_mappers/test_base.py b/airflow-core/tests/unit/partition_mappers/test_base.py index e7b0ba02f07f6..ba48057cc12f2 100644 --- a/airflow-core/tests/unit/partition_mappers/test_base.py +++ b/airflow-core/tests/unit/partition_mappers/test_base.py @@ -17,6 +17,7 @@ from __future__ import annotations import re +from datetime import datetime, timezone import pytest @@ -29,6 +30,17 @@ from airflow.serialization.enums import Encoding +class TestCarryPartitionDate: + def test_base_returns_none_by_default(self): + """Non-identity mappers don't carry the producer's date; it's derived from the key instead.""" + dt = datetime(2026, 5, 20, 1, 0, 0, tzinfo=timezone.utc) + assert StartOfDayMapper().carry_partition_date(dt) is None + assert ( + RollupMapper(upstream_mapper=StartOfDayMapper(), window=DayWindow()).carry_partition_date(dt) + is None + ) + + class TestPartitionMapperInitSubclass: """Verify that __init_subclass__ enforces the decode/encode pair contract.""" diff --git a/airflow-core/tests/unit/partition_mappers/test_identity.py b/airflow-core/tests/unit/partition_mappers/test_identity.py index 9a97e583db562..b992e76430e27 100644 --- a/airflow-core/tests/unit/partition_mappers/test_identity.py +++ b/airflow-core/tests/unit/partition_mappers/test_identity.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from datetime import datetime, timezone + from airflow.partition_mappers.identity import IdentityMapper from airflow.serialization.decoders import decode_partition_mapper from airflow.serialization.encoders import encode_partition_mapper @@ -27,6 +29,13 @@ def test_to_downstream(self): pm = IdentityMapper() assert pm.to_downstream("key") == "key" + def test_carry_partition_date_passes_source_through(self): + """IdentityMapper carries the producer's date through (its key can't reconstruct one).""" + pm = IdentityMapper() + dt = datetime(2026, 5, 20, 1, 0, 0, tzinfo=timezone.utc) + assert pm.carry_partition_date(dt) == dt + assert pm.carry_partition_date(None) is None + def test_serialize(self): pm = IdentityMapper() assert pm.serialize() == {} diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 779bb42319193..cb9a1ca15a3cc 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1774,6 +1774,7 @@ class DAGRunResponse(BaseModel): bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None dag_display_name: Annotated[str, Field(title="Dag Display Name")] partition_key: Annotated[str | None, Field(title="Partition Key")] = None + partition_date: Annotated[datetime | None, Field(title="Partition Date")] = None class DAGRunsBatchBody(BaseModel): diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index fb058b924b6ca..8698ddc403b60 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -470,6 +470,8 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): "prev_execution_date", "prev_execution_date_success", } + if AIRFLOW_V_3_3_PLUS: + PENDULUM_SERIALIZABLE_CONTEXT_KEYS.add("partition_date") AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = { "macros", diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 812ca4d041833..bc03569386d14 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -775,6 +775,7 @@ class DagRun(BaseModel): triggering_user_name: Annotated[str | None, Field(title="Triggering User Name")] = None consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")] partition_key: Annotated[str | None, Field(title="Partition Key")] = None + partition_date: Annotated[AwareDatetime | None, Field(title="Partition Date")] = None note: Annotated[str | None, Field(title="Note")] = None team_name: Annotated[str | None, Field(title="Team Name")] = None diff --git a/task-sdk/src/airflow/sdk/definitions/context.py b/task-sdk/src/airflow/sdk/definitions/context.py index 462af80aeca4a..9d36203c2ebbf 100644 --- a/task-sdk/src/airflow/sdk/definitions/context.py +++ b/task-sdk/src/airflow/sdk/definitions/context.py @@ -64,6 +64,7 @@ class Context(TypedDict, total=False): outlets: list params: dict[str, Any] partition_key: NotRequired[str | None] + partition_date: NotRequired[DateTime | None] prev_data_interval_start_success: NotRequired[DateTime | None] prev_data_interval_end_success: NotRequired[DateTime | None] prev_start_date_success: NotRequired[DateTime | None] diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json index 4807d9f53b353..b75957870d31b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json +++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json @@ -1277,6 +1277,19 @@ "default": null, "title": "Partition Key" }, + "partition_date": { + "anyOf": [ + { + "format": "date-time", + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Partition Date" + }, "note": { "anyOf": [ { @@ -4640,6 +4653,19 @@ ], "title": "Partition Key" }, + "partition_date": { + "anyOf": [ + { + "format": "date-time", + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Partition Date" + }, "note": { "anyOf": [ { diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 7eb52038ce9e3..a39e2f91b53de 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -334,6 +334,7 @@ def get_template_context(self) -> Context: # TODO: Assess if we need to pass these through timezone.coerce_datetime "dag_run": dag_run, # type: ignore[typeddict-item] # Removable after #46522 "partition_key": dag_run.partition_key, + "partition_date": coerce_datetime(dag_run.partition_date), "triggering_asset_events": TriggeringAssetEventsAccessor.build( AssetEventDagRunReferenceResult.from_asset_event_dag_run_reference(event) for event in dag_run.consumed_asset_events diff --git a/task-sdk/src/airflow/sdk/types.py b/task-sdk/src/airflow/sdk/types.py index 029af34496f07..efa3c9f6d9426 100644 --- a/task-sdk/src/airflow/sdk/types.py +++ b/task-sdk/src/airflow/sdk/types.py @@ -123,6 +123,7 @@ class DagRunProtocol(Protocol): triggering_user_name: str | None consumed_asset_events: list[AssetEventDagRunReference] partition_key: str | None + partition_date: AwareDatetime | None note: str | None diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 4893258195446..bd50711eceaa8 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2397,6 +2397,7 @@ class RequestTestCase: "run_id": "prev_run", "logical_date": timezone.parse("2024-01-14T12:00:00Z"), "partition_key": None, + "partition_date": None, "run_type": "scheduled", "start_date": timezone.parse("2024-01-15T12:00:00Z"), "run_after": timezone.parse("2024-01-15T12:00:00Z"), @@ -2450,6 +2451,7 @@ class RequestTestCase: "run_id": "prev_run", "logical_date": timezone.parse("2024-01-14T12:00:00Z"), "partition_key": None, + "partition_date": None, "run_type": "scheduled", "start_date": timezone.parse("2024-01-15T12:00:00Z"), "run_after": timezone.parse("2024-01-15T12:00:00Z"), diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index be5ea4165adbf..c3c9b4295b42b 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -2001,6 +2001,7 @@ def test_get_context_with_ti_context_from_server(self, create_runtime_ti, mock_s "ts_nodash": "20241201T010000", "ts_nodash_with_tz": "20241201T010000+0000", "partition_key": dr.partition_key, + "partition_date": dr.partition_date, } def test_partition_key_in_context(self, create_runtime_ti, mock_supervisor_comms): @@ -2027,6 +2028,37 @@ def test_partition_key_in_context(self, create_runtime_ti, mock_supervisor_comms context = runtime_ti.get_template_context() assert context["partition_key"] == "some-partition" + def test_partition_date_in_context(self, create_runtime_ti, mock_supervisor_comms): + """Test that partition_date from dag_run is exposed in the template context.""" + task = BaseOperator(task_id="hello") + runtime_ti = create_runtime_ti(task=task, dag_id="basic_task") + + dr = runtime_ti._ti_context_from_server.dag_run + + mock_supervisor_comms.send.return_value = PrevSuccessfulDagRunResult( + data_interval_end=dr.logical_date - timedelta(hours=1), + data_interval_start=dr.logical_date - timedelta(hours=2), + start_date=dr.start_date - timedelta(hours=1), + end_date=dr.start_date, + ) + + context = runtime_ti.get_template_context() + + # Default: partition_date is None + assert context["partition_date"] is None + + # Set partition_date on dag_run and verify it surfaces in context + partition_date = timezone.datetime(2026, 5, 20, 1, 0, 0) + dr.partition_date = partition_date + context = runtime_ti.get_template_context() + assert context["partition_date"] == partition_date + + # Naive datetime is coerced to tz-aware so Jinja `| ds` / `| ts` filters + # operate on a real awareness boundary. + dr.partition_date = datetime(2026, 5, 20, 1, 0, 0) + context = runtime_ti.get_template_context() + assert context["partition_date"].tzinfo is not None + def test_lazy_loading_not_triggered_until_accessed(self, create_runtime_ti, mock_supervisor_comms): """Ensure lazy-loaded attributes are not resolved until accessed.""" task = BaseOperator(task_id="hello")