From b3be5736613496b76190621be964d2ee8ae02a62 Mon Sep 17 00:00:00 2001 From: Mikhail Ilchenko Date: Tue, 22 Feb 2022 13:47:26 +0300 Subject: [PATCH] Fix filesystem sensor for directories Fix walking through wildcarded directory in `FileSensor.poke` method --- airflow/sensors/filesystem.py | 2 +- tests/sensors/test_filesystem.py | 36 ++++++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py index 0ccf6586951e1..9aa56900e9bf2 100644 --- a/airflow/sensors/filesystem.py +++ b/airflow/sensors/filesystem.py @@ -63,7 +63,7 @@ def poke(self, context: Context): self.log.info('Found File %s last modified: %s', str(path), mod_time) return True - for _, _, files in os.walk(full_path): + for _, _, files in os.walk(path): if len(files) > 0: return True return False diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py index 4d23331d064fb..e696f1e9507fc 100644 --- a/tests/sensors/test_filesystem.py +++ b/tests/sensors/test_filesystem.py @@ -17,7 +17,7 @@ # under the License. # -import os.path +import os import shutil import tempfile import unittest @@ -131,6 +131,38 @@ def test_wildcard_file(self): task._hook = self.hook task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def test_wildcard_empty_directory(self): + with tempfile.TemporaryDirectory() as temp_dir: + with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir): + task = FileSensor( + task_id='test', + filepath=os.path.join(temp_dir, '*dir'), + fs_conn_id='fs_default', + dag=self.dag, + timeout=0, + ) + task._hook = self.hook + + # No files in dir + with pytest.raises(AirflowSensorTimeout): + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + def test_wildcard_directory_with_files(self): + with tempfile.TemporaryDirectory() as temp_dir: + with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir) as subdir: + task = FileSensor( + task_id='test', + filepath=os.path.join(temp_dir, '*dir'), + fs_conn_id='fs_default', + dag=self.dag, + timeout=0, + ) + task._hook = self.hook + + # `touch` the file in subdir + open(os.path.join(subdir, 'file'), 'a').close() + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def test_wildcared_directory(self): temp_dir = tempfile.mkdtemp() subdir = tempfile.mkdtemp(dir=temp_dir) @@ -146,7 +178,7 @@ def test_wildcared_directory(self): task._hook = self.hook try: - # `touch` the dir + # `touch` the file in subdir open(subdir + "/file", "a").close() task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) finally: