Removes the next_dagrun_create_after reset #21214

Closed
avkirilishin wants to merge 5 commits into apache:main from avkirilishin:main
Conversation

@avkirilishin

@avkirilishin avkirilishin commented Jan 30, 2022

Contributor

closes: #21083
closes: #19901

Resetting next_dagrun_create_after to None causes a problem (details in the linked issues).
Since we now check _should_update_dag_next_dagruns before calling calculate_dagrun_date_fields, the reset can be removed.



@ashb

ashb commented Jan 31, 2022

Member

This needs more thought/a deeper review than I can give right now.

I think that by removing it we might undo the fix for which this change was introduced in the first place.

@avkirilishin

Contributor Author

I think that by removing it we might undo the fix for which this change was introduced in the first place.

I checked this. Let's reconstruct the sequence of events.

@ashb ashb requested a review from ephraimbuddy February 3, 2022 10:37

@ephraimbuddy ephraimbuddy left a comment

Contributor

We need a test that creates a dagrun and, while that dagrun is still running, has the scheduler loop again and verifies that no additional dagruns are created.

Comment thread tests/jobs/test_scheduler_job.py Outdated

@ephraimbuddy ephraimbuddy left a comment

Contributor

This works fine in that it doesn't create more runs when max_active_runs is reached, but it didn't resolve the linked issue.
Can you work on fixing the linked issue too? I like that next_dagrun_create_after is no longer nullified.

@avkirilishin

Contributor Author

but it didn't resolve the linked issue

This fix resolves the linked issue. The new test test_runs_are_created_after_max_active_runs_was_reached verifies this.

Let me explain how it works:

  1. The value of max_active_runs has been reached, and the value of next_dagrun_create_after has been reset:
    dag_model.next_dagrun_create_after = None
  2. Because DagModel.dags_needing_dagruns (called from SchedulerJob._create_dagruns_for_dags) filters on next_dagrun_create_after, it can no longer select this DAG for new runs. A comparison predicate with a NULL operand always evaluates to UNKNOWN, which the WHERE clause treats as FALSE.

    airflow/airflow/models/dag.py

    Lines 2866 to 2876 in ab762a5

    query = (
        session.query(cls)
        .filter(
            cls.is_paused == expression.false(),
            cls.is_active == expression.true(),
            cls.has_import_errors == expression.false(),
            cls.next_dagrun_create_after <= func.now(),
        )
        .order_by(cls.next_dagrun_create_after)
        .limit(cls.NUM_DAGS_PER_DAGRUN_QUERY)
    )
  3. The DAG gets stuck and no more runs are created
  4. If AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL is low, the scheduler soon reprocesses the DAG's file and calls DAG.bulk_write_to_db, which calls calculate_dagrun_date_fields and calculates a new next_dagrun_create_after value

    airflow/airflow/models/dag.py

    Lines 2441 to 2444 in ab762a5

    if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs:
        orm_dag.next_dagrun_create_after = None
    else:
        orm_dag.calculate_dagrun_date_fields(dag, data_interval)
  5. But if AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL is high, nothing recalculates next_dagrun_create_after until the file is processed again
  6. This merge request corrects the situation by no longer nullifying next_dagrun_create_after
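
The NULL behaviour described in step 2 can be reproduced outside Airflow. The following is a minimal sketch using Python's built-in sqlite3 module. The table layout, the two sample rows, and the dags_needing_runs helper are illustrative assumptions, not Airflow's schema or code.

```python
import sqlite3

# In-memory database with a toy stand-in for the DagModel table (assumed layout).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag (dag_id TEXT PRIMARY KEY, next_dagrun_create_after TEXT)"
)
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [
        ("due_dag", "2022-02-04 16:00:00"),  # a run is due
        ("stuck_dag", None),                 # value was reset to NULL
    ],
)


def dags_needing_runs(now):
    """Hypothetical helper mirroring the `<= now` filter from step 2."""
    rows = conn.execute(
        "SELECT dag_id FROM dag "
        "WHERE next_dagrun_create_after <= ? "
        "ORDER BY next_dagrun_create_after",
        (now,),
    )
    return [row[0] for row in rows]


# NULL <= <anything> evaluates to NULL (UNKNOWN), so stuck_dag is never selected,
# no matter how far the clock advances.
selected = dags_needing_runs("2022-02-04 16:30:00")
selected_much_later = dags_needing_runs("2099-01-01 00:00:00")
print(selected, selected_much_later)
```

In this sketch the row with a NULL timestamp only becomes selectable again once something writes a non-NULL value back, which corresponds to steps 4 and 5.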

next_dagrun_create_after has been reset:

Screenshot 2022-02-04 at 21 51 39

min_file_process_interval is 180. After 3 minutes (16:21:36 - 16:24:37), the scheduler processes the DAG's file and calculates the new next_dagrun_create_after value:

Screenshot 2022-02-04 at 21 56 18

There is a gap in launches:

image

Note that last_parsed_time and Queued At are identical in the last two screenshots.

@avkirilishin

Contributor Author

I did load testing today, and it turned out that the scheduler now consumes a lot more CPU, since it constantly selects all the necessary DAGs in DagModel.dags_needing_dagruns. Maybe I could add a dict with the time of the last check? But that won't work with AIRFLOW__SCHEDULER__NUM_RUNS = 1.
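
As a rough illustration of the "dict with the time of the last check" idea, here is a minimal sketch. LastCheckThrottle and its interval parameter are hypothetical names, not part of Airflow. The dict lives only in the memory of one process, which is presumably why the idea breaks down when the scheduler exits after a single run.

```python
import time


class LastCheckThrottle:
    """Hypothetical helper: skip a DAG that was checked less than `interval` seconds ago."""

    def __init__(self, interval, clock=time.monotonic):
        self.interval = interval
        self.clock = clock
        self._last_check = {}  # dag_id -> time of the last check (process-local state)

    def should_check(self, dag_id):
        now = self.clock()
        last = self._last_check.get(dag_id)
        if last is not None and now - last < self.interval:
            return False  # checked too recently, skip this loop
        self._last_check[dag_id] = now
        return True


# Drive the helper with a fake clock so the behaviour is deterministic.
fake_now = [0.0]
throttle = LastCheckThrottle(interval=5.0, clock=lambda: fake_now[0])

first = throttle.should_check("example_dag")   # nothing recorded yet
fake_now[0] = 2.0
second = throttle.should_check("example_dag")  # only 2 seconds since the last check
fake_now[0] = 6.0
third = throttle.should_check("example_dag")   # 6 seconds since the last check

# A freshly started process has an empty dict, so it always checks again.
fresh = LastCheckThrottle(interval=5.0, clock=lambda: fake_now[0])
after_restart = fresh.should_check("example_dag")
print(first, second, third, after_restart)
```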

@ephraimbuddy

Contributor

@avkirilishin, can you test with AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=86400?

@avkirilishin

Contributor Author

Can you test with AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=86400

@ephraimbuddy

image

image

image

image

image

@ephraimbuddy

ephraimbuddy commented Feb 6, 2022

Contributor

Hi @avkirilishin, I debugged this issue and it turned out that the problem is not that the next_dagrun_create_after is being nullified. The problem is that this line

active_runs = dag.get_num_active_runs(only_running=False, session=session)

is returning a wrong figure. You can verify this.

I have not been able to pin down why it's returning a wrong figure, but if you change this line of code:

if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):

to

if self._should_update_dag_next_dagruns(dag, dag_model, active_runs-1):

The problem will be gone.
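
To illustrate only the arithmetic of this suggestion: the sketch below assumes the check compares the count against max_active_runs with >=, as in the bulk_write_to_db excerpt quoted earlier in this thread, and assumes (not confirmed here) that the count still includes the run that is just finishing. should_update_next_dagrun is a hypothetical stand-in, not the scheduler's actual method.

```python
def should_update_next_dagrun(max_active_runs, active_runs):
    """Simplified stand-in: allow an update only while below the limit."""
    return active_runs < max_active_runs


max_active_runs = 1
counted_runs = 1  # assumption: the count still includes the run that is finishing

# With the raw count the limit looks reached, so the update is blocked.
blocked = should_update_next_dagrun(max_active_runs, counted_runs)

# Subtracting one discounts the finishing run, so the update goes ahead.
allowed = should_update_next_dagrun(max_active_runs, counted_runs - 1)
print(blocked, allowed)
```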

@avkirilishin

avkirilishin commented Feb 7, 2022

Contributor Author

I have not been able to pin down why it's returning a wrong figure, but if you change this line of code:
The problem will be gone.

@ephraimbuddy I tested your suggestion. The problem has not gone away. The DAG gets stuck after the first launch.

image

image

image

@ephraimbuddy

ephraimbuddy commented Feb 7, 2022

Contributor

I tested your suggestion. The problem has not gone away. The DAG gets stuck after the first launch.

Sorry, I have updated my comment, I gave the wrong permalink. You should change line 1088

if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):

@avkirilishin

Contributor Author

I tested your suggestion. The problem has not gone away. The DAG gets stuck after the first launch.

Sorry, I have updated my comment, I gave the wrong permalink. You should change line 1088

if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):

This is a good change, and I think we need to apply it. But on its own it won't work if the DagRun state is set manually (Success or Failed).

@ephraimbuddy

Contributor

This is a good change, and I think we need to apply it. But on its own it won't work if the DagRun state is set manually (Success or Failed).

It worked for me. Can you test too?

@avkirilishin

Contributor Author

This is a good change, and I think we need to apply it. But on its own it won't work if the DagRun state is set manually (Success or Failed).

It worked for me. Can you test too?

With main plus "active_runs-1" or with this branch plus "active_runs-1"?

Main plus "active_runs-1": manually changing the DagRun state leads to a hang for me (with AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=86400).
This branch plus "active_runs-1": manually changing the DagRun state works fine.

@ephraimbuddy

Contributor

With main plus "active_runs-1" or with this branch plus "active_runs-1"?

Main plus "active_runs-1": manually changing the DagRun state leads to a hang for me (with AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=86400). This branch plus "active_runs-1": manually changing the DagRun state works fine.

It still works for me in main, not sure why. I suggest we implement active_runs-1 and then look at fixing the case where the state is changed manually.
We could also add a boolean column to DagModel and, instead of nullifying next_dagrun_create_after, toggle that column. That way, we don't need to calculate next_dagrun_create_after again. We would just set the new column when max_active_runs is reached and unset it when a dagrun finishes running or is manually marked as failed.
That, together with active_runs-1 at lines 1088 and 1062, would be a better fix.
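
A minimal sketch of the boolean-column idea, again using Python's built-in sqlite3 module. The column name max_active_runs_reached, the table layout, and both helper functions are hypothetical and only illustrate the proposal. Nothing here is taken from Airflow's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag ("
    " dag_id TEXT PRIMARY KEY,"
    " next_dagrun_create_after TEXT,"
    " max_active_runs_reached INTEGER NOT NULL DEFAULT 0)"  # hypothetical flag column
)
conn.execute(
    "INSERT INTO dag (dag_id, next_dagrun_create_after) "
    "VALUES ('example_dag', '2022-02-07 10:00:00')"
)


def set_limit_reached(dag_id, reached):
    """Toggle the flag instead of overwriting next_dagrun_create_after."""
    conn.execute(
        "UPDATE dag SET max_active_runs_reached = ? WHERE dag_id = ?",
        (int(reached), dag_id),
    )


def dags_needing_runs(now):
    """Select DAGs that are due and not blocked by the flag."""
    rows = conn.execute(
        "SELECT dag_id FROM dag "
        "WHERE next_dagrun_create_after <= ? AND max_active_runs_reached = 0",
        (now,),
    )
    return [row[0] for row in rows]


now = "2022-02-07 10:05:00"

set_limit_reached("example_dag", True)   # max_active_runs reached
while_running = dags_needing_runs(now)

set_limit_reached("example_dag", False)  # run finished or was marked manually
after_finish = dags_needing_runs(now)

# The timestamp was never overwritten, so nothing has to be recalculated.
stored = conn.execute(
    "SELECT next_dagrun_create_after FROM dag WHERE dag_id = 'example_dag'"
).fetchone()[0]
print(while_running, after_finish, stored)
```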

@avkirilishin

avkirilishin commented Feb 7, 2022

Contributor Author

But we can (and already do) calculate the number of currently running DagRuns at any time. Why do we need a new column?

Maybe this way:

  • implement active_runs-1
  • remove nullifying next_dagrun_create_after
  • and a little later I will redo the load testing, maybe my previous conclusions were wrong

?

@ephraimbuddy

Contributor

But we can (and already do) calculate the number of currently running DagRuns at any time. Why do we need a new column?

Maybe this way:

  • implement active_runs-1
  • remove nullifying next_dagrun_create_after
  • and a little later I will redo the load testing, maybe my previous conclusions were wrong

?

I want to avoid https://github.com/apache/airflow/pull/21214/files#diff-62c8e300ee91e0d59f81e0ea5d30834f04db71ae74f2e155a10b51056b00b59bR2874. That query runs a lot; the conditional check and passing a list of IDs to look into won't be efficient in large deployments.

@avkirilishin

Contributor Author

Is there anything else I can do? And should this merge request be closed or not?

@ephraimbuddy

Contributor

Is there anything else I can do? And should this merge request be closed or not?

If you don't mind, you can apply the active_runs-1 change and leave out the new column for now, until issue #16078 is resolved.

@ephraimbuddy

Contributor

Is there anything else I can do? And should this merge request be closed or not?

If you don't mind, you can apply the active_runs-1 change and leave out the new column for now, until issue #16078 is resolved.

You can do it in a new PR if you want

@ephraimbuddy

Contributor

I have added the PR: #21413

@avkirilishin

avkirilishin commented Feb 8, 2022

Contributor Author

Can we add test_runs_are_created_after_max_active_runs_was_reached to #21413 from this merge request?

def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session):
    """
    Test that when creating runs once max_active_runs is reached the runs does not stick
    """
    with conf_vars({('scheduler', 'min_active_runs_check_interval'): '0'}):
        self.scheduler_job = SchedulerJob(subdir=os.devnull)
        self.scheduler_job.executor = MockExecutor(do_update=True)
        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
        with dag_maker(max_active_runs=1, session=session) as dag:
            # Need to use something that doesn't immediately get marked as success by the scheduler
            BashOperator(task_id='task', bash_command='true')
        dag_run = dag_maker.create_dagrun(
            state=State.RUNNING,
            session=session,
        )
        # Reach max_active_runs
        for _ in range(3):
            self.scheduler_job._do_scheduling(session)
        # Complete dagrun
        # Add dag_run back in to the session (_do_scheduling does an expunge_all)
        dag_run = session.merge(dag_run)
        session.refresh(dag_run)
        dag_run.get_task_instance(task_id='task', session=session).state = State.SUCCESS
        # create new run
        for _ in range(3):
            self.scheduler_job._do_scheduling(session)
        # Assert that new runs has created
        dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
        assert len(dag_runs) == 2

I will remove min_active_runs_check_interval from it.

@ephraimbuddy

Contributor

Done


Development

Successfully merging this pull request may close these issues.

A high value of min_file_process_interval & max_active_runs=1 causes stuck dags
No scheduling when max_active_runs is 1

3 participants