Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1870,6 +1870,14 @@
type: string
example: ~
default: "30"
- name: deactivate_stale_dags_interval
description: |
How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
the expected files) which should be deactivated.
version_added: 2.3.0
type: integer
example: ~
default: "60"
- name: dag_dir_list_interval
description: |
How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,10 @@ scheduler_idle_sleep_time = 1
# this interval. Keeping this number low will increase CPU usage.
min_file_process_interval = 30

# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated.
deactivate_stale_dags_interval = 60

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300

Expand Down
43 changes: 42 additions & 1 deletion airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,12 @@ def __init__(
self.last_dag_dir_refresh_time = timezone.make_aware(datetime.fromtimestamp(0))
# Last time stats were printed
self.last_stat_print_time = 0
# Last time we cleaned up DAGs which are no longer in files
self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
# How often to check for DAGs which are no longer in files
self.deactivate_stale_dags_interval = conf.getint('scheduler', 'deactivate_stale_dags_interval')
# How long to wait before timing out a process to parse a DAG file
self._processor_timeout = processor_timeout

# How often to scan the DAGs directory for new files. Default to 5 minutes.
self.dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval')

Expand Down Expand Up @@ -471,6 +474,43 @@ def start(self):

return self._run_parsing_loop()

@provide_session
def _deactivate_stale_dags(self, session=None):
"""Detects DAGs which are no longer present in files and deactivate them."""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
last_parsed = {
fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
}
to_deactivate = set()
dags_parsed = (
session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
.filter(DagModel.is_active)
.all()
)
for dag in dags_parsed:
# The largest valid difference between a DagFileStat's last_finished_time and a DAG's
# last_parsed_time is _processor_timeout. Longer than that indicates that the DAG is
# no longer present in the file.
if (
dag.fileloc in last_parsed
and (dag.last_parsed_time + self._processor_timeout) < last_parsed[dag.fileloc]
):
self.log.info(f"DAG {dag.dag_id} is missing and will be deactivated.")
to_deactivate.add(dag.dag_id)

if to_deactivate:
deactivated = (
session.query(DagModel)
.filter(DagModel.dag_id.in_(to_deactivate))
.update({DagModel.is_active: False}, synchronize_session="fetch")
)
if deactivated:
self.log.info("Deactivated %i DAGs which are no longer present in file.", deactivated)

self.last_deactivate_stale_dags_time = timezone.utcnow()

def _run_parsing_loop(self):

# In sync mode we want timeout=None -- wait forever until a message is received
Expand Down Expand Up @@ -536,6 +576,7 @@ def _run_parsing_loop(self):
self.waitables.pop(sentinel)
self._processors.pop(processor.file_path)

self._deactivate_stale_dags()
self._refresh_dag_dir()

self._kill_timed_out_processors()
Expand Down
11 changes: 0 additions & 11 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,6 @@ def process_file(
Stats.incr('dag_file_refresh_error', 1, 1)
return 0, 0

self._deactivate_missing_dags(session, dagbag, file_path)

if len(dagbag.dags) > 0:
self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
else:
Expand Down Expand Up @@ -659,12 +657,3 @@ def process_file(
self.log.exception("Error logging import errors!")

return len(dagbag.dags), len(dagbag.import_errors)

def _deactivate_missing_dags(self, session: Session, dagbag: DagBag, file_path: str) -> None:
deactivated = (
session.query(DagModel)
.filter(DagModel.fileloc == file_path, DagModel.is_active, ~DagModel.dag_id.in_(dagbag.dag_ids))
.update({DagModel.is_active: False}, synchronize_session="fetch")
)
if deactivated:
self.log.info("Deactivated %i DAGs which are no longer present in %s", deactivated, file_path)
48 changes: 48 additions & 0 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,54 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
> (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds()
)

def test_deactivate_stale_dags(self):
"""
Ensure that DAGs are marked inactive when the file is parsed but the
DagModel.last_parsed_time is not updated.
"""
manager = DagFileProcessorManager(
dag_directory='directory',
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
)

test_dag_path = str(TEST_DAG_FOLDER / 'test_example_bash_operator.py')
dagbag = DagBag(test_dag_path, read_dags_from_db=False)

with create_session() as session:
# Add stale DAG to the DB
dag = dagbag.get_dag('test_example_bash_operator')
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db()

# Add DAG to the file_parsing_stats
stat = DagFileStat(
num_dags=1,
import_errors=0,
last_finish_time=timezone.utcnow() + timedelta(hours=1),
last_duration=1,
run_count=1,
)
manager._file_paths = [test_dag_path]
manager._file_stats[test_dag_path] = stat

active_dags = (
session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
)
assert len(active_dags) == 1

manager._file_stats[test_dag_path] = stat
manager._deactivate_stale_dags()
active_dags = (
session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
)

assert len(active_dags) == 0

@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
Expand Down
25 changes: 0 additions & 25 deletions tests/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,31 +707,6 @@ def test_import_error_tracebacks_zip_depth(self, tmpdir):
assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename)
session.rollback()

def test_process_file_should_deactivate_missing_dags(self):

dag_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
)

# write a DAG into the DB which is not present in its specified file
with create_session() as session:
orm_dag = DagModel(dag_id='missing_dag', is_active=True, fileloc=dag_file)
session.merge(orm_dag)
session.commit()

session = settings.Session()

dags = session.query(DagModel).all()
assert [dag.dag_id for dag in dags if dag.is_active] == ['missing_dag']

# re-parse the file and see that the DAG is no longer there
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag_file_processor.process_file(dag_file, [])

dags = session.query(DagModel).all()
assert [dag.dag_id for dag in dags if dag.is_active] == ['test_only_dummy_tasks']
assert [dag.dag_id for dag in dags if not dag.is_active] == ['missing_dag']


class TestProcessorAgent:
@pytest.fixture(autouse=True)
Expand Down