Only mark SchedulerJobs as failed, not any jobs#19375
Conversation
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have failed `scheduler_health_check_threshold` to failed, however a missing condition was allowing that timeout to apply to all jobs, not just SchedulerJobs. This is because polymorphic identity isn't included for `update()`: https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update So if we had any running LocalTaskJobs that, for whatever reason, aren't heartbeating faster than `scheduler_health_check_threshold`, their state gets set to failed and they subsequently exit with a log line similar to: State of this instance has been externally set to scheduled. Terminating instance. Note that the state it is set to can be different (e.g. queued or up_for_retry) simply depending on how quickly the scheduler has progressed that task_instance again.
kaxil
left a comment
There was a problem hiding this comment.
I can confirm it was running the following query before:
[2021-11-02 23:37:09,230] {base.py:727} INFO - BEGIN (implicit)
[2021-11-02 23:37:09,231] {base.py:1234} INFO - UPDATE job SET state=%(state)s WHERE job.state = %(state_1)s AND job.latest_heartbeat < %(latest_heartbeat_1)s
[2021-11-02 23:37:09,231] {base.py:1239} INFO - "\x1b[01m{'state': <TaskInstanceState.FAILED: 'failed'>, 'state_1': <TaskInstanceState.RUNNING: 'running'>, 'latest_heartbeat_1': datetime.datetime(2021, 11, 2, 23, 36, 59, 213724, tzinfo=Timezone('UTC'))}\x1b[22m"
and now runs:
[2021-11-02 23:39:30,548] {base.py:1234} INFO - UPDATE job SET state=%(state)s WHERE job.job_type = %(job_type_1)s AND job.state = %(state_1)s AND job.latest_heartbeat < %(latest_heartbeat_1)s
[2021-11-02 23:39:30,548] {base.py:1239} INFO - "\x1b[01m{'state': <TaskInstanceState.FAILED: 'failed'>, 'job_type_1': 'SchedulerJob', 'state_1': <TaskInstanceState.RUNNING: 'running'>, 'latest_heartbeat_1': datetime.datetime(2021, 11, 2, 23, 39, 20, 547621, tzinfo=Timezone('UTC'))}\x1b[22m"
I have also tested it locally
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
|
Good find - I think @ephraimbuddy and I had stumbled upon it sometime back but weren't sure that Polymorphic identity didn't apply to update statements - TIL. |
Yeah! Yesterday during our debugging session with Collin, it came up again. Good that it's now being fixed. Likely related to the many sigterms everyone is complaining about |
|
Hallelujah, great fix |
|
Yeah, shoutout to @ephraimbuddy for spotting this 🎉 |
|
🤦 |
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have failed `scheduler_health_check_threshold` to failed, however a missing condition was allowing that timeout to apply to all jobs, not just SchedulerJobs. This is because polymorphic identity isn't included for `update()`: https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update So if we had any running LocalTaskJobs that, for whatever reason, aren't heartbeating faster than `scheduler_health_check_threshold`, their state gets set to failed and they subsequently exit with a log line similar to: State of this instance has been externally set to scheduled. Terminating instance. Note that the state it is set to can be different (e.g. queued or up_for_retry) simply depending on how quickly the scheduler has progressed that task_instance again. (cherry picked from commit 38d329b)
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have failed `scheduler_health_check_threshold` to failed, however a missing condition was allowing that timeout to apply to all jobs, not just SchedulerJobs. This is because polymorphic identity isn't included for `update()`: https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update So if we had any running LocalTaskJobs that, for whatever reason, aren't heartbeating faster than `scheduler_health_check_threshold`, their state gets set to failed and they subsequently exit with a log line similar to: State of this instance has been externally set to scheduled. Terminating instance. Note that the state it is set to can be different (e.g. queued or up_for_retry) simply depending on how quickly the scheduler has progressed that task_instance again. (cherry picked from commit 38d329b)
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have failed `scheduler_health_check_threshold` to failed, however a missing condition was allowing that timeout to apply to all jobs, not just SchedulerJobs. This is because polymorphic identity isn't included for `update()`: https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update So if we had any running LocalTaskJobs that, for whatever reason, aren't heartbeating faster than `scheduler_health_check_threshold`, their state gets set to failed and they subsequently exit with a log line similar to: State of this instance has been externally set to scheduled. Terminating instance. Note that the state it is set to can be different (e.g. queued or up_for_retry) simply depending on how quickly the scheduler has progressed that task_instance again. (cherry picked from commit 38d329b)
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have failed `scheduler_health_check_threshold` to failed, however a missing condition was allowing that timeout to apply to all jobs, not just SchedulerJobs. This is because polymorphic identity isn't included for `update()`: https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update So if we had any running LocalTaskJobs that, for whatever reason, aren't heartbeating faster than `scheduler_health_check_threshold`, their state gets set to failed and they subsequently exit with a log line similar to: State of this instance has been externally set to scheduled. Terminating instance. Note that the state it is set to can be different (e.g. queued or up_for_retry) simply depending on how quickly the scheduler has progressed that task_instance again. (cherry picked from commit 38d329b) (cherry picked from commit fa0b998) (cherry picked from commit 2071544)
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have failed `scheduler_health_check_threshold` to failed, however a missing condition was allowing that timeout to apply to all jobs, not just SchedulerJobs. This is because polymorphic identity isn't included for `update()`: https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update So if we had any running LocalTaskJobs that, for whatever reason, aren't heartbeating faster than `scheduler_health_check_threshold`, their state gets set to failed and they subsequently exit with a log line similar to: State of this instance has been externally set to scheduled. Terminating instance. Note that the state it is set to can be different (e.g. queued or up_for_retry) simply depending on how quickly the scheduler has progressed that task_instance again. (cherry picked from commit 38d329b) (cherry picked from commit fa0b998) (cherry picked from commit 2071544)
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have failed `scheduler_health_check_threshold` to failed, however a missing condition was allowing that timeout to apply to all jobs, not just SchedulerJobs. This is because polymorphic identity isn't included for `update()`: https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update So if we had any running LocalTaskJobs that, for whatever reason, aren't heartbeating faster than `scheduler_health_check_threshold`, their state gets set to failed and they subsequently exit with a log line similar to: State of this instance has been externally set to scheduled. Terminating instance. Note that the state it is set to can be different (e.g. queued or up_for_retry) simply depending on how quickly the scheduler has progressed that task_instance again. (cherry picked from commit 38d329b) (cherry picked from commit fa0b998)
In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs that have failed `scheduler_health_check_threshold` to failed, however a missing condition was allowing that timeout to apply to all jobs, not just SchedulerJobs. This is because polymorphic identity isn't included for `update()`: https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update So if we had any running LocalTaskJobs that, for whatever reason, aren't heartbeating faster than `scheduler_health_check_threshold`, their state gets set to failed and they subsequently exit with a log line similar to: State of this instance has been externally set to scheduled. Terminating instance. Note that the state it is set to can be different (e.g. queued or up_for_retry) simply depending on how quickly the scheduler has progressed that task_instance again. (cherry picked from commit 38d329b) (cherry picked from commit fa0b998) (cherry picked from commit 2071544)
In
adopt_or_reset_orphaned_tasks, we set any SchedulerJobs that havefailed
scheduler_health_check_thresholdto failed, however a missingcondition was allowing that timeout to apply to all jobs, not just SchedulerJobs.
This is because polymorphic identity isn't included for
update():https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update
So if we had any running LocalTaskJobs that, for whatever reason, aren't
heartbeating faster than
scheduler_health_check_threshold, their stategets set to failed and they subsequently exit with a log line similar to:
Note that the state it is set to can be different (e.g. queued or
up_for_retry) simply depending on how quickly the scheduler has
progressed that task_instance again.
Closes: #16881
Closes: #16573
Related: #16023 (comment)
Might also fix #19277