diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d43e1fb508592..15977aceb81d6 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1870,6 +1870,14 @@ type: string example: ~ default: "30" + - name: deactivate_stale_dags_interval + description: | + How often (in seconds) to check for stale DAGs (DAGs which are no longer present in + the expected files) which should be deactivated. + version_added: 2.3.0 + type: integer + example: ~ + default: "60" - name: dag_dir_list_interval description: | How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index c8d34bb2f94d2..4c5754adf15b9 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -942,6 +942,10 @@ scheduler_idle_sleep_time = 1 # this interval. Keeping this number low will increase CPU usage. min_file_process_interval = 30 +# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in +# the expected files) which should be deactivated. +deactivate_stale_dags_interval = 60 + # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes. 
@provide_session
def _deactivate_stale_dags(self, session=None):
    """
    Detect DAGs which are no longer present in their files and deactivate them.

    The check is throttled: it does nothing unless more than
    ``self.deactivate_stale_dags_interval`` seconds have elapsed since
    ``self.last_deactivate_stale_dags_time``. When it does run, every active
    ``DagModel`` row whose file has a recorded finish time is compared against
    that finish time, and rows that lag behind it by more than
    ``self._processor_timeout`` are marked inactive.

    :param session: ORM session used to query and update ``DagModel`` rows.
        Injected by ``provide_session`` when the caller does not pass one.
    """
    now = timezone.utcnow()
    elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
    if elapsed_time_since_refresh <= self.deactivate_stale_dags_interval:
        # Checked recently enough; nothing to do on this loop iteration.
        return

    # Look up each file's last finish time exactly once. Files without a
    # finish time (falsy result) are left out, so their DAGs are never judged.
    last_parsed = {}
    for file_path in self.file_paths:
        last_finish_time = self.get_last_finish_time(file_path)
        if last_finish_time:
            last_parsed[file_path] = last_finish_time

    dags_parsed = (
        session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
        .filter(DagModel.is_active)
        .all()
    )

    to_deactivate = set()
    for dag in dags_parsed:
        file_finish_time = last_parsed.get(dag.fileloc)
        if file_finish_time is None:
            # The DAG's file has no recorded finish time; we cannot judge it.
            continue
        if dag.last_parsed_time is None:
            # NOTE(review): defensive guard - a row without last_parsed_time would make the
            # addition below raise TypeError and abort the whole check. Confirm whether such
            # rows should instead be treated as stale.
            continue
        # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
        # last_parsed_time is _processor_timeout. Longer than that indicates that the DAG is
        # no longer present in the file.
        if dag.last_parsed_time + self._processor_timeout < file_finish_time:
            self.log.info("DAG %s is missing and will be deactivated.", dag.dag_id)
            to_deactivate.add(dag.dag_id)

    if to_deactivate:
        deactivated = (
            session.query(DagModel)
            .filter(DagModel.dag_id.in_(to_deactivate))
            .update({DagModel.is_active: False}, synchronize_session="fetch")
        )
        if deactivated:
            self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated)

    # Record the completion time (not `now`) so the next interval starts after this check.
    self.last_deactivate_stale_dags_time = timezone.utcnow()
def test_deactivate_stale_dags(self):
    """
    Ensure that DAGs are marked inactive when the file is parsed but the
    DagModel.last_parsed_time is not updated.
    """
    manager = DagFileProcessorManager(
        dag_directory='directory',
        max_runs=1,
        processor_timeout=timedelta(minutes=10),
        signal_conn=MagicMock(),
        dag_ids=[],
        pickle_dags=False,
        async_mode=True,
    )

    test_dag_path = str(TEST_DAG_FOLDER / 'test_example_bash_operator.py')
    dagbag = DagBag(test_dag_path, read_dags_from_db=False)

    with create_session() as session:

        def active_dags_for_file():
            """Return the active DagModel rows whose fileloc is the test DAG file."""
            return (
                session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
            )

        # Single reference instant, so the file's finish time is exactly one hour
        # later than the DAG's parse time - well beyond the 10 minute processor_timeout.
        parsed_at = timezone.utcnow()

        # Add stale DAG to the DB
        dag = dagbag.get_dag('test_example_bash_operator')
        dag.last_parsed_time = parsed_at
        dag.sync_to_db()

        # Add DAG to the file_parsing_stats
        stat = DagFileStat(
            num_dags=1,
            import_errors=0,
            last_finish_time=parsed_at + timedelta(hours=1),
            last_duration=1,
            run_count=1,
        )
        manager._file_paths = [test_dag_path]
        manager._file_stats[test_dag_path] = stat

        # The DAG starts out active ...
        assert len(active_dags_for_file()) == 1

        # ... and is deactivated once the manager notices it is stale.
        manager._deactivate_stale_dags()
        assert len(active_dags_for_file()) == 0
b/tests/dag_processing/test_processor.py index 4e9cc2ed7fa26..ad94e85c191dc 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -707,31 +707,6 @@ def test_import_error_tracebacks_zip_depth(self, tmpdir): assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename) session.rollback() - def test_process_file_should_deactivate_missing_dags(self): - - dag_file = os.path.join( - os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py' - ) - - # write a DAG into the DB which is not present in its specified file - with create_session() as session: - orm_dag = DagModel(dag_id='missing_dag', is_active=True, fileloc=dag_file) - session.merge(orm_dag) - session.commit() - - session = settings.Session() - - dags = session.query(DagModel).all() - assert [dag.dag_id for dag in dags if dag.is_active] == ['missing_dag'] - - # re-parse the file and see that the DAG is no longer there - dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) - dag_file_processor.process_file(dag_file, []) - - dags = session.query(DagModel).all() - assert [dag.dag_id for dag in dags if dag.is_active] == ['test_only_dummy_tasks'] - assert [dag.dag_id for dag in dags if not dag.is_active] == ['missing_dag'] - class TestProcessorAgent: @pytest.fixture(autouse=True)