diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 02715a84ad93b..40563a11d4e21 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -113,6 +113,8 @@ class BaseExecutor(LoggingMixin):
     callback_sink: BaseCallbackSink | None = None
 
     is_local: bool = False
+    # If True, the sensor's prepare_for_execution() switches the task to "reschedule" mode.
+    change_sensor_mode_to_reschedule: bool = False
 
     serve_logs: bool = False
 
diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py
index af3d42a816718..b477d25eaad14 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -41,6 +41,7 @@ class CeleryKubernetesExecutor(LoggingMixin):
     supports_ad_hoc_ti_run: bool = True
     supports_pickling: bool = True
     supports_sentry: bool = False
+    change_sensor_mode_to_reschedule: bool = False
 
     callback_sink: BaseCallbackSink | None = None
 
diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py
index e0261f9815fea..18ba0f8798f64 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -42,6 +42,7 @@ class DebugExecutor(BaseExecutor):
     """
 
     _terminated = threading.Event()
+    change_sensor_mode_to_reschedule: bool = True
 
     def __init__(self):
         super().__init__()
diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py
index c935a13af7ef0..f723ab99986fe 100644
--- a/airflow/executors/local_kubernetes_executor.py
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -41,6 +41,7 @@ class LocalKubernetesExecutor(LoggingMixin):
     supports_ad_hoc_ti_run: bool = True
     supports_pickling: bool = False
     supports_sentry: bool = False
+    change_sensor_mode_to_reschedule: bool = False
 
     callback_sink: BaseCallbackSink | None = None
 
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index f20c7d7771dca..3ecc10a26bb68 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -32,6 +32,7 @@
     AirflowSensorTimeout,
     AirflowSkipException,
 )
+from airflow.executors.executor_loader import ExecutorLoader
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskreschedule import TaskReschedule
@@ -257,11 +258,13 @@ def _get_next_poke_interval(
 
     def prepare_for_execution(self) -> BaseOperator:
         task = super().prepare_for_execution()
+
         # Sensors in `poke` mode can block execution of DAGs when running
         # with single process executor, thus we change the mode to`reschedule`
         # to allow parallel task being scheduled and executed
-        if conf.get("core", "executor") == "DebugExecutor":
-            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
+        executor, _ = ExecutorLoader.import_default_executor_cls()
+        if executor.change_sensor_mode_to_reschedule:
+            self.log.warning("%s changes sensor mode to 'reschedule'.", executor.__name__)
             task.mode = "reschedule"
         return task
 
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 7417c183d8147..34135a51f27c8 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -24,6 +24,22 @@
 import time_machine
 
 from airflow.exceptions import AirflowException, AirflowRescheduleException, AirflowSensorTimeout
+from airflow.executors.celery_executor import CeleryExecutor
+from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
+from airflow.executors.debug_executor import DebugExecutor
+from airflow.executors.executor_constants import (
+    CELERY_EXECUTOR,
+    CELERY_KUBERNETES_EXECUTOR,
+    DEBUG_EXECUTOR,
+    KUBERNETES_EXECUTOR,
+    LOCAL_EXECUTOR,
+    LOCAL_KUBERNETES_EXECUTOR,
+    SEQUENTIAL_EXECUTOR,
+)
+from airflow.executors.kubernetes_executor import KubernetesExecutor
+from airflow.executors.local_executor import LocalExecutor
+from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor
+from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models import TaskReschedule
 from airflow.models.xcom import XCom
 from airflow.operators.empty import EmptyOperator
@@ -675,6 +691,49 @@ def test_sensor_with_xcom_fails(self, make_sensor):
         )
         assert actual_xcom_value is None
 
+    @pytest.mark.parametrize(
+        "executor_cls_mode",
+        [
+            (CeleryExecutor, "poke"),
+            (CeleryKubernetesExecutor, "poke"),
+            (DebugExecutor, "reschedule"),
+            (KubernetesExecutor, "poke"),
+            (LocalExecutor, "poke"),
+            (LocalKubernetesExecutor, "poke"),
+            (SequentialExecutor, "poke"),
+        ],
+        ids=[
+            CELERY_EXECUTOR,
+            CELERY_KUBERNETES_EXECUTOR,
+            DEBUG_EXECUTOR,
+            KUBERNETES_EXECUTOR,
+            LOCAL_EXECUTOR,
+            LOCAL_KUBERNETES_EXECUTOR,
+            SEQUENTIAL_EXECUTOR,
+        ],
+    )
+    def test_prepare_for_execution(self, executor_cls_mode):
+        """
+        Should change mode of the task to reschedule only if the default executor
+        class sets ``change_sensor_mode_to_reschedule``.
+        """
+        executor_cls, mode = executor_cls_mode
+        sensor = DummySensor(
+            task_id=SENSOR_OP,
+            return_value=None,
+            poke_interval=10,
+            timeout=60,
+            exponential_backoff=True,
+            max_wait=timedelta(seconds=30),
+        )
+        with patch(
+            "airflow.executors.executor_loader.ExecutorLoader.import_default_executor_cls"
+        ) as import_default_executor_cls:
+            # prepare_for_execution() unpacks the result into two values; only the class is used.
+            import_default_executor_cls.return_value = (executor_cls, None)
+            task = sensor.prepare_for_execution()
+        assert task.mode == mode
+
 
 @poke_mode_only
 class DummyPokeOnlySensor(BaseSensorOperator):