diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index eba3c27831123..47eba3ed93d7a 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -326,12 +326,21 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - query = query.limit(max_tis) - task_instances_to_examine: list[TI] = with_row_locks( - query, - of=TI, - session=session, - **skip_locked(session=session), - ).all() + timer = Stats.timer("scheduler.critical_section_query_duration") + timer.start() + + try: + task_instances_to_examine: list[TI] = with_row_locks( + query, + of=TI, + session=session, + **skip_locked(session=session), + ).all() + timer.stop(send=True) + except OperationalError as e: + timer.stop(send=False) + raise e + # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything. # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine)) diff --git a/docs/apache-airflow/logging-monitoring/metrics.rst b/docs/apache-airflow/logging-monitoring/metrics.rst index e81e10cab255b..e37c5aa785458 100644 --- a/docs/apache-airflow/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/logging-monitoring/metrics.rst @@ -161,6 +161,7 @@ Name Description start date and the actual DagRun start date ``scheduler.critical_section_duration`` Milliseconds spent in the critical section of scheduler loop -- only a single scheduler can enter this loop at a time +``scheduler.critical_section_query_duration`` Milliseconds spent running the critical section task instance query ``scheduler.scheduler_loop_duration`` Milliseconds spent running one scheduler loop ``dagrun..first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start ``collect_db_dags`` Milliseconds taken for fetching all Serialized Dags from DB