Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions airflow/providers/celery/executors/celery_executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG
from airflow.stats import Stats
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.timeout import timeout

log = logging.getLogger(__name__)
Expand All @@ -65,23 +65,36 @@
# Make it constant for unit test.
CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state"

if conf.has_option("celery", "celery_config_options"):
celery_configuration = conf.getimport("celery", "celery_config_options")

else:
celery_configuration = DEFAULT_CELERY_CONFIG

celery_app_name = conf.get("celery", "CELERY_APP_NAME")
if celery_app_name == "airflow.executors.celery_executor":
warnings.warn(
"The celery.CELERY_APP_NAME configuration uses deprecated package name: "
"'airflow.executors.celery_executor'. "
"Change it to `airflow.providers.celery.executors.celery_executor`, and "
"update the `-app` flag in your Celery Health Checks "
"to use `airflow.providers.celery.executors.celery_executor.app`.",
RemovedInAirflow3Warning,
)
app = Celery(celery_app_name, config_source=celery_configuration)
celery_configuration = None


@providers_configuration_loaded
def _get_celery_app() -> Celery:
"""Init providers before importing the configuration, so the _SECRET and _CMD options work."""
global celery_configuration

if conf.has_option("celery", "celery_config_options"):
celery_configuration = conf.getimport("celery", "celery_config_options")
else:
from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG

celery_configuration = DEFAULT_CELERY_CONFIG

celery_app_name = conf.get("celery", "CELERY_APP_NAME")
if celery_app_name == "airflow.executors.celery_executor":
warnings.warn(
"The celery.CELERY_APP_NAME configuration uses deprecated package name: "
"'airflow.executors.celery_executor'. "
"Change it to `airflow.providers.celery.executors.celery_executor`, and "
"update the `-app` flag in your Celery Health Checks "
"to use `airflow.providers.celery.executors.celery_executor.app`.",
RemovedInAirflow3Warning,
)

return Celery(celery_app_name, config_source=celery_configuration)


app = _get_celery_app()


@celery_import_modules.connect
Expand Down