From 9a6b820b3b05709373a2a533c417fbd659e1f6ae Mon Sep 17 00:00:00 2001 From: RNHTTR Date: Thu, 18 Mar 2021 13:22:52 -0400 Subject: [PATCH 1/5] fix bug trying to get the length of a map --- airflow/executors/celery_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 9ead841334142..93830fa4df143 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -551,6 +551,7 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]: if isinstance(app.backend, DatabaseBackend): result = self._get_many_from_db_backend(async_results) return result + async_results = list(async_results) result = self._get_many_using_multiprocessing(async_results) self.log.debug("Fetched %d states for %d task", len(result), len(async_results)) return result From ad05167a1b13719bcbf7e7a434338e0bd0c852b8 Mon Sep 17 00:00:00 2001 From: RNHTTR Date: Thu, 18 Mar 2021 13:59:33 -0400 Subject: [PATCH 2/5] fix bug trying to get the length of a map --- airflow/executors/celery_executor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 93830fa4df143..b91446fb65d5d 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -547,12 +547,12 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]: """Gets status for many Celery tasks using the best method available.""" if isinstance(app.backend, BaseKeyValueStoreBackend): result = self._get_many_from_kv_backend(async_results) - return result - if isinstance(app.backend, DatabaseBackend): + elif isinstance(app.backend, DatabaseBackend): result = self._get_many_from_db_backend(async_results) - return result - async_results = list(async_results) - result = self._get_many_using_multiprocessing(async_results) + else: + async_results = list(async_results) + result = self._get_many_using_multiprocessing(async_results) + async_results = list(async_results) if isinstance(async_results, map) else async_results self.log.debug("Fetched %d states for %d task", len(result), len(async_results)) return result From f163208fee99ff3cd91cc9ba3856bfcd0ae101bf Mon Sep 17 00:00:00 2001 From: RNHTTR Date: Tue, 23 Mar 2021 15:34:59 -0400 Subject: [PATCH 3/5] Fix celery executor bug trying to call len on map --- airflow/executors/celery_executor.py | 7 ++++--- tests/executors/test_celery_executor.py | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index b91446fb65d5d..97df6dd271939 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -550,10 +550,11 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]: elif isinstance(app.backend, DatabaseBackend): result = self._get_many_from_db_backend(async_results) else: - async_results = list(async_results) + async_results = list(async_results) if isinstance(async_results, map) else async_results result = self._get_many_using_multiprocessing(async_results) - async_results = list(async_results) if isinstance(async_results, map) else async_results - self.log.debug("Fetched %d states for %d task", len(result), len(async_results)) + if isinstance(async_results, map) and self.log.level == "DEBUG": + async_results = list(async_results) + self.log.debug("Fetched %d states for %d task", len(result), len(async_results)) return result def _get_many_from_kv_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]: diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 1e562547b8308..6cb478e89de31 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -17,6 +17,7 @@ # under the License. import contextlib import json +import operator import os import sys import unittest @@ -469,3 +470,23 @@ def test_should_support_base_backend(self): ) assert result == {'123': ('SUCCESS', None), '456': ("PENDING", None)} + + @pytest.mark.integration("redis") + @pytest.mark.integration("rabbitmq") + @pytest.mark.backend("mysql", "postgres") + def test_should_support_base_backend_from_try_adopt_task_instances(self): + celery_tasks = { + 123: (ClassWithCustomAttributes(task_id="123", state='SUCCESS'), None), + 456: (ClassWithCustomAttributes(task_id="456", state="PENDING"), None), + } + with _prepare_app(): + mock_backend = mock.MagicMock(autospec=BaseBackend) + + with mock.patch.object(celery_executor.app, 'backend', mock_backend): + + fetcher = BulkStateFetcher(1) + states_by_celery_task_id = fetcher.get_many( + map(operator.itemgetter(0), celery_tasks.values()) + ) + + assert states_by_celery_task_id == {'123': ('SUCCESS', None), '456': ("PENDING", None)} From 8de740e6b53565842cbf9182322ba37714d33cf5 Mon Sep 17 00:00:00 2001 From: RNHTTR Date: Sat, 27 Mar 2021 16:11:50 -0400 Subject: [PATCH 4/5] Fix celery executor bug trying to call len on map --- airflow/executors/celery_executor.py | 19 ++++++----- tests/executors/test_celery_executor.py | 44 ++++++++++++++++++------- 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 97df6dd271939..04cd61bec2d84 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -526,10 +526,6 @@ def fetch_celery_task_state(async_result: AsyncResult) -> Tuple[str, Union[str, return async_result.task_id, ExceptionWithTraceback(e, exception_traceback), None -def _tasks_list_to_task_ids(async_tasks) -> Set[str]: - return {a.task_id for a in async_tasks} - - class BulkStateFetcher(LoggingMixin): """ Gets status for many Celery tasks using the best method available @@ -543,6 +539,9 @@ def __init__(self, sync_parallelism=None): super().__init__() self._sync_parallelism = sync_parallelism + def _tasks_list_to_task_ids(self, async_tasks) -> Set[str]: + return {a.task_id for a in async_tasks} + def get_many(self, async_results) -> Mapping[str, EventBufferValueType]: """Gets status for many Celery tasks using the best method available.""" if isinstance(app.backend, BaseKeyValueStoreBackend): @@ -552,13 +551,15 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]: else: async_results = list(async_results) if isinstance(async_results, map) else async_results result = self._get_many_using_multiprocessing(async_results) - if isinstance(async_results, map) and self.log.level == "DEBUG": - async_results = list(async_results) - self.log.debug("Fetched %d states for %d task", len(result), len(async_results)) + if logging.getLevelName(self.log.level) == "DEBUG": + if isinstance(async_results, map): + self.log.debug("Fetched state for %d task(s)", len(result)) + else: + self.log.debug("Fetched %d state(s) for %d task(s)", len(result), len(async_results)) return result def _get_many_from_kv_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]: - task_ids = _tasks_list_to_task_ids(async_tasks) + task_ids = self._tasks_list_to_task_ids(async_tasks) keys = [app.backend.get_key_for_task(k) for k in task_ids] values = app.backend.mget(keys) task_results = [app.backend.decode_result(v) for v in values if v] @@ -567,7 +568,7 @@ def _get_many_from_kv_backend(self, async_tasks) -> Mapping[str, EventBufferValu return self._prepare_state_and_info_by_task_dict(task_ids, task_results_by_task_id) def _get_many_from_db_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]: - task_ids = _tasks_list_to_task_ids(async_tasks) + task_ids = self._tasks_list_to_task_ids(async_tasks) session = app.backend.ResultSession() task_cls = getattr(app.backend, "task_cls", TaskDb) with session_cleanup(session): diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 6cb478e89de31..bef4d33ed119a 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -413,7 +413,9 @@ class TestBulkStateFetcher(unittest.TestCase): def test_should_support_kv_backend(self, mock_mget): with _prepare_app(): mock_backend = BaseKeyValueStoreBackend(app=celery_executor.app) - with mock.patch.object(celery_executor.app, 'backend', mock_backend): + with mock.patch.object(celery_executor.app, 'backend', mock_backend), self.assertLogs( + "airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG" + ) as cm: fetcher = BulkStateFetcher() result = fetcher.get_many( [ @@ -428,6 +430,9 @@ def test_should_support_kv_backend(self, mock_mget): mock_mget.assert_called_once_with(mock.ANY) assert result == {'123': ('SUCCESS', None), '456': ("PENDING", None)} + assert [ + 'DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)' + ] == cm.output @mock.patch("celery.backends.database.DatabaseBackend.ResultSession") @pytest.mark.integration("redis") @@ -437,21 +442,26 @@ def test_should_support_db_backend(self, mock_session): with _prepare_app(): mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://") - with mock.patch.object(celery_executor.app, 'backend', mock_backend): + with mock.patch.object(celery_executor.app, 'backend', mock_backend), self.assertLogs( + "airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG" + ) as cm: mock_session = mock_backend.ResultSession.return_value # pylint: disable=no-member mock_session.query.return_value.filter.return_value.all.return_value = [ mock.MagicMock(**{"to_dict.return_value": {"status": "SUCCESS", "task_id": "123"}}) ] - fetcher = BulkStateFetcher() - result = fetcher.get_many( - [ - mock.MagicMock(task_id="123"), - mock.MagicMock(task_id="456"), - ] - ) + fetcher = BulkStateFetcher() + result = fetcher.get_many( + [ + mock.MagicMock(task_id="123"), + mock.MagicMock(task_id="456"), + ] + ) assert result == {'123': ('SUCCESS', None), '456': ("PENDING", None)} + assert [ + 'DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)' + ] == cm.output @pytest.mark.integration("redis") @pytest.mark.integration("rabbitmq") @@ -460,7 +470,9 @@ def test_should_support_base_backend(self): with _prepare_app(): mock_backend = mock.MagicMock(autospec=BaseBackend) - with mock.patch.object(celery_executor.app, 'backend', mock_backend): + with mock.patch.object(celery_executor.app, 'backend', mock_backend), self.assertLogs( + "airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG" + ) as cm: fetcher = BulkStateFetcher(1) result = fetcher.get_many( [ @@ -470,11 +482,15 @@ def test_should_support_base_backend(self): ) assert result == {'123': ('SUCCESS', None), '456': ("PENDING", None)} + assert [ + 'DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)' + ] == cm.output @pytest.mark.integration("redis") @pytest.mark.integration("rabbitmq") @pytest.mark.backend("mysql", "postgres") def test_should_support_base_backend_from_try_adopt_task_instances(self): + celery_tasks = { 123: (ClassWithCustomAttributes(task_id="123", state='SUCCESS'), None), 456: (ClassWithCustomAttributes(task_id="456", state="PENDING"), None), @@ -482,11 +498,15 @@ def test_should_support_base_backend_from_try_adopt_task_instances(self): with _prepare_app(): mock_backend = mock.MagicMock(autospec=BaseBackend) - with mock.patch.object(celery_executor.app, 'backend', mock_backend): - + with mock.patch.object(celery_executor.app, 'backend', mock_backend), self.assertLogs( + "airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG" + ) as cm: fetcher = BulkStateFetcher(1) states_by_celery_task_id = fetcher.get_many( map(operator.itemgetter(0), celery_tasks.values()) ) assert states_by_celery_task_id == {'123': ('SUCCESS', None), '456': ("PENDING", None)} + assert [ + 'DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)' + ] == cm.output From 52469bbb23928d740833d95dd3d8c513a0dfe8b9 Mon Sep 17 00:00:00 2001 From: RNHTTR Date: Thu, 1 Apr 2021 14:51:40 -0400 Subject: [PATCH 5/5] Fix celery executor bug trying to call len on map --- airflow/executors/celery_executor.py | 9 ++------- tests/executors/test_celery_executor.py | 26 ------------------------- 2 files changed, 2 insertions(+), 33 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 04cd61bec2d84..bc321c665dbcf 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -476,7 +476,7 @@ def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance return tis states_by_celery_task_id = self.bulk_state_fetcher.get_many( - map(operator.itemgetter(0), celery_tasks.values()) + list(map(operator.itemgetter(0), celery_tasks.values())) ) adopted = [] @@ -549,13 +549,8 @@ def get_many(self, async_results) -> Mapping[str, EventBufferValueType]: elif isinstance(app.backend, DatabaseBackend): result = self._get_many_from_db_backend(async_results) else: - async_results = list(async_results) if isinstance(async_results, map) else async_results result = self._get_many_using_multiprocessing(async_results) - if logging.getLevelName(self.log.level) == "DEBUG": - if isinstance(async_results, map): - self.log.debug("Fetched state for %d task(s)", len(result)) - else: - self.log.debug("Fetched %d state(s) for %d task(s)", len(result), len(async_results)) + self.log.debug("Fetched %d state(s) for %d task(s)", len(result), len(async_results)) return result def _get_many_from_kv_backend(self, async_tasks) -> Mapping[str, EventBufferValueType]: diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index bef4d33ed119a..f454c5a419f23 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -17,7 +17,6 @@ # under the License. import contextlib import json -import operator import os import sys import unittest @@ -485,28 +484,3 @@ def test_should_support_base_backend(self): assert [ 'DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)' ] == cm.output - - @pytest.mark.integration("redis") - @pytest.mark.integration("rabbitmq") - @pytest.mark.backend("mysql", "postgres") - def test_should_support_base_backend_from_try_adopt_task_instances(self): - - celery_tasks = { - 123: (ClassWithCustomAttributes(task_id="123", state='SUCCESS'), None), - 456: (ClassWithCustomAttributes(task_id="456", state="PENDING"), None), - } - with _prepare_app(): - mock_backend = mock.MagicMock(autospec=BaseBackend) - - with mock.patch.object(celery_executor.app, 'backend', mock_backend), self.assertLogs( - "airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG" - ) as cm: - fetcher = BulkStateFetcher(1) - states_by_celery_task_id = fetcher.get_many( - map(operator.itemgetter(0), celery_tasks.values()) - ) - - assert states_by_celery_task_id == {'123': ('SUCCESS', None), '456': ("PENDING", None)} - assert [ - 'DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)' - ] == cm.output