diff --git a/airflow/providers/celery/executors/celery_executor_utils.py b/airflow/providers/celery/executors/celery_executor_utils.py index 01be860b92288..6e056409d09d3 100644 --- a/airflow/providers/celery/executors/celery_executor_utils.py +++ b/airflow/providers/celery/executors/celery_executor_utils.py @@ -43,11 +43,11 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException, RemovedInAirflow3Warning from airflow.executors.base_executor import BaseExecutor -from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG from airflow.stats import Stats from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname +from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.timeout import timeout log = logging.getLogger(__name__) @@ -65,23 +65,36 @@ # Make it constant for unit test. CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state" -if conf.has_option("celery", "celery_config_options"): - celery_configuration = conf.getimport("celery", "celery_config_options") - -else: - celery_configuration = DEFAULT_CELERY_CONFIG - -celery_app_name = conf.get("celery", "CELERY_APP_NAME") -if celery_app_name == "airflow.executors.celery_executor": - warnings.warn( - "The celery.CELERY_APP_NAME configuration uses deprecated package name: " - "'airflow.executors.celery_executor'. " - "Change it to `airflow.providers.celery.executors.celery_executor`, and " - "update the `-app` flag in your Celery Health Checks " - "to use `airflow.providers.celery.executors.celery_executor.app`.", - RemovedInAirflow3Warning, - ) -app = Celery(celery_app_name, config_source=celery_configuration) +celery_configuration = None + + +@providers_configuration_loaded +def _get_celery_app() -> Celery: + """Init providers before importing the configuration, so the _SECRET and _CMD options work.""" + global celery_configuration + + if conf.has_option("celery", "celery_config_options"): + celery_configuration = conf.getimport("celery", "celery_config_options") + else: + from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG + + celery_configuration = DEFAULT_CELERY_CONFIG + + celery_app_name = conf.get("celery", "CELERY_APP_NAME") + if celery_app_name == "airflow.executors.celery_executor": + warnings.warn( + "The celery.CELERY_APP_NAME configuration uses deprecated package name: " + "'airflow.executors.celery_executor'. " + "Change it to `airflow.providers.celery.executors.celery_executor`, and " + "update the `-app` flag in your Celery Health Checks " + "to use `airflow.providers.celery.executors.celery_executor.app`.", + RemovedInAirflow3Warning, + ) + + return Celery(celery_app_name, config_source=celery_configuration) + + +app = _get_celery_app() @celery_import_modules.connect