From c5efd9e79b456f2b8ee7aeac52d2b576fa2d3daa Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Tue, 17 Feb 2026 18:09:07 +0000
Subject: [PATCH] Fix `DepContext` mutation leak and restore `reschedule-mode` guard

Two related fixes for issues introduced in #59604:

1. `are_dependencies_met()` mutated the caller's dep_context.deps set
   in-place when adding ReadyToRescheduleDep for UP_FOR_RESCHEDULE TIs.
   Since the scheduler shares one DepContext across all TIs in a loop,
   this permanently leaked the dep into unrelated TIs. Fix: use
   `attrs.evolve` to create a new DepContext instead.

2. `ReadyToRescheduleDep` lost its fast-exit guard for non-reschedule
   tasks in NONE state. Without it, every task hit the `task_reschedule`
   table on each scheduling loop. Fix: restore the guard that
   short-circuits for non-reschedule, non-mapped tasks in NONE state
   while still honoring reschedule_date for tasks explicitly in
   UP_FOR_RESCHEDULE state.
---
 .../src/airflow/models/taskinstance.py        | 15 ++++++------
 .../ti_deps/deps/ready_to_reschedule.py       |  8 +++++++
 .../tests/unit/models/test_taskinstance.py    | 23 +++++++++++++++++++
 .../deps/test_ready_to_reschedule_dep.py      |  8 +++++++
 4 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 25168a41c01c4..475cbd7ae68fb 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -942,16 +942,15 @@ def are_dependencies_met(
         """
         dep_context = dep_context or DepContext()
         if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
-            # This DepContext is used when a task instance is in UP_FOR_RESCHEDULE state.
-            #
             # Tasks can be put into UP_FOR_RESCHEDULE by the task runner itself (e.g. when
-            # the worker cannot load the Dag or task). In this case, the scheduler must respect
-            # the task instance's reschedule_date before scheduling it again.
+            # the worker cannot load the DAG or task). The scheduler must respect the
+            # reschedule_date before scheduling it again.
             #
-            # ReadyToRescheduleDep is the only dependency that enforces this time-based gating.
-            # We therefore extend the normal scheduling dependency set with it, instead of
-            # modifying the global scheduler dependencies.
-            dep_context.deps.add(ReadyToRescheduleDep())
+            # We use attrs.evolve to create a *new* DepContext with ReadyToRescheduleDep added,
+            # instead of mutating the caller's dep_context.deps set in-place. The same
+            # dep_context is shared across all TIs in a scheduler loop, so mutating it would
+            # permanently leak the dep into subsequent, unrelated TIs.
+            dep_context = attrs.evolve(dep_context, deps=dep_context.deps | {ReadyToRescheduleDep()})
         failed = False
         verbose_aware_logger = self.log.info if verbose else self.log.debug
         for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index feadd5587c3b0..6f611c43d6129 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -58,6 +58,14 @@ def _get_dep_statuses(
         This dependency fails if the latest reschedule request's reschedule date is still
         in the future.
         """
+        # Fast-exit for non-reschedule tasks in NONE state. When the task is explicitly
+        # UP_FOR_RESCHEDULE we *always* check TaskReschedule regardless of operator type
+        # (e.g. startup/DAG-load rescheduling). For NONE-state tasks, only reschedule-mode
+        # sensors (and mapped tasks whose reschedule attr is unknown) need the DB query.
+        if ti.state is None and ti.map_index < 0 and not getattr(ti.task, "reschedule", False):
+            yield self._passing_status(reason="Task is not in reschedule mode.")
+            return
+
         if dep_context.ignore_in_reschedule_period:
             yield self._passing_status(
                 reason="The context specified that being in a reschedule period was permitted."
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index 5ea9184a8b3db..a013b09cdb0c8 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -1337,6 +1337,29 @@ def test_respects_prev_dagrun_dep(self, dag_maker, session):
         ):
             assert ti.are_dependencies_met()
 
+    def test_are_dependencies_met_does_not_mutate_shared_dep_context(self, dag_maker, session):
+        """Verify that calling are_dependencies_met on an UP_FOR_RESCHEDULE TI does not
+        mutate the caller's DepContext.deps set. The scheduler shares one DepContext across
+        all TIs in a loop, so mutation would leak ReadyToRescheduleDep into unrelated TIs."""
+        with dag_maker("test_depctx_no_mutation", serialized=True):
+            EmptyOperator(task_id="t")
+
+        dr = dag_maker.create_dagrun(session=session)
+        ti = dr.get_task_instance(task_id="t", session=session)
+        ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+        session.merge(ti)
+        session.flush()
+
+        dep_context = DepContext(deps=RUNNING_DEPS)
+        original_deps = dep_context.deps.copy()
+
+        ti.task = dr.dag.task_dict[ti.task_id]
+        ti.are_dependencies_met(dep_context=dep_context, session=session)
+
+        assert dep_context.deps == original_deps, (
+            "DepContext.deps was mutated — ReadyToRescheduleDep leaked into the shared set"
+        )
+
     @pytest.mark.parametrize(
         ("downstream_ti_state", "expected_are_dependents_done"),
         [
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
index 27d8f4674d5f9..04f13388a5f54 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -107,6 +107,14 @@ def test_should_pass_if_not_in_none_state(self, not_expected_tr_db_call):
         ti = self._get_task_instance(State.UP_FOR_RETRY)
         assert ReadyToRescheduleDep().is_met(ti=ti)
 
+    def test_should_pass_without_db_query_for_non_reschedule_task_in_none_state(
+        self, not_expected_tr_db_call
+    ):
+        """Non-reschedule, non-mapped tasks in NONE state should short-circuit without a DB query."""
+        ti = self._get_task_instance(State.NONE)
+        ti.task.reschedule = False
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+
     def test_should_pass_if_no_reschedule_record_exists(self):
         ti = self._get_task_instance(State.NONE)
         assert ReadyToRescheduleDep().is_met(ti=ti)