Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
from contextlib import contextmanager, suppress
from multiprocessing import Process

import kombu.pools
import psutil
import sqlalchemy.exc
from celery import maybe_patch_concurrency
from celery import Celery, maybe_patch_concurrency
from celery.app.defaults import DEFAULT_TASK_LOG_FMT
from celery.app.log import TaskFormatter
from celery.signals import after_setup_logger
Expand Down Expand Up @@ -219,18 +220,27 @@ def worker(args):
# Use team_config for config reads in multi-team mode, otherwise use global conf
config = team_config if team_config else conf

# Check if a worker with the same hostname already exists
# Check if a worker with the same hostname already exists. The inspect
# call must run on a throwaway Celery app, not on the app handed to
# worker_main below: inspect opens broker connections and initializes
# app.amqp.producer_pool / connection_pool, and kombu.pools is a
# process-global registry keyed by broker URL. Any pool state that
# leaks into the real app breaks the prefork worker's dispatch path,
# so tasks are received but never run. The kombu.pools.reset() call
# is therefore required, not optional cleanup; see
# https://github.com/apache/airflow/issues/59707.
if args.celery_hostname:
inspect = celery_app.control.inspect()
active_workers = inspect.active_queues()
if active_workers:
active_worker_names = list(active_workers.keys())
# Check if any worker ends with @hostname
if any(name.endswith(f"@{args.celery_hostname}") for name in active_worker_names):
temp_app = Celery()
temp_app.conf.update(celery_app.conf)
try:
active_workers = temp_app.control.inspect().active_queues()
if active_workers and any(name.endswith(f"@{args.celery_hostname}") for name in active_workers):
raise SystemExit(
f"Error: A worker with hostname '{args.celery_hostname}' is already running. "
"Please use a different hostname or stop the existing worker first."
)
finally:
temp_app.close()
kombu.pools.reset()

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.log import configure_logging
Expand Down
37 changes: 24 additions & 13 deletions providers/celery/tests/unit/celery/cli/test_celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,45 +330,56 @@ def setup_class(cls):
cls.parser = cli_parser.get_parser()

@pytest.mark.db_test
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
def test_worker_fails_when_hostname_already_exists(self, mock_inspect):
@mock.patch("airflow.providers.celery.cli.celery_command.kombu.pools.reset")
@mock.patch("airflow.providers.celery.cli.celery_command.Celery")
def test_worker_fails_when_hostname_already_exists(self, mock_celery_cls, mock_pools_reset):
"""Test that worker command fails when trying to start a worker with a duplicate hostname."""
args = self.parser.parse_args(["celery", "worker", "--celery-hostname", "existing_host"])

# Mock the inspect to return an active worker with the same hostname
mock_instance = MagicMock()
mock_instance.active_queues.return_value = {
mock_temp_app = MagicMock()
mock_temp_app.control.inspect.return_value.active_queues.return_value = {
"celery@existing_host": [{"name": "queue1"}],
}
mock_inspect.return_value = mock_instance
mock_celery_cls.return_value = mock_temp_app

# Test that SystemExit is raised with appropriate error message
with pytest.raises(SystemExit) as exc_info:
celery_command.worker(args)

assert "existing_host" in str(exc_info.value)
assert "already running" in str(exc_info.value)
# Pool cleanup must run even when the check raises — see issue #59707
mock_temp_app.close.assert_called_once()
mock_pools_reset.assert_called_once()

@pytest.mark.db_test
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
@mock.patch("airflow.providers.celery.cli.celery_command.kombu.pools.reset")
@mock.patch("airflow.providers.celery.cli.celery_command.Celery")
@mock.patch("airflow.providers.celery.cli.celery_command.Process")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
def test_worker_starts_when_hostname_is_unique(self, mock_celery_app, mock_popen, mock_inspect):
def test_worker_starts_when_hostname_is_unique(
self, mock_celery_app, mock_popen, mock_celery_cls, mock_pools_reset
):
"""Test that worker command succeeds when the hostname is unique."""
args = self.parser.parse_args(["celery", "worker", "--celery-hostname", "new_host"])

# Mock the inspect to return active workers without the new hostname
mock_instance = MagicMock()
mock_instance.active_queues.return_value = {
mock_temp_app = MagicMock()
mock_temp_app.control.inspect.return_value.active_queues.return_value = {
"celery@existing_host": [{"name": "queue1"}],
}
mock_inspect.return_value = mock_instance
mock_celery_cls.return_value = mock_temp_app

# Worker should start successfully
celery_command.worker(args)

# Verify that worker_main was called
# Verify that worker_main was called on the real app, not the temp app
assert mock_celery_app.worker_main.called
mock_temp_app.worker_main.assert_not_called()
# Real app's inspect must not be touched — it would pollute the worker's pools
mock_celery_app.control.inspect.assert_not_called()
# Pool cleanup must still run on the happy path
mock_temp_app.close.assert_called_once()
mock_pools_reset.assert_called_once()


@pytest.mark.backend("mysql", "postgres")
Expand Down
Loading