Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,16 +942,15 @@ def are_dependencies_met(
"""
dep_context = dep_context or DepContext()
if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
# This DepContext is used when a task instance is in UP_FOR_RESCHEDULE state.
#
# Tasks can be put into UP_FOR_RESCHEDULE by the task runner itself (e.g. when
# the worker cannot load the Dag or task). In this case, the scheduler must respect
# the task instance's reschedule_date before scheduling it again.
# the worker cannot load the DAG or task). The scheduler must respect the
# reschedule_date before scheduling it again.
#
# ReadyToRescheduleDep is the only dependency that enforces this time-based gating.
# We therefore extend the normal scheduling dependency set with it, instead of
# modifying the global scheduler dependencies.
dep_context.deps.add(ReadyToRescheduleDep())
# We use attrs.evolve to create a *new* DepContext with ReadyToRescheduleDep added,
# instead of mutating the caller's dep_context.deps set in-place. The same
# dep_context is shared across all TIs in a scheduler loop, so mutating it would
# permanently leak the dep into subsequent, unrelated TIs.
dep_context = attrs.evolve(dep_context, deps=dep_context.deps | {ReadyToRescheduleDep()})
failed = False
verbose_aware_logger = self.log.info if verbose else self.log.debug
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
Expand Down
8 changes: 8 additions & 0 deletions airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ def _get_dep_statuses(
This dependency fails if the latest reschedule request's reschedule date is still
in the future.
"""
# Fast-exit for unmapped, non-reschedule tasks whose state is NONE (None): they pass
# immediately without looking up TaskReschedule. When the task is explicitly
# UP_FOR_RESCHEDULE we *always* check TaskReschedule regardless of operator type
# (e.g. startup/DAG-load rescheduling). For NONE-state tasks, only reschedule-mode
# sensors (and mapped tasks, i.e. map_index >= 0, whose reschedule attr is unknown)
# need the DB query.
if ti.state is None and ti.map_index < 0 and not getattr(ti.task, "reschedule", False):
yield self._passing_status(reason="Task is not in reschedule mode.")
return

if dep_context.ignore_in_reschedule_period:
yield self._passing_status(
reason="The context specified that being in a reschedule period was permitted."
Expand Down
23 changes: 23 additions & 0 deletions airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,29 @@ def test_respects_prev_dagrun_dep(self, dag_maker, session):
):
assert ti.are_dependencies_met()

def test_are_dependencies_met_does_not_mutate_shared_dep_context(self, dag_maker, session):
    """
    Check that ``are_dependencies_met`` leaves the caller's ``DepContext.deps`` untouched.

    The task instance is put into UP_FOR_RESCHEDULE before the call. Because the
    scheduler shares one DepContext across all TIs in a loop, any in-place mutation
    would leak ReadyToRescheduleDep into unrelated TIs.
    """
    with dag_maker("test_depctx_no_mutation", serialized=True):
        EmptyOperator(task_id="t")

    dag_run = dag_maker.create_dagrun(session=session)

    # Persist a task instance that is already waiting to be rescheduled.
    task_instance = dag_run.get_task_instance(task_id="t", session=session)
    task_instance.state = TaskInstanceState.UP_FOR_RESCHEDULE
    session.merge(task_instance)
    session.flush()

    # Snapshot the dependency set so it can be compared after the call.
    shared_context = DepContext(deps=RUNNING_DEPS)
    deps_snapshot = shared_context.deps.copy()

    task_instance.task = dag_run.dag.task_dict[task_instance.task_id]
    task_instance.are_dependencies_met(dep_context=shared_context, session=session)

    deps_unchanged = shared_context.deps == deps_snapshot
    assert deps_unchanged, (
        "DepContext.deps was mutated — ReadyToRescheduleDep leaked into the shared set"
    )

@pytest.mark.parametrize(
("downstream_ti_state", "expected_are_dependents_done"),
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ def test_should_pass_if_not_in_none_state(self, not_expected_tr_db_call):
ti = self._get_task_instance(State.UP_FOR_RETRY)
assert ReadyToRescheduleDep().is_met(ti=ti)

def test_should_pass_without_db_query_for_non_reschedule_task_in_none_state(
    self, not_expected_tr_db_call
):
    """
    Check that ReadyToRescheduleDep is met for a NONE-state task with ``reschedule=False``.

    Non-reschedule, non-mapped tasks in NONE state should short-circuit without a DB
    query; the ``not_expected_tr_db_call`` fixture is requested for that purpose.
    """
    task_instance = self._get_task_instance(State.NONE)
    # Explicitly mark the task as not being in reschedule mode.
    task_instance.task.reschedule = False

    dep = ReadyToRescheduleDep()
    assert dep.is_met(ti=task_instance)

def test_should_pass_if_no_reschedule_record_exists(self):
ti = self._get_task_instance(State.NONE)
assert ReadyToRescheduleDep().is_met(ti=ti)
Expand Down