Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,13 @@ def update_state(
self.data_interval_end,
self.dag_hash,
)
session.flush()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall lgtm, should this be down with the merge though so it happens regardless of the ultimate state?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my initial thought, but I don't know whether there would be a regression if it were added there, since it would then apply to running dagruns as well. It used to have a commit down there, after the merge, but that was removed here: 73b9163#diff-649fbbf224bab54417f03338c27d0fdb3c3336e53a522a13dfd9806c99f63137L377

cc: @ashb

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just moved this down with the merge and the query count increased... If we are cool with it, I will fix the tests.


self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
self._emit_duration_stats_for_finished_state()

session.merge(self)
# We do not flush here for performance reasons (it increases the query count by 20)

return schedulable_tis, callback

Expand Down
35 changes: 35 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,41 @@ def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_make
assert session.query(DagRun.state).filter(DagRun.state == State.QUEUED).count() == 0
assert orm_dag.next_dagrun_create_after is None

def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session):
    """
    Check that run creation does not get stuck once max_active_runs has been reached.

    A DAG limited to one active run gets a RUNNING dagrun, the scheduler is
    cycled a few times, the run's single task is marked successful, and the
    scheduler is cycled again. Two dagruns must exist for the DAG afterwards.
    """
    self.scheduler_job = scheduler = SchedulerJob(subdir=os.devnull)
    scheduler.executor = MockExecutor(do_update=True)
    scheduler.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

    with dag_maker(max_active_runs=1, session=session) as dag:
        # The task must be something the scheduler does not immediately mark as success
        BashOperator(task_id='task', bash_command='true')

    first_run = dag_maker.create_dagrun(state=State.RUNNING, session=session)

    # Cycle the scheduler so that max_active_runs is reached
    for _ in range(3):
        scheduler._do_scheduling(session)

    # Complete the dagrun. _do_scheduling does an expunge_all, so the run has
    # to be merged back into the session and refreshed first.
    first_run = session.merge(first_run)
    session.refresh(first_run)
    task_instance = first_run.get_task_instance(task_id='task', session=session)
    task_instance.state = State.SUCCESS

    # Cycle the scheduler again so that it can create a new run
    for _ in range(3):
        scheduler._do_scheduling(session)

    # A second run must have been created
    found_runs = DagRun.find(dag_id=dag.dag_id, session=session)
    assert len(found_runs) == 2

def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
"""
Test if a dagrun will not be scheduled if max_dag_runs
Expand Down