From 1f90147ae038e971e3a7759e8e86f2ce54ac3b13 Mon Sep 17 00:00:00 2001 From: deepinsight coder Date: Fri, 10 Apr 2026 00:38:54 +0000 Subject: [PATCH 1/4] Fix scheduler crash when DAGs share task run IDs --- airflow-core/newsfragments/64957.bugfix.rst | 2 + .../src/airflow/models/taskinstance.py | 1 + .../tests/unit/models/test_taskinstance.py | 42 +++++++++++++++++++ 3 files changed, 45 insertions(+) create mode 100644 airflow-core/newsfragments/64957.bugfix.rst diff --git a/airflow-core/newsfragments/64957.bugfix.rst b/airflow-core/newsfragments/64957.bugfix.rst new file mode 100644 index 0000000000000..5ef19f7e7f6b4 --- /dev/null +++ b/airflow-core/newsfragments/64957.bugfix.rst @@ -0,0 +1,2 @@ +Fixed missing ``dag_id`` filter in ``TaskInstance.get_task_instance`` that could cause scheduler crashes +when multiple DAGs share the same ``task_id`` and ``run_id``. diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 964c345a3bf6c..6fb2b38696cf1 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -794,6 +794,7 @@ def get_task_instance( select(TaskInstance) .options(lazyload(TaskInstance.dag_run)) # lazy load dag run to avoid locking it .filter_by( + dag_id=dag_id, run_id=run_id, task_id=task_id, map_index=map_index, diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 9eba07daaa130..9b36b0502fedc 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -1321,6 +1321,48 @@ def do_something_else(i): assert completed == expect_completed assert ti.state == expect_state + def test_get_task_instance_filters_by_dag_id(self, dag_maker, session): + shared_run_id = "manual__shared_run_id" + task_id = "common_task" + + with dag_maker(dag_id="dag_one", session=session): + EmptyOperator(task_id=task_id) + dag_maker.create_dagrun( + session=session, + run_id=shared_run_id, + run_type=DagRunType.MANUAL, + logical_date=timezone.datetime(2024, 1, 1), + ) + + with dag_maker(dag_id="dag_two", session=session): + EmptyOperator(task_id=task_id) + dag_maker.create_dagrun( + session=session, + run_id=shared_run_id, + run_type=DagRunType.MANUAL, + logical_date=timezone.datetime(2024, 1, 2), + ) + + ti_one = TaskInstance.get_task_instance( + dag_id="dag_one", + run_id=shared_run_id, + task_id=task_id, + map_index=-1, + session=session, + ) + ti_two = TaskInstance.get_task_instance( + dag_id="dag_two", + run_id=shared_run_id, + task_id=task_id, + map_index=-1, + session=session, + ) + + assert ti_one is not None + assert ti_two is not None + assert ti_one.dag_id == "dag_one" + assert ti_two.dag_id == "dag_two" + def test_respects_prev_dagrun_dep(self, dag_maker, session): with dag_maker("test_respects_prev_dagrun_dep", serialized=True) as dag: EmptyOperator(task_id="t") From f30a3ba2caa98a59d1461775da5cff4eafaf1098 Mon Sep 17 00:00:00 2001 From: deepinsight coder Date: Fri, 10 Apr 2026 00:48:49 +0000 Subject: [PATCH 2/4] Fix PR checks for scheduler crash regression note --- airflow-core/newsfragments/64988.bugfix.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 airflow-core/newsfragments/64988.bugfix.rst diff --git a/airflow-core/newsfragments/64988.bugfix.rst b/airflow-core/newsfragments/64988.bugfix.rst new file mode 100644 index 0000000000000..5ef19f7e7f6b4 --- /dev/null +++ b/airflow-core/newsfragments/64988.bugfix.rst @@ -0,0 +1,2 @@ +Fixed missing ``dag_id`` filter in ``TaskInstance.get_task_instance`` that could cause scheduler crashes +when multiple DAGs share the same ``task_id`` and ``run_id``. From 58e725e95753934ecbcfaf76a5424495882aeeb9 Mon Sep 17 00:00:00 2001 From: deepinsight coder Date: Fri, 10 Apr 2026 00:49:45 +0000 Subject: [PATCH 3/4] Remove stale issue-numbered newsfragment from PR --- airflow-core/newsfragments/64957.bugfix.rst | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 airflow-core/newsfragments/64957.bugfix.rst diff --git a/airflow-core/newsfragments/64957.bugfix.rst b/airflow-core/newsfragments/64957.bugfix.rst deleted file mode 100644 index 5ef19f7e7f6b4..0000000000000 --- a/airflow-core/newsfragments/64957.bugfix.rst +++ /dev/null @@ -1,2 +0,0 @@ -Fixed missing ``dag_id`` filter in ``TaskInstance.get_task_instance`` that could cause scheduler crashes -when multiple DAGs share the same ``task_id`` and ``run_id``. From 4979c5ca1ebe9bcea7d93e5dbe191f2dc37a76f0 Mon Sep 17 00:00:00 2001 From: deepinsight coder Date: Fri, 10 Apr 2026 02:17:00 +0000 Subject: [PATCH 4/4] Fix TaskInstance PR newsfragment validation --- airflow-core/newsfragments/64988.bugfix.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow-core/newsfragments/64988.bugfix.rst b/airflow-core/newsfragments/64988.bugfix.rst index 5ef19f7e7f6b4..d7455f77d5383 100644 --- a/airflow-core/newsfragments/64988.bugfix.rst +++ b/airflow-core/newsfragments/64988.bugfix.rst @@ -1,2 +1 @@ -Fixed missing ``dag_id`` filter in ``TaskInstance.get_task_instance`` that could cause scheduler crashes -when multiple DAGs share the same ``task_id`` and ``run_id``. +Fixed missing ``dag_id`` filter in ``TaskInstance.get_task_instance`` that could cause scheduler crashes when multiple DAGs share the same ``task_id`` and ``run_id``.