14 changes: 13 additions & 1 deletion airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -631,7 +631,19 @@ partition match can be produced, so the downstream Dag is not triggered for
that key.

Inside partitioned Dag runs, access the resolved partition through
``dag_run.partition_key``.
``dag_run.partition_key``. When the consumer's partition mapper can
resolve the key to a ``datetime``, that value is also available as
``dag_run.partition_date``, so templates can use
``{{ partition_date | ds }}``. This covers the ``StartOf*Mapper`` family
(which decode the key directly), ``IdentityMapper`` (which carries the
producer's ``partition_date`` through), and composite mappers —
``RollupMapper``, ``ChainMapper`` and ``FanOutMapper`` — whose effective
child mapper is temporal (they delegate the anchor to that child).
Mappers whose key carries no temporal meaning (``ProductMapper``,
``AllowedKeyMapper`` and custom mappers that do not implement
``to_partition_date``) leave ``partition_date`` as ``None`` even when the
resulting key is date-shaped, so consumers of those mappers should keep
parsing ``partition_key``.
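
The fallback described above can be sketched in plain Python. This is a minimal illustration, not code from the PR: the helper name ``partition_day`` is hypothetical, and the ``YYYY-MM-DD`` key format is an assumption about how a date-shaped key might look.

```python
from __future__ import annotations

from datetime import datetime, timezone


def partition_day(partition_key: str | None, partition_date: datetime | None) -> str | None:
    """Return a YYYY-MM-DD string, preferring partition_date over the raw key.

    Hypothetical helper: assumes date-shaped keys look like "2024-05-01".
    """
    if partition_date is not None:
        # The mapper resolved the key to a datetime, so no parsing is needed.
        return partition_date.strftime("%Y-%m-%d")
    if partition_key is None:
        return None
    try:
        # Non-temporal mappers leave partition_date as None; parse the key instead.
        return datetime.strptime(partition_key, "%Y-%m-%d").strftime("%Y-%m-%d")
    except ValueError:
        # The key is not date-shaped (for example a region name).
        return None


if __name__ == "__main__":
    print(partition_day("2024-05-01", None))
    print(partition_day("ignored", datetime(2024, 5, 2, tzinfo=timezone.utc)))
```

A task body could call such a helper with ``dag_run.partition_key`` and ``dag_run.partition_date`` so that it behaves the same whether or not the mapper is temporal.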

You can also trigger a DagRun manually with a partition key (for example,
through the Trigger Dag window in the UI, or through the REST API by
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``9ff64e1c35d3`` (head) | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes on dag_run.created_dag_version_id and |
| ``d2f4e1b3c5a7`` (head) | ``9ff64e1c35d3`` | ``3.3.0`` | Add partition_date to asset_partition_dag_run. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``9ff64e1c35d3`` | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes on dag_run.created_dag_version_id and |
| | | | task_instance.dag_version_id. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``dd5f3a8e2b91`` | ``c20871fbf23a`` | ``3.3.0`` | Add rollup_fingerprint to AssetPartitionDagRun and index |
3 changes: 3 additions & 0 deletions airflow-core/docs/templates-ref.rst
@@ -87,6 +87,9 @@ Variable Type Description
| is enabled in ``airflow.cfg``.
``{{ partition_key }}`` str | None | The partition key from the current :class:`~airflow.models.dagrun.DagRun`.
| Returns ``None`` if no partition key was set. Added in version 3.3.0.
``{{ partition_date }}`` datetime | None | The partition datetime from the current :class:`~airflow.models.dagrun.DagRun`.
| Use ``{{ partition_date | ds }}`` and related filters for formatting.
| Returns ``None`` if no partition date was set. Added in version 3.3.0.
``{{ var.value }}`` Airflow variables. See `Airflow Variables in Templates`_ below.
``{{ var.json }}`` Airflow variables. See `Airflow Variables in Templates`_ below.
``{{ conn }}`` Airflow connections. See `Airflow Connections in Templates`_ below.
1 change: 1 addition & 0 deletions airflow-core/newsfragments/67285.feature.rst
@@ -0,0 +1 @@
Propagate ``partition_date`` from producer DagRuns to consumers of partitioned assets, so date-shaped partitions are available in consumer task templates.
@@ -119,6 +119,7 @@ class DAGRunResponse(BaseModel):
bundle_version: str | None
dag_display_name: str = Field(validation_alias=AliasPath("dag_model", "dag_display_name"))
partition_key: str | None
partition_date: datetime | None


class DAGRunCollectionResponse(BaseModel):
@@ -13704,6 +13704,12 @@ components:
- type: string
- type: 'null'
title: Partition Key
partition_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Partition Date
type: object
required:
- dag_run_id
@@ -13727,6 +13733,7 @@ components:
- bundle_version
- dag_display_name
- partition_key
- partition_date
title: DAGRunResponse
description: Dag Run serializer for responses.
DAGRunsBatchBody:
@@ -341,6 +341,7 @@ class DagRun(StrictBaseModel):
triggering_user_name: str | None = None
consumed_asset_events: list[AssetEventDagRunReference]
partition_key: str | None
partition_date: UtcDateTime | None = None
note: str | None = None
team_name: str | None = None

@@ -44,6 +44,7 @@
AddAssetsByAliasEndpoint,
AddAwaitingInputStatePayload,
AddConnectionTestEndpoint,
AddPartitionDateField,
AddRetryPolicyFields,
AddTaskAndAssetStateStoreEndpoints,
AddTaskInstanceQueueField,
@@ -63,6 +64,7 @@
AddTeamNameField,
AddTaskAndAssetStateStoreEndpoints,
AddAssetsByAliasEndpoint,
AddPartitionDateField,
),
Version(
"2026-04-06",
@@ -127,3 +127,17 @@ class AddTaskAndAssetStateStoreEndpoints(VersionChange):
endpoint("/store/asset/by-uri/value", ["DELETE"]).didnt_exist,
endpoint("/store/asset/by-uri/clear", ["DELETE"]).didnt_exist,
)


class AddPartitionDateField(VersionChange):
"""Expose the consumer DagRun's partition datetime on the execution API so consumer tasks can template it."""

description = __doc__

instructions_to_migrate_to_previous_version = (schema(DagRun).field("partition_date").didnt_exist,)

@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
def remove_partition_date_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc]
"""Strip ``partition_date`` from the nested ``dag_run`` payload for older clients."""
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
response.body["dag_run"].pop("partition_date", None)
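
The down-conversion above can be illustrated without the versioning framework. The sketch below is an assumption-laden stand-in: ``strip_partition_date`` is a hypothetical function that operates on a plain ``dict`` rather than on the ``ResponseInfo`` object used in the PR, and it mirrors only the "drop the nested field if present" logic.

```python
from typing import Any


def strip_partition_date(body: dict[str, Any]) -> dict[str, Any]:
    """Remove ``partition_date`` from a nested ``dag_run`` payload, in place.

    Hypothetical stand-in for the response converter; payloads without a
    ``dag_run`` dict are returned unchanged.
    """
    dag_run = body.get("dag_run")
    if isinstance(dag_run, dict):
        # pop() with a default never raises, so payloads lacking the field are fine.
        dag_run.pop("partition_date", None)
    return body


if __name__ == "__main__":
    payload = {"dag_run": {"partition_key": "2024-05-01", "partition_date": "2024-05-01T00:00:00Z"}}
    print(strip_partition_date(payload))
```

Using ``pop`` with a default keeps the converter safe for responses that never carried the field, which matters because the same converter runs for every older client.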
58 changes: 57 additions & 1 deletion airflow-core/src/airflow/assets/manager.py
@@ -49,6 +49,8 @@
from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks

if TYPE_CHECKING:
from datetime import datetime

from sqlalchemy.orm.session import Session

from airflow.models.dag import DagModel
@@ -274,6 +276,7 @@ def register_asset_change(
source_alias_names: Collection[str] = (),
session: Session,
partition_key: str | None = None,
partition_date: datetime | None = None,
source_is_api: bool = False,
api_user_teams: set[str] | None = None,
api_allow_consumer_teams: list[str] | None = None,
@@ -394,6 +397,7 @@ def register_asset_change(
source_map_index=asset_event.source_map_index,
source_aliases=[aam.to_serialized() for aam in asset_alias_models],
partition_key=partition_key,
partition_date=partition_date,
)
)

@@ -440,6 +444,7 @@ def register_asset_change(
asset_id=asset_model.id,
dags_to_queue=dags_to_queue,
partition_key=partition_key,
partition_date=partition_date,
event=asset_event,
task_instance=task_instance,
session=session,
@@ -485,6 +490,7 @@ def _queue_dagruns(
asset_id: int,
dags_to_queue: set[DagModel],
partition_key: str | None,
partition_date: datetime | None,
event: AssetEvent,
task_instance: TaskInstance | None,
session: Session,
@@ -499,6 +505,7 @@ def _queue_dagruns(
partition_dags=partition_dags,
event=event,
partition_key=partition_key,
partition_date=partition_date,
task_instance=task_instance,
session=session,
)
@@ -527,6 +534,7 @@ def _queue_partitioned_dags(
partition_dags: Iterable[DagModel],
event: AssetEvent,
partition_key: str | None,
partition_date: datetime | None,
task_instance: TaskInstance | None,
session: Session,
) -> None:
@@ -574,9 +582,9 @@ def _queue_partitioned_dags(
if (asset_model := session.scalar(select(AssetModel).where(AssetModel.id == asset_id))) is None:
raise RuntimeError(f"Could not find asset for asset_id={asset_id}")

mapper = timetable.get_partition_mapper(name=asset_model.name, uri=asset_model.uri)
try:
# Catch every exception that can occur while mapping partition_key.
mapper = timetable.get_partition_mapper(name=asset_model.name, uri=asset_model.uri)
target_key = mapper.to_downstream(partition_key)
except Exception as err:
log.exception(
@@ -643,9 +651,18 @@ def _queue_partitioned_dags(
)
continue

# The producer's partition_date (threaded in from its DagRun via
# register_asset_change) is carried onto the APDR only by mappers that
# opt in. IdentityMapper does, since its key carries no temporal meaning
# for the scheduler to re-derive at run creation; temporal and composite
# mappers return None here and are resolved from the key by the scheduler
# via PartitionMapper.to_partition_date.
target_partition_date: datetime | None = mapper.carry_partition_date(partition_date)

Review comment (Member):
mapper.carry_partition_date() sits outside the try that wraps mapper.to_downstream() just above, so a custom mapper whose carry_partition_date override raises would propagate out and abort the whole register_asset_change write, whereas a to_downstream failure in the same loop is caught, logged, and continued. The two stock mappers can't raise (IdentityMapper returns the date, temporal/composite return None), so the shipped path is safe. For parity with the loop's "catch every possible exception" intent, worth moving this call inside the try and falling back to None on failure.
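
One way to read the reviewer's suggestion is sketched below. The mapper classes and the ``map_event`` helper are hypothetical stand-ins, not Airflow classes, and guarding ``carry_partition_date`` with its own ``try`` (rather than sharing the existing one) is an assumption made so that a failing carry falls back to ``None`` without skipping the key.

```python
from __future__ import annotations

import logging
from datetime import datetime, timezone

log = logging.getLogger(__name__)


class IdentityLikeMapper:
    """Hypothetical stand-in for a mapper that carries the producer's date."""

    def to_downstream(self, key: str) -> str:
        return key

    def carry_partition_date(self, partition_date: datetime | None) -> datetime | None:
        return partition_date


class BrokenCarryMapper(IdentityLikeMapper):
    """Hypothetical custom mapper whose carry_partition_date override raises."""

    def carry_partition_date(self, partition_date: datetime | None) -> datetime | None:
        raise ValueError("cannot carry partition_date")


def map_event(mapper, partition_key: str, partition_date: datetime | None):
    """Return (target_key, target_partition_date), or None to skip this target."""
    try:
        target_key = mapper.to_downstream(partition_key)
    except Exception:
        log.exception("Could not map partition_key %r", partition_key)
        return None
    try:
        target_date = mapper.carry_partition_date(partition_date)
    except Exception:
        # Fall back to None instead of aborting the whole asset-change write.
        log.exception("Could not carry partition_date for key %r", partition_key)
        target_date = None
    return target_key, target_date


if __name__ == "__main__":
    day = datetime(2024, 5, 1, tzinfo=timezone.utc)
    print(map_event(IdentityLikeMapper(), "2024-05-01", day))
    print(map_event(BrokenCarryMapper(), "2024-05-01", day))
```

With this shape a broken override costs only the carried date, and the downstream key is still queued.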


for target_key in target_keys:
apdr = cls._get_or_create_apdr(
target_key=target_key,
target_partition_date=target_partition_date,
target_dag=target_dag,
rollup_fingerprint=fingerprint,
asset_id=asset_id,
@@ -666,6 +683,7 @@ def _get_or_create_apdr(
cls,
*,
target_key: str,
target_partition_date: datetime | None,
target_dag: DagModel,
rollup_fingerprint: dict,
asset_id: int,
@@ -683,6 +701,20 @@ def _get_or_create_apdr(
``rollup_fingerprint`` is the serialized mapper / window definition for all partitioned
assets in the timetable at creation time; the scheduler discards APDRs whose stamp no
longer matches the current timetable's fingerprint (mapper / window may have changed).

Reconciling the carried ``partition_date`` on an existing pending APDR is best-effort:
a partitioned consumer's feeding assets are expected to agree on the partition's
datetime. The carry only matters for ``IdentityMapper`` (whose key the scheduler
cannot decode); temporal/composite feeds re-derive the date from the key at run
creation regardless of what is stored here. Within that contract:

- If the APDR carries no date yet (``None`` — created by an event that carried none),
adopt the incoming date when this event carries one. There is nothing to conflict
with, so a later identity event's date is not dropped.
- If the APDR already carries a date and this event carries a **different** non-null
one, the producing assets disagree; picking one would be order-dependent, so the
carried date is suppressed to ``None`` (and re-adoptable by a later event).
- Otherwise (the dates agree, or this event carries none) the existing value is kept.
"""
with _lock_asset_model(session=session, asset_id=asset_id):
latest_apdr: AssetPartitionDagRun | None = session.scalar(
@@ -695,6 +727,29 @@ def _get_or_create_apdr(
.limit(1)
)
if latest_apdr and latest_apdr.created_dag_run_id is None:
existing_partition_date = latest_apdr.partition_date
if existing_partition_date is None:
# No carried date yet; adopt the incoming one if present (no conflict
# to resolve). Keeps a later identity event's date from being dropped.
if target_partition_date is not None:
latest_apdr.partition_date = target_partition_date
session.flush()
elif target_partition_date is not None and existing_partition_date != target_partition_date:
# Two contributing events carry conflicting partition_dates for the same
# (target_key, target_dag). Choosing one would be order-dependent, so
# suppress: the consumer DagRun gets partition_date=None rather than a
# wrong, unstable value.
log.warning(
"Conflicting partition_date carried for the same target key; "
"suppressing it so the consumer DagRun's partition_date is None. "
"The producing assets likely disagree on the partition's datetime.",
target_dag_id=target_dag.dag_id,
target_key=target_key,
existing_partition_date=existing_partition_date,
incoming_partition_date=target_partition_date,
)
latest_apdr.partition_date = None
session.flush()
cls.logger().debug(
"Existing APDR found for key %s dag_id %s",
target_key,
@@ -707,6 +762,7 @@ def _get_or_create_apdr(
target_dag_id=target_dag.dag_id,
created_dag_run_id=None,
partition_key=target_key,
partition_date=target_partition_date,
rollup_fingerprint=rollup_fingerprint,
)
session.add(apdr)
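
The three reconciliation rules in the ``_get_or_create_apdr`` docstring reduce to a small pure function. The sketch below is illustrative only: ``reconcile_partition_date`` is a hypothetical name, and it leaves out the row locking, flushing and logging that the real method performs.

```python
from __future__ import annotations

from datetime import datetime, timezone


def reconcile_partition_date(existing: datetime | None, incoming: datetime | None) -> datetime | None:
    """Return the date a pending APDR should carry after one more event.

    Hypothetical helper mirroring the documented rules:
    adopt when empty, suppress on conflict, otherwise keep.
    """
    if existing is None:
        # Nothing stored yet, so there is nothing to conflict with.
        return incoming
    if incoming is not None and incoming != existing:
        # The producing assets disagree; suppress rather than pick by arrival order.
        return None
    # The dates agree, or this event carries none.
    return existing


if __name__ == "__main__":
    may_1 = datetime(2024, 5, 1, tzinfo=timezone.utc)
    may_2 = datetime(2024, 5, 2, tzinfo=timezone.utc)
    print(reconcile_partition_date(None, may_1))
    print(reconcile_partition_date(may_1, may_2))
    print(reconcile_partition_date(may_1, None))
```

Because a suppressed value is ``None``, a later event falls into the first branch again, which is the "re-adoptable by a later event" behaviour the docstring mentions.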
@@ -106,7 +106,7 @@ def combine_player_stats(dag_run=None):
"""Merge the aligned hourly partitions into a combined dataset."""
if TYPE_CHECKING:
assert dag_run
print(dag_run.partition_key)
print(dag_run.partition_key, dag_run.partition_date)

combine_player_stats()

41 changes: 26 additions & 15 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2044,13 +2044,15 @@ def _resolve_partition_date(
asset_infos: Iterable[tuple[str, str]],
partition_key: str,
dag_id: str,
carried_partition_date: datetime | None,
) -> datetime | None:
"""
Return the temporal anchor (period-start datetime) for *partition_key*.
Return the ``partition_date`` the consumer Dag run should be created with.

Resolves the temporal anchor (period-start datetime) for *partition_key*
across *asset_infos* — the ``(name, uri)`` pairs of the upstream assets
that contributed to it. Each upstream mapper resolves the key via
The temporal anchor (period-start datetime) is resolved for
*partition_key* across *asset_infos* — the ``(name, uri)`` pairs of the
upstream assets that contributed to it. Each upstream mapper resolves the
key via
:meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`:
temporal mappers decode the key, composite mappers delegate to their
child, and non-temporal mappers (e.g.
@@ -2059,16 +2061,19 @@ def _resolve_partition_date(
A partitioned consumer has a single partition identity, so every temporal
mapper feeding it must resolve the same key to the same instant. Anchors
are compared by instant (timezone-aware), so equivalent moments collapse
to one. When the temporal mappers agree, that anchor is returned; when
they disagree — a misconfiguration, e.g. assets mapping the same key under
different timezones — ``partition_date`` is left unset and a warning is
logged rather than silently picking one by scan order. Returns ``None`` if
no mapper is temporal.

A failure in any mapper aborts the whole resolution and returns ``None``
(logged) — anchors accumulated from earlier mappers are discarded rather
than used as a partial result, since a partial set could hide a conflict.
A broken mapper must not crash the scheduler tick.
to one. When the temporal mappers agree, that anchor is returned.

When no temporal mapper contributes at all — an identity key carries no
temporal meaning and cannot be decoded back into a date — the producer's
source date carried on the APDR at queue time (*carried_partition_date*,
set only for ``IdentityMapper``) is returned instead.

When temporal mappers were present but produced no usable anchor — they
disagreed (a misconfiguration, e.g. assets mapping the same key under
different timezones) or one raised — the conflict/error is logged and
``None`` is returned. The carried date is deliberately *not* substituted
here: stamping it would mask the logged suppression. A broken mapper must
not crash the scheduler tick.
"""
anchors: set[datetime] = set()
try:
@@ -2086,7 +2091,12 @@ def _resolve_partition_date(
return None

if not anchors:
return None
# No temporal mapper contributed an anchor (e.g. an all-IdentityMapper feed),
# so fall back to the date carried on the APDR. A partitioned consumer's feeding
# assets are expected to agree on the partition's datetime; when a temporal mapper
# *does* resolve an anchor it takes precedence over the carried identity date,
# since the key is the authoritative source the scheduler can re-derive.
return carried_partition_date
if len(anchors) > 1:
self.log.warning(
"Upstream partition mappers resolved conflicting partition_date values for the same "
@@ -2288,6 +2298,7 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st
asset_infos=asset_info_per_apdr[apdr.id].values(),
partition_key=apdr.partition_key,
dag_id=apdr.target_dag_id,
carried_partition_date=apdr.partition_date,
)
dag_run = dag.create_dagrun(
run_id=DagRun.generate_run_id(
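
The precedence described in the ``_resolve_partition_date`` docstring can be sketched as a standalone function. This is a simplified model under stated assumptions: ``resolve_partition_date`` and the ``resolvers`` callables are hypothetical stand-ins for looking up each upstream mapper and calling ``to_partition_date``, and the log messages are illustrative.

```python
from __future__ import annotations

import logging
from datetime import datetime, timedelta, timezone
from typing import Callable, Iterable

log = logging.getLogger(__name__)

# Hypothetical stand-in for a mapper's to_partition_date: key -> anchor or None.
Resolver = Callable[[str], "datetime | None"]


def resolve_partition_date(
    resolvers: Iterable[Resolver],
    partition_key: str,
    carried_partition_date: datetime | None,
) -> datetime | None:
    anchors: set[datetime] = set()
    try:
        for resolve in resolvers:
            anchor = resolve(partition_key)
            if anchor is not None:
                # Aware datetimes compare and hash by instant, so equal moments collapse.
                anchors.add(anchor)
    except Exception:
        # A broken mapper discards partial results and must not crash the caller.
        log.exception("Could not resolve partition_date for key %r", partition_key)
        return None
    if not anchors:
        # No temporal mapper contributed: fall back to the date carried on the APDR.
        return carried_partition_date
    if len(anchors) > 1:
        # Conflict: the carried date is deliberately not substituted here.
        log.warning("Conflicting partition_date values for key %r", partition_key)
        return None
    return anchors.pop()


if __name__ == "__main__":
    utc_midnight = datetime(2024, 1, 1, tzinfo=timezone.utc)
    same_instant = datetime(2024, 1, 1, 1, tzinfo=timezone(timedelta(hours=1)))
    print(resolve_partition_date([lambda k: utc_midnight, lambda k: same_instant], "2024-01-01", None))
```

The key design point is the ordering of the checks: the carried date is consulted only when the anchor set is empty, so it can never mask a logged conflict or error.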
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/listeners/types.py
@@ -23,6 +23,8 @@
import attrs

if TYPE_CHECKING:
from datetime import datetime

from pydantic import JsonValue

from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias
@@ -40,3 +42,4 @@ class AssetEvent:
source_map_index: int | None
source_aliases: list[SerializedAssetAlias]
partition_key: str | None
partition_date: datetime | None = None