
TaskInstance.get_task_instance ignores dag_id arg, crashes EdgeExecutor scheduler loop with MultipleResultsFound #65044

Description

@laurentpellegrino

Under which category would you file this issue?

Airflow Core

Apache Airflow version

3.2.0

What happened and how to reproduce it?

TaskInstance.get_task_instance accepts a required dag_id argument but never adds it to the query's filter_by. The query filters only on (run_id, task_id, map_index). A caller that passes dag_id expecting disambiguation silently gets whichever row matches the triple when that triple is unique across DAGs (even if the row belongs to a different DAG), and gets sqlalchemy.exc.MultipleResultsFound when it is not.

airflow-core/src/airflow/models/taskinstance.py on main, L784–L807:

@classmethod
@provide_session
def get_task_instance(
    cls,
    dag_id: str,
    run_id: str,
    task_id: str,
    map_index: int,
    lock_for_update: bool = False,
    session: Session = NEW_SESSION,
) -> TaskInstance | None:
    query = (
        select(TaskInstance)
        .options(lazyload(TaskInstance.dag_run))
        .filter_by(
            run_id=run_id,
            task_id=task_id,
            map_index=map_index,
        )
    )

    if lock_for_update:
        for attempt in run_with_db_retries(logger=cls.logger()):
            with attempt:
                return session.execute(query.with_for_update()).scalar_one_or_none()
    else:
        return session.execute(query).scalar_one_or_none()

dag_id is unused.

Real-world impact: permanent EdgeExecutor scheduler crash loop

EdgeExecutor._update_orphaned_jobs (providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py on main, L189, crashing call at L203) calls this on every scheduler sync for every lifeless EdgeJobModel row:

for job in lifeless_jobs:
    ti = TaskInstance.get_task_instance(
        dag_id=job.dag_id,
        run_id=job.run_id,
        task_id=job.task_id,
        map_index=job.map_index,
        session=session,
    )
    job.state = ti.state if ti and ti.state else TaskInstanceState.REMOVED
    ...

If any orphaned edge_job row has a (run_id, task_id, map_index) triple shared with a TaskInstance in a different DAG, this query raises MultipleResultsFound. The exception propagates up _update_orphaned_jobs → EdgeExecutor.sync → executor.heartbeat() → _run_scheduler_loop, and the scheduler process exits. Its supervisor (Kubernetes, systemd, etc.) restarts it, the same stale edge_job row is picked up on the next sync, and the scheduler crashes again, producing a permanent crash loop. The scheduler never dispatches another queued task to the edge worker until the offending row is manually removed from the metadata DB.

Triggering the collision is easy in practice:

  • Multiple DAGs on the same cron (e.g. @daily) → identical scheduled__<logical_date> run_ids
  • Shared generic task_ids across those DAGs (done, cleanup, notify, end, start, …)
  • Same map_index (typically -1 for non-mapped tasks)
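The three conditions above can be illustrated with a small, self-contained sketch. It does not touch Airflow; the tuples and the find_collisions helper are hypothetical stand-ins that only show how two DAGs end up sharing one (run_id, task_id, map_index) triple.

```python
from collections import defaultdict

# Hypothetical task instances as (dag_id, run_id, task_id, map_index).
task_instances = [
    ("dag_a", "scheduled__2026-04-11T01:00:00+00:00", "done", -1),
    ("dag_b", "scheduled__2026-04-11T01:00:00+00:00", "done", -1),
    ("dag_b", "scheduled__2026-04-11T01:00:00+00:00", "export", -1),
]


def find_collisions(rows):
    """Group rows by (run_id, task_id, map_index); keep triples used by several DAGs."""
    dags_by_triple = defaultdict(set)
    for dag_id, run_id, task_id, map_index in rows:
        dags_by_triple[(run_id, task_id, map_index)].add(dag_id)
    return {
        triple: sorted(dags)
        for triple, dags in dags_by_triple.items()
        if len(dags) > 1
    }


collisions = find_collisions(task_instances)
for triple, dags in collisions.items():
    print(triple, "is shared by", dags)
```

Only the "done" triple is reported, because "export" exists in a single DAG.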

Any edge worker event that leaves a row stuck in state=RUNNING past [scheduler] task_instance_heartbeat_timeout (default 300s), whether from a worker OOM, a SIGKILL, a network blip on the edge API, or a missed heartbeat while the edge_job completion callback was in flight, is enough to arm the landmine. The scheduler then crashes on its next sync.

Traceback

Traceback (most recent call last):
  File ".../airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File ".../airflow/jobs/job.py", line 355, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File ".../airflow/jobs/job.py", line 384, in execute_job
    ret = execute_callable()
  File ".../airflow/jobs/scheduler_job_runner.py", line 1463, in _execute
    self._run_scheduler_loop()
  File ".../airflow/jobs/scheduler_job_runner.py", line 1610, in _run_scheduler_loop
    executor.heartbeat()
  File ".../airflow/executors/base_executor.py", line 310, in heartbeat
    self.sync()
  File ".../airflow/providers/edge3/executors/edge_executor.py", line 312, in sync
    orphaned = self._update_orphaned_jobs(session)
  File ".../airflow/providers/edge3/executors/edge_executor.py", line 203, in _update_orphaned_jobs
    ti = TaskInstance.get_task_instance(
        dag_id=job.dag_id,
        run_id=job.run_id,
        task_id=job.task_id,
        map_index=job.map_index,
        session=session,
    )
  File ".../airflow/models/taskinstance.py", line 806, in get_task_instance
    return session.execute(query).scalar_one_or_none()
  File ".../sqlalchemy/engine/result.py", line 1504, in scalar_one_or_none
    return self._only_one_row(raise_for_second_row=True, raise_for_none=False, scalar=True)
  File ".../sqlalchemy/engine/result.py", line 825, in _only_one_row
    raise exc.MultipleResultsFound(...)
sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one or none was required

Reproducer

Two DAGs on the same schedule sharing a task_id:

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

for name in ("dag_a", "dag_b"):
    with DAG(name, start_date=datetime(2026, 1, 1), schedule="@daily", catchup=False):
        EmptyOperator(task_id="done")

Steps:

  1. Configure the EdgeExecutor and run an edge worker.
  2. Trigger both DAGs so each has a TaskInstance for (run_id='scheduled__<logical_date>', task_id='done', map_index=-1).
  3. Leave an edge_job row orphaned: state=running, last_update older than [scheduler] task_instance_heartbeat_timeout (default 300s). Easiest way: kill -9 the edge worker mid-task, or drop the network between worker and API long enough to miss heartbeats.
  4. On the next scheduler sync, _update_orphaned_jobs picks up that row and calls get_task_instance. Because dag_id is ignored in the filter, the query returns both TIs → MultipleResultsFound → scheduler process exits.
  5. The supervisor restarts the scheduler; the same stale row is still in the DB; go to step 4.

Direct DB proof

I ran the exact call the scheduler makes, with the exact arguments, against the live metadata DB:

>>> from airflow.models.taskinstance import TaskInstance
>>> from airflow.utils.session import create_session
>>> with create_session() as s:
...     TaskInstance.get_task_instance(
...         dag_id='dag_a',
...         run_id='scheduled__2026-04-11T01:00:00+00:00',
...         task_id='done',
...         map_index=-1,
...         session=s,
...     )
sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one or none was required

The two matching rows:

dag_id  task_id  run_id                                state
dag_a   done     scheduled__2026-04-11T01:00:00+00:00  success
dag_b   done     scheduled__2026-04-11T01:00:00+00:00  success

Both TaskInstances had already finished successfully hours earlier. The edge_job rows were stale leftovers whose completion callback never updated edge_job.state away from running, which is what kept them eligible for orphan handling and re-triggered the crash on every sync cycle.

Workaround

  1. Manually delete the stale edge_job rows whose (run_id, task_id, map_index) triples collide across DAGs.
  2. Restart the scheduler.
  3. Rename colliding task_ids in DAGs on identical schedules to be DAG-unique (done → <dag_name>_done, etc.) so the collision cannot reoccur while the core bug is unfixed.
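For step 1, a query along the following lines could help locate the poisoned rows. This is a sketch only: it runs against an in-memory SQLite database with simplified stand-in tables, and the table and column names (edge_job, task_instance, state, and so on) are assumptions taken from the fields mentioned in this issue, not a verified copy of the real schema. It also skips the last_update staleness check. Verify the names against your metadata DB before adapting it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in tables; the real Airflow schema has more columns.
conn.executescript(
    """
    CREATE TABLE task_instance (
        dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER, state TEXT
    );
    CREATE TABLE edge_job (
        dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER, state TEXT
    );
    """
)

run_id = "scheduled__2026-04-11T01:00:00+00:00"
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("dag_a", run_id, "done", -1, "success"),
        ("dag_b", run_id, "done", -1, "success"),
        ("dag_a", run_id, "extract", -1, "success"),
    ],
)
conn.executemany(
    "INSERT INTO edge_job VALUES (?, ?, ?, ?, ?)",
    [
        ("dag_a", run_id, "done", -1, "running"),     # triple also used by dag_b
        ("dag_a", run_id, "extract", -1, "running"),  # unique triple, harmless
    ],
)

# Running edge_job rows whose triple matches task instances in more than one DAG.
POISONED_JOBS_SQL = """
SELECT ej.dag_id, ej.run_id, ej.task_id, ej.map_index
FROM edge_job AS ej
WHERE ej.state = 'running'
  AND (
      SELECT COUNT(DISTINCT ti.dag_id)
      FROM task_instance AS ti
      WHERE ti.run_id = ej.run_id
        AND ti.task_id = ej.task_id
        AND ti.map_index = ej.map_index
  ) > 1
"""

poisoned = conn.execute(POISONED_JOBS_SQL).fetchall()
print(poisoned)
```

Only the "done" job is returned; the "extract" job has a triple that exists in a single DAG and would not trigger the crash.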

What you think should happen instead?

TaskInstance.get_task_instance should include dag_id in its filter. The parameter is part of the method signature, it's required (not optional), and every call site passes it expecting disambiguation. The current behavior is silently wrong: for non-colliding data it returns the "right" row by accident, and for colliding data it raises MultipleResultsFound and surfaces as an exception in whichever caller happens to trigger it first.

Minimal fix in airflow-core/src/airflow/models/taskinstance.py:

query = (
    select(TaskInstance)
    .options(lazyload(TaskInstance.dag_run))
    .filter_by(
        dag_id=dag_id,
        run_id=run_id,
        task_id=task_id,
        map_index=map_index,
    )
)

Other call sites of get_task_instance that pass dag_id under the same assumption should be audited. Any place that pre-filters a set of candidate jobs by dag_id and then looks up their TIs is latently exposed to the same class of bug (silent wrong row in the happy path, crash in the collision path).
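The effect of adding dag_id to the filter can be seen in isolation with a small sketch. It uses the standard-library sqlite3 module rather than SQLAlchemy; the one_or_none helper and MultipleRowsError are hypothetical stand-ins that mimic the "one row, no row, or raise" contract of scalar_one_or_none, and the table is a simplified assumption, not the real task_instance schema.

```python
import sqlite3


class MultipleRowsError(Exception):
    """Stand-in for sqlalchemy.exc.MultipleResultsFound in this sketch."""


def one_or_none(conn, sql, params):
    """Return the single matching row, None if nothing matches, raise on several."""
    rows = conn.execute(sql, params).fetchall()
    if len(rows) > 1:
        raise MultipleRowsError(f"{len(rows)} rows matched")
    return rows[0] if rows else None


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER, state TEXT)"
)
run_id = "scheduled__2026-04-11T01:00:00+00:00"
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("dag_a", run_id, "done", -1, "success"),
        ("dag_b", run_id, "done", -1, "success"),
    ],
)

# Current behaviour: dag_id is accepted by the caller but not used in the filter.
without_dag_id = (
    "SELECT dag_id, state FROM task_instance "
    "WHERE run_id = ? AND task_id = ? AND map_index = ?"
)
try:
    one_or_none(conn, without_dag_id, (run_id, "done", -1))
    collided = False
except MultipleRowsError:
    collided = True

# Proposed behaviour: dag_id narrows the match to exactly one row.
with_dag_id = without_dag_id + " AND dag_id = ?"
row = one_or_none(conn, with_dag_id, (run_id, "done", -1, "dag_a"))

print(collided, row)
```

With the extra predicate the lookup resolves to the dag_a row, while the three-column filter raises on the same data.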

Independent defensive hardening in providers/edge3

Even with the core fix merged, the scheduler loop should not be killable by a single poisoned edge_job row. A stray database inconsistency, a future provider regression, or an unrelated IntegrityError/DataError on one row shouldn't take down the whole scheduler. The loss of the entire dispatch pipeline is wildly disproportionate to the blast radius of one stale job.

Wrap the per-row lookup in EdgeExecutor._update_orphaned_jobs (providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py on main, L203) in a try/except, log+skip on failure, and mark the offending row as REMOVED so it doesn't come back on the next sync:

from sqlalchemy.exc import SQLAlchemyError

for job in lifeless_jobs:
    try:
        ti = TaskInstance.get_task_instance(
            dag_id=job.dag_id,
            run_id=job.run_id,
            task_id=job.task_id,
            map_index=job.map_index,
            session=session,
        )
    except SQLAlchemyError:
        # MultipleResultsFound is a subclass of SQLAlchemyError, so this
        # single clause also covers the cross-DAG collision case.
        self.log.exception(
            "Failed to resolve TaskInstance for orphaned edge_job "
            "(dag_id=%s task_id=%s run_id=%s map_index=%s); marking as REMOVED",
            job.dag_id, job.task_id, job.run_id, job.map_index,
        )
        job.state = TaskInstanceState.REMOVED
        continue

    job.state = ti.state if ti and ti.state else TaskInstanceState.REMOVED
    ...

Both changes are small and independently mergeable. The core fix closes the root cause; the edge3 hardening is the backstop that prevents a future variant of the same class of bug from causing another outage.
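The backstop idea, stripped of Airflow specifics, is per-row error isolation. The sketch below is a generic illustration under stated assumptions: lookup_state, LookupFailed, and the dictionary-shaped jobs are all hypothetical and stand in for the TaskInstance lookup, the database exception, and EdgeJobModel rows.

```python
import logging

log = logging.getLogger("orphan_sketch")


class LookupFailed(Exception):
    """Stand-in for a database error such as MultipleResultsFound."""


def lookup_state(job):
    # Hypothetical lookup: the job named "poisoned" simulates a colliding row.
    if job["task_id"] == "poisoned":
        raise LookupFailed("multiple rows matched")
    return "success"


def update_orphaned_jobs(jobs):
    """Resolve each job independently so one bad row cannot abort the whole pass."""
    for job in jobs:
        try:
            job["state"] = lookup_state(job)
        except LookupFailed:
            log.exception(
                "Could not resolve job %s; marking as removed", job["task_id"]
            )
            job["state"] = "removed"
    return jobs


jobs = update_orphaned_jobs(
    [{"task_id": "a"}, {"task_id": "poisoned"}, {"task_id": "b"}]
)
print([job["state"] for job in jobs])
```

The job after the poisoned one is still processed, which is the property the scheduler loop currently lacks.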

Operating System

Talos Linux v1.11.6 (Kubernetes 1.33.10)

Deployment

Other

Apache Airflow Provider(s)

edge3

Versions of Apache Airflow Providers

apache-airflow-providers-edge3==3.3.0

Official Helm Chart version

Not Applicable

Kubernetes Version

No response

Helm Chart configuration

Not Applicable

Docker Image customizations

Base image: apache/airflow:3.2.0
Added packages: elaunira-airflow, elaunira-airflow-providers-r2index, elaunira-r2index, openplanetdata-airflow
No modifications to Airflow core or the edge3 provider.

Anything else?

Frequency: deterministic once the DB holds a poisoned edge_job row. Our scheduler accumulated 75 restarts in ~9 hours and never recovered on its own. Workaround was to delete the 4 stale edge_job rows whose (run_id, task_id, map_index) triples collided across DAGs, then restart the scheduler pod. Renaming shared task_ids (done, cleanup) to DAG-unique names prevents recurrence.

Collision surface is broader than it looks: any two DAGs on the same cron schedule that share a task_id will produce colliding TIs on every run. Generic names like done, cleanup, notify, start, end are common in template DAGs.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

area:core, kind:bug, needs-triage, priority:critical
