diff --git a/providers/celery/src/airflow/providers/celery/cli/celery_command.py b/providers/celery/src/airflow/providers/celery/cli/celery_command.py index 685163042b8d9..0e2b66aabf10c 100644 --- a/providers/celery/src/airflow/providers/celery/cli/celery_command.py +++ b/providers/celery/src/airflow/providers/celery/cli/celery_command.py @@ -25,9 +25,10 @@ from contextlib import contextmanager, suppress from multiprocessing import Process +import kombu.pools import psutil import sqlalchemy.exc -from celery import maybe_patch_concurrency +from celery import Celery, maybe_patch_concurrency from celery.app.defaults import DEFAULT_TASK_LOG_FMT from celery.app.log import TaskFormatter from celery.signals import after_setup_logger @@ -219,18 +220,27 @@ def worker(args): # Use team_config for config reads in multi-team mode, otherwise use global conf config = team_config if team_config else conf - # Check if a worker with the same hostname already exists + # Check if a worker with the same hostname already exists. The inspect + # call must run on a throwaway Celery app, not on the app handed to + # worker_main below: inspect opens broker connections and initializes + # app.amqp.producer_pool / connection_pool, and kombu.pools is a + # process-global registry keyed by broker URL — any pool state that + # leaks into the real app breaks the prefork worker's dispatch path + # so tasks are received but never run. The kombu.pools.reset() call + # is load-bearing; see https://github.com/apache/airflow/issues/59707. if args.celery_hostname: - inspect = celery_app.control.inspect() - active_workers = inspect.active_queues() - if active_workers: - active_worker_names = list(active_workers.keys()) - # Check if any worker ends with @hostname - if any(name.endswith(f"@{args.celery_hostname}") for name in active_worker_names): + temp_app = Celery() + temp_app.conf.update(celery_app.conf) + try: + active_workers = temp_app.control.inspect().active_queues() + if active_workers and any(name.endswith(f"@{args.celery_hostname}") for name in active_workers): raise SystemExit( f"Error: A worker with hostname '{args.celery_hostname}' is already running. " "Please use a different hostname or stop the existing worker first." ) + finally: + temp_app.close() + kombu.pools.reset() if AIRFLOW_V_3_0_PLUS: from airflow.sdk.log import configure_logging diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py b/providers/celery/tests/unit/celery/cli/test_celery_command.py index 5ccc14ffa16e7..a14cadfee1528 100644 --- a/providers/celery/tests/unit/celery/cli/test_celery_command.py +++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py @@ -330,17 +330,17 @@ def setup_class(cls): cls.parser = cli_parser.get_parser() @pytest.mark.db_test - @mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect") - def test_worker_fails_when_hostname_already_exists(self, mock_inspect): + @mock.patch("airflow.providers.celery.cli.celery_command.kombu.pools.reset") + @mock.patch("airflow.providers.celery.cli.celery_command.Celery") + def test_worker_fails_when_hostname_already_exists(self, mock_celery_cls, mock_pools_reset): """Test that worker command fails when trying to start a worker with a duplicate hostname.""" args = self.parser.parse_args(["celery", "worker", "--celery-hostname", "existing_host"]) - # Mock the inspect to return an active worker with the same hostname - mock_instance = MagicMock() - mock_instance.active_queues.return_value = { + mock_temp_app = MagicMock() + mock_temp_app.control.inspect.return_value.active_queues.return_value = { "celery@existing_host": [{"name": "queue1"}], } - mock_inspect.return_value = mock_instance + mock_celery_cls.return_value = mock_temp_app # Test that SystemExit is raised with appropriate error message with pytest.raises(SystemExit) as exc_info: @@ -348,27 +348,38 @@ def test_worker_fails_when_hostname_already_exists(self, mock_inspect): assert "existing_host" in str(exc_info.value) assert "already running" in str(exc_info.value) + # Pool cleanup must run even when the check raises — see issue #59707 + mock_temp_app.close.assert_called_once() + mock_pools_reset.assert_called_once() @pytest.mark.db_test - @mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect") + @mock.patch("airflow.providers.celery.cli.celery_command.kombu.pools.reset") + @mock.patch("airflow.providers.celery.cli.celery_command.Celery") @mock.patch("airflow.providers.celery.cli.celery_command.Process") @mock.patch("airflow.providers.celery.executors.celery_executor.app") - def test_worker_starts_when_hostname_is_unique(self, mock_celery_app, mock_popen, mock_inspect): + def test_worker_starts_when_hostname_is_unique( + self, mock_celery_app, mock_popen, mock_celery_cls, mock_pools_reset + ): """Test that worker command succeeds when the hostname is unique.""" args = self.parser.parse_args(["celery", "worker", "--celery-hostname", "new_host"]) - # Mock the inspect to return active workers without the new hostname - mock_instance = MagicMock() - mock_instance.active_queues.return_value = { + mock_temp_app = MagicMock() + mock_temp_app.control.inspect.return_value.active_queues.return_value = { "celery@existing_host": [{"name": "queue1"}], } - mock_inspect.return_value = mock_instance + mock_celery_cls.return_value = mock_temp_app # Worker should start successfully celery_command.worker(args) - # Verify that worker_main was called + # Verify that worker_main was called on the real app, not the temp app assert mock_celery_app.worker_main.called + mock_temp_app.worker_main.assert_not_called() + # Real app's inspect must not be touched — it would pollute the worker's pools + mock_celery_app.control.inspect.assert_not_called() + # Pool cleanup must still run on the happy path + mock_temp_app.close.assert_called_once() + mock_pools_reset.assert_called_once() @pytest.mark.backend("mysql", "postgres")