Reduce DB load incurred by Stale DAG deactivation#21399
Conversation
There was a problem hiding this comment.
This feels like the wrong timeout to use -- processor timeout is how long each file should take to process:
# How long before timing out a DagFileProcessor, which processes a dag file
dag_file_processor_timeout = 50
But that doesn't mean that every dag file should be "reparsed" every 50 seconds
There was a problem hiding this comment.
So there's actually a reason for this.
We're comparing the parse time as reported by the processor manager to the last_parsed_time as seen in the DAG table, however these values are taken independently:
DagModel.last_parsed_time is decided here, when the DAG is written to the DB:
Line 2427 in 960f573
whereas the DagParsingStat.last_finish_time is decided when the file processor finishes:
https://github.com/apache/airflow/blob/dbe723da95143f6d33e5d2594bc2017c4164e687/airflow/dag_processing/manager.py#L915
So because of this, DagParsingStat.last_finish_time is always going to be slightly later than DagModel.last_parsed_time (typically on the order of milliseconds). Thus in order to be certain that the file was processed more recently than the DAG was last observed we can't directly compare the two timestamps and instead have to do something like:
DagParsingStat.last_finish_time > (SOME_BUFFER + DagModel.last_parsed_time)
I chose to use the processor_timeout here because it represents the absolute upper bound on the difference between DagParsingStat.last_finish_time and DagModel.last_parsed_time, and thus we favour false negatives (not deactivating a DAG which is actually gone) over false positives (incorrectly deactivating a DAG because the file processor was blocking for a few seconds after updating the DB)
Let me know what you think - from my testing in breeze this approach appears to work reliably, but it also adds a lot of complexity.
There was a problem hiding this comment.
Ohhhhh! Right yeah that makes sense. Could you try and distil some of this down to a short comment?
There was a problem hiding this comment.
Yup, will do (probably won't have time to clean up this PR until next week though)
There was a problem hiding this comment.
Setting this value to max caused issues due to the following line of code, which led to an overflow:
and (dag.last_parsed_time + self._processor_timeout) < last_parsed[dag.fileloc]3efe1a3 to
2fef453
Compare
|
OK, this is now ready for a proper review - I will patch this into our production 2.2.2 container sometime this week and confirm that it fixes the original performance issue while still managing to clean up stale DAGs. |
|
@SamWheating Did you manage to get this running in prod? |
|
Not yet, I've built a patched version of 2.2.2 with this change but haven't had a chance to roll it out in any large-scale environments. Will do it tomorrow and report back wednesday. |
|
Ok, I have created a patched version of Airflow 2.2.2 with this change and deployed it in our prod-scale staging environment (Airflow 2.2.2). I can confirm that:
|
potiuk
left a comment
There was a problem hiding this comment.
It looks really cool and I think it might handle a lot of stability issues resulting from some synchronisation solutions that cause some intermitttent instabilities of the filesystem and some dynamic dag generation scenarios.
I think it needs a few more eyes though.
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
|
@jedcunningham I've marked this for possible inclusion in 2.2.5 |
2fef453 to
2aac5c5
Compare
2aac5c5 to
aef9f8c
Compare
Deactivating stale DAGs periodically in bulk By moving this logic into the DagFileProcessorManager and running it across all processed file periodically, we can prevent the use of un-indexed queries. The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the last_parsed_time of an entry in the dag table. If the file has been processed significantly more recently than the DAG has been updated, then its safe to assume that the DAG is missing and can be marked inactive. (cherry picked from commit f309ea7)
Deactivating stale DAGs periodically in bulk By moving this logic into the DagFileProcessorManager and running it across all processed file periodically, we can prevent the use of un-indexed queries. The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the last_parsed_time of an entry in the dag table. If the file has been processed significantly more recently than the DAG has been updated, then its safe to assume that the DAG is missing and can be marked inactive. (cherry picked from commit f309ea7)
Deactivating stale DAGs periodically in bulk By moving this logic into the DagFileProcessorManager and running it across all processed file periodically, we can prevent the use of un-indexed queries. The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the last_parsed_time of an entry in the dag table. If the file has been processed significantly more recently than the DAG has been updated, then its safe to assume that the DAG is missing and can be marked inactive. (cherry picked from commit f309ea7)
Deactivating stale DAGs periodically in bulk By moving this logic into the DagFileProcessorManager and running it across all processed file periodically, we can prevent the use of un-indexed queries. The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the last_parsed_time of an entry in the dag table. If the file has been processed significantly more recently than the DAG has been updated, then its safe to assume that the DAG is missing and can be marked inactive. (cherry picked from commit f309ea7)
Deactivating stale DAGs periodically in bulk By moving this logic into the DagFileProcessorManager and running it across all processed file periodically, we can prevent the use of un-indexed queries. The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the last_parsed_time of an entry in the dag table. If the file has been processed significantly more recently than the DAG has been updated, then its safe to assume that the DAG is missing and can be marked inactive. (cherry picked from commit f309ea7)
Deactivating stale DAGs periodically in bulk By moving this logic into the DagFileProcessorManager and running it across all processed file periodically, we can prevent the use of un-indexed queries. The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the last_parsed_time of an entry in the dag table. If the file has been processed significantly more recently than the DAG has been updated, then its safe to assume that the DAG is missing and can be marked inactive. (cherry picked from commit f309ea7)
Re: #21397
By moving this logic into the
DagFileProcessorManagerand running it across all processed file periodically, we can prevent the use of un-indexed queries.The basic logic is that we can look at the last processed time of a file (for a given processor) and compare that to the
last_parsed_timeof an entry in thedagtable. If the file has been processed significantly more recently than the DAG has been updated, then its safe to assume that the DAG is missing and can be marked inactive.Todo: