From bc3a926c7a74bd04db7b88d3f91e9ac8ae68a501 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Fri, 24 Apr 2026 10:22:08 -0700 Subject: [PATCH] Small param rename in BaseExecutor and CeleryExecutor --- airflow-core/src/airflow/executors/base_executor.py | 4 ++-- .../src/airflow/providers/celery/executors/celery_executor.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py index 8f20f23f8da05..d21bfa4fed875 100644 --- a/airflow-core/src/airflow/executors/base_executor.py +++ b/airflow-core/src/airflow/executors/base_executor.py @@ -295,7 +295,7 @@ def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey, return workloads_to_schedule - def _process_workloads(self, workloads: Sequence[ExecutorWorkload]) -> None: + def _process_workloads(self, workload_items: Sequence[ExecutorWorkload]) -> None: """ Process the given workloads. @@ -303,7 +303,7 @@ def _process_workloads(self, workloads: Sequence[ExecutorWorkload]) -> None: the execution of workloads (e.g., queuing them to workers, submitting to external systems, etc.). - :param workloads: List of workloads to process + :param workload_items: List of workloads to process """ raise NotImplementedError(f"{type(self).__name__} must implement _process_workloads()") diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py index 16d634fbdefbb..7fc388c9608a8 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py @@ -173,7 +173,7 @@ def _process_tasks(self, task_tuples: Sequence[TaskTuple]) -> None: self._send_workloads(task_tuples_to_send) - def _process_workloads(self, workloads: Sequence[workloads.All]) -> None: + def _process_workloads(self, workload_items: Sequence[workloads.All]) -> None: # Airflow V3 version -- have to delay imports until we know we are on v3. from airflow.executors.workloads import ExecuteTask @@ -181,7 +181,7 @@ def _process_workloads(self, workloads: Sequence[workloads.All]) -> None: from airflow.executors.workloads import ExecuteCallback workloads_to_be_sent: list[WorkloadInCelery] = [] - for workload in workloads: + for workload in workload_items: if isinstance(workload, ExecuteTask): workloads_to_be_sent.append((workload.ti.key, workload, workload.ti.queue, self.team_name)) elif AIRFLOW_V_3_2_PLUS and isinstance(workload, ExecuteCallback):