From 80aa3cde9dcfd6552b7baa6611abf2969bd0bb8a Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 20 Feb 2025 16:25:41 +0530 Subject: [PATCH 1/6] AIP-72: Port over executor_config for K8sExecutor when using task sdk --- airflow/example_dags/sleep_dag.py | 33 +++++++ airflow/executors/workloads.py | 1 + kubernetes_tests/test_kubernetes_executor.py | 6 -- .../executors/kubernetes_executor.py | 3 +- .../executors/kubernetes_executor_utils.py | 4 +- .../cncf/kubernetes/pod_generator.py | 86 ++++++++++++------- .../airflow/sdk/execution_time/task_runner.py | 7 +- 7 files changed, 101 insertions(+), 39 deletions(-) create mode 100644 airflow/example_dags/sleep_dag.py diff --git a/airflow/example_dags/sleep_dag.py b/airflow/example_dags/sleep_dag.py new file mode 100644 index 0000000000000..73026648f700a --- /dev/null +++ b/airflow/example_dags/sleep_dag.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow.decorators import dag +from airflow.providers.standard.operators.bash import BashOperator + + +@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False) +def sleep_dag(): + t1 = BashOperator( + task_id="sleep_10_minutes", + bash_command="sleep 600", + ) + + +sleep_dag() diff --git a/airflow/executors/workloads.py b/airflow/executors/workloads.py index 5cb0102ce66df..3be2ad22d2399 100644 --- a/airflow/executors/workloads.py +++ b/airflow/executors/workloads.py @@ -61,6 +61,7 @@ class TaskInstance(BaseModel): pool_slots: int queue: str priority_weight: int + executor_config: dict | None = None # TODO: Task-SDK: Can we replace TastInstanceKey with just the uuid across the codebase? @property diff --git a/kubernetes_tests/test_kubernetes_executor.py b/kubernetes_tests/test_kubernetes_executor.py index 92e58d98118bd..622a4daaa0df0 100644 --- a/kubernetes_tests/test_kubernetes_executor.py +++ b/kubernetes_tests/test_kubernetes_executor.py @@ -26,9 +26,6 @@ @pytest.mark.skipif(EXECUTOR != "KubernetesExecutor", reason="Only runs on KubernetesExecutor") class TestKubernetesExecutor(BaseK8STest): - @pytest.mark.skip( - reason="TODO: AIP-72 Porting over executor_config not yet done. Remove once #46892 is handled" - ) @pytest.mark.execution_timeout(300) def test_integration_run_dag(self): dag_id = "example_kubernetes_executor" @@ -54,9 +51,6 @@ def test_integration_run_dag(self): ) @pytest.mark.execution_timeout(300) - @pytest.mark.skip( - reason="TODO: AIP-72 Porting over executor_config not yet done. Remove once #46892 is handled" - ) def test_integration_run_dag_with_scheduler_failure(self): dag_id = "example_kubernetes_executor" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 3f6f49070ae0a..42f6d63a5c46d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -259,6 +259,7 @@ def execute_async( self.log.info("Add task %s with command %s", key, command) try: + # here its v1 pod obj kube_executor_config = PodGenerator.from_obj(executor_config) except Exception: self.log.error("Invalid executor_config for %s. Executor_config: %s", key, executor_config) @@ -292,7 +293,7 @@ def _process_workloads(self, workloads: list[workloads.All]) -> None: queue = w.ti.queue # type: ignore[union-attr] # TODO: will be handled by https://github.com/apache/airflow/issues/46892 - executor_config = {} # type: ignore[var-annotated] + executor_config = w.ti.executor_config del self.queued_tasks[key] self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) # type: ignore[arg-type] diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index dda6b6e6b10be..38c43363da61d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -385,6 +385,7 @@ def _health_check_kube_watchers(self): def run_next(self, next_job: KubernetesJobType) -> None: """Receives the next job to run, builds the pod, and creates it.""" + # here is pod object key, command, kube_executor_config, pod_template_file = next_job dag_id, task_id, run_id, try_number, map_index = key @@ -394,7 +395,8 @@ def run_next(self, next_job: KubernetesJobType) -> None: if isinstance(command[0], ExecuteTask): workload = command[0] - ser_input = workload.model_dump_json() + print("The workload is", workload) + ser_input = workload.model_dump_json(exclude={"ti": {"executor_config"}}) command = [ "python", "-m", diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index e1f599a40a89b..bd9102c69e725 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -47,7 +47,7 @@ from airflow.utils import yaml from airflow.utils.hashlib_wrapper import md5 from airflow.version import version as airflow_version -from kubernetes.client import V1EmptyDirVolumeSource, V1Volume, V1VolumeMount, models as k8s +from kubernetes.client import models as k8s from kubernetes.client.api_client import ApiClient if TYPE_CHECKING: @@ -327,6 +327,16 @@ def construct_pod( if run_id: annotations["run_id"] = run_id + log_volume = k8s.V1Volume( + name="logs-volume", + empty_dir=k8s.V1EmptyDirVolumeSource(), + ) + + log_volume_mount = k8s.V1VolumeMount( + name="logs-volume", + mount_path="/opt/airflow/logs", + ) + main_container = k8s.V1Container( name="base", args=args, @@ -334,38 +344,43 @@ def construct_pod( env=[ k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True"), ], - ) - dynamic_pod = k8s.V1Pod( - metadata=k8s.V1ObjectMeta( - namespace=namespace, - annotations=annotations, - name=pod_id, - labels=cls.build_labels_for_k8s_executor_pod( - dag_id=dag_id, - task_id=task_id, - try_number=try_number, - airflow_worker=scheduler_job_id, - map_index=map_index, - logical_date=date, - run_id=run_id, - ), - ), + volume_mounts=[], ) + log_tailer_container = k8s.V1Container( + name="log-tailer", + image="busybox", + command=[ + "sh", + "-c", + """ + while [ -z "$(find /opt/airflow/logs -type f -name '*.log' 2>/dev/null)" ]; do + echo 'Waiting for logs...'; sleep 2; + done; + find /opt/airflow/logs -type f -name '*.log' -print0 | xargs -0 tail -F + """, + ], + volume_mounts=[ + k8s.V1VolumeMount( + name="logs", + mount_path="/opt/airflow/logs", + ) + ], + ) podspec = k8s.V1PodSpec( - containers=[main_container], + containers=[main_container, log_tailer_container], + volumes=[log_volume], ) if content_json_for_volume: import shlex input_file_path = "/tmp/execute/input.json" - execute_volume = V1Volume( + execute_volume = k8s.V1Volume( name="execute-volume", - empty_dir=V1EmptyDirVolumeSource(), + empty_dir=k8s.V1EmptyDirVolumeSource(), ) - - execute_volume_mount = V1VolumeMount( + execute_volume_mount = k8s.V1VolumeMount( name="execute-volume", mount_path="/tmp/execute", read_only=False, @@ -379,17 +394,30 @@ def construct_pod( volume_mounts=[execute_volume_mount], ) - main_container.volume_mounts = [execute_volume_mount] + main_container.volume_mounts.append(execute_volume_mount) main_container.command = args[:-1] main_container.args = args[-1:] - podspec = k8s.V1PodSpec( - containers=[main_container], - volumes=[execute_volume], - init_containers=[init_container], - ) + podspec.init_containers = [init_container] + podspec.volumes.append(execute_volume) - dynamic_pod.spec = podspec + dynamic_pod = k8s.V1Pod( + metadata=k8s.V1ObjectMeta( + namespace=namespace, + annotations=annotations, + name=pod_id, + labels=cls.build_labels_for_k8s_executor_pod( + dag_id=dag_id, + task_id=task_id, + try_number=try_number, + airflow_worker=scheduler_job_id, + map_index=map_index, + logical_date=date, + run_id=run_id, + ), + ), + spec=podspec, + ) # Reconcile the pods starting with the first chronologically, # Pod from the pod_template_File -> Pod from the K8s executor -> Pod from executor_config arg diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py index 99967579bb493..4b514208309ba 100644 --- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py @@ -377,7 +377,7 @@ def _xcom_push(ti: RuntimeTaskInstance, key: str, value: Any, mapped_length: int ) -def parse(what: StartupDetails) -> RuntimeTaskInstance: +def parse(what: StartupDetails, log) -> RuntimeTaskInstance: # TODO: Task-SDK: # Using DagBag here is about 98% wrong, but it'll do for now @@ -390,7 +390,10 @@ def parse(what: StartupDetails) -> RuntimeTaskInstance: ) bundle_instance.initialize() + log.info("The bundle instance is", bundle_instance=bundle_instance) + dag_absolute_path = os.fspath(Path(bundle_instance.path, what.dag_rel_path)) + log.info("dag absolute path:", dag_absolute_path=dag_absolute_path, dag_id=what.ti.dag_id) bag = DagBag( dag_folder=dag_absolute_path, include_examples=False, @@ -493,7 +496,7 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]: log = structlog.get_logger(logger_name="task") with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id): - ti = parse(msg) + ti = parse(msg, log) log.debug("DAG file parsed", file=msg.dag_rel_path) else: raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}") From 661772fe510aa70489e10456975f0a74d09788b4 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 20 Feb 2025 18:45:23 +0530 Subject: [PATCH 2/6] removing other ns test --- .../example_dags/example_kubernetes_executor.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py index 395398ee93c8f..bdac81faff9fc 100644 --- a/airflow/example_dags/example_kubernetes_executor.py +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -149,18 +149,6 @@ def non_root_task(): print_stuff() third_task = non_root_task() - - executor_config_other_ns = { - "pod_override": k8s.V1Pod( - metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={"release": "stable"}) - ) - } - - @task(executor_config=executor_config_other_ns) - def other_namespace_task(): - print_stuff() - - other_ns_task = other_namespace_task() worker_container_repository = conf.get("kubernetes_executor", "worker_container_repository") worker_container_tag = conf.get("kubernetes_executor", "worker_container_tag") @@ -233,7 +221,7 @@ def task_with_resource_limits(): ( start_task() - >> [volume_task, other_ns_task, sidecar_task] + >> [volume_task, sidecar_task] >> third_task >> [base_image_task, four_task] ) From 745fb042e266699e1c6354b72e3899d027f6cc21 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 20 Feb 2025 19:47:14 +0530 Subject: [PATCH 3/6] fixing some other tests --- airflow/example_dags/example_kubernetes_executor.py | 7 +------ kubernetes_tests/test_kubernetes_executor.py | 9 --------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py index bdac81faff9fc..27ffb4cdad6d7 100644 --- a/airflow/example_dags/example_kubernetes_executor.py +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -219,9 +219,4 @@ def task_with_resource_limits(): four_task = task_with_resource_limits() - ( - start_task() - >> [volume_task, sidecar_task] - >> third_task - >> [base_image_task, four_task] - ) + (start_task() >> [volume_task, sidecar_task] >> third_task >> [base_image_task, four_task]) diff --git a/kubernetes_tests/test_kubernetes_executor.py b/kubernetes_tests/test_kubernetes_executor.py index 622a4daaa0df0..63d1389b17103 100644 --- a/kubernetes_tests/test_kubernetes_executor.py +++ b/kubernetes_tests/test_kubernetes_executor.py @@ -69,15 +69,6 @@ def test_integration_run_dag_with_scheduler_failure(self): timeout=300, ) - self.monitor_task( - host=self.host, - dag_run_id=dag_run_id, - dag_id=dag_id, - task_id="other_namespace_task", - expected_final_state="success", - timeout=300, - ) - self.ensure_dag_expected_state( host=self.host, logical_date=logical_date, From c280ac4bdbb81d40676eec561074201d17b3e0a0 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 21 Feb 2025 12:10:18 +0530 Subject: [PATCH 4/6] removing extra stuff --- airflow/example_dags/sleep_dag.py | 33 ------- .../executors/kubernetes_executor.py | 4 +- .../executors/kubernetes_executor_utils.py | 2 - .../cncf/kubernetes/pod_generator.py | 86 +++++++------------ .../airflow/sdk/execution_time/task_runner.py | 7 +- 5 files changed, 32 insertions(+), 100 deletions(-) delete mode 100644 airflow/example_dags/sleep_dag.py diff --git a/airflow/example_dags/sleep_dag.py b/airflow/example_dags/sleep_dag.py deleted file mode 100644 index 73026648f700a..0000000000000 --- a/airflow/example_dags/sleep_dag.py +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from datetime import datetime - -from airflow.decorators import dag -from airflow.providers.standard.operators.bash import BashOperator - - -@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False) -def sleep_dag(): - t1 = BashOperator( - task_id="sleep_10_minutes", - bash_command="sleep 600", - ) - - -sleep_dag() diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 42f6d63a5c46d..65a7f5225038a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -259,7 +259,6 @@ def execute_async( self.log.info("Add task %s with command %s", key, command) try: - # here its v1 pod obj kube_executor_config = PodGenerator.from_obj(executor_config) except Exception: self.log.error("Invalid executor_config for %s. Executor_config: %s", key, executor_config) @@ -292,8 +291,7 @@ def _process_workloads(self, workloads: list[workloads.All]) -> None: key = w.ti.key # type: ignore[union-attr] queue = w.ti.queue # type: ignore[union-attr] - # TODO: will be handled by https://github.com/apache/airflow/issues/46892 - executor_config = w.ti.executor_config + executor_config = w.ti.executor_config or {} # type: ignore[union-attr] del self.queued_tasks[key] self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) # type: ignore[arg-type] diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 38c43363da61d..0ef731783e62f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -385,7 +385,6 @@ def _health_check_kube_watchers(self): def run_next(self, next_job: KubernetesJobType) -> None: """Receives the next job to run, builds the pod, and creates it.""" - # here is pod object key, command, kube_executor_config, pod_template_file = next_job dag_id, task_id, run_id, try_number, map_index = key @@ -395,7 +394,6 @@ def run_next(self, next_job: KubernetesJobType) -> None: if isinstance(command[0], ExecuteTask): workload = command[0] - print("The workload is", workload) ser_input = workload.model_dump_json(exclude={"ti": {"executor_config"}}) command = [ "python", diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py index bd9102c69e725..e1f599a40a89b 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -47,7 +47,7 @@ from airflow.utils import yaml from airflow.utils.hashlib_wrapper import md5 from airflow.version import version as airflow_version -from kubernetes.client import models as k8s +from kubernetes.client import V1EmptyDirVolumeSource, V1Volume, V1VolumeMount, models as k8s from kubernetes.client.api_client import ApiClient if TYPE_CHECKING: @@ -327,16 +327,6 @@ def construct_pod( if run_id: annotations["run_id"] = run_id - log_volume = k8s.V1Volume( - name="logs-volume", - empty_dir=k8s.V1EmptyDirVolumeSource(), - ) - - log_volume_mount = k8s.V1VolumeMount( - name="logs-volume", - mount_path="/opt/airflow/logs", - ) - main_container = k8s.V1Container( name="base", args=args, @@ -344,43 +334,38 @@ def construct_pod( env=[ k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True"), ], - volume_mounts=[], ) - - log_tailer_container = k8s.V1Container( - name="log-tailer", - image="busybox", - command=[ - "sh", - "-c", - """ - while [ -z "$(find /opt/airflow/logs -type f -name '*.log' 2>/dev/null)" ]; do - echo 'Waiting for logs...'; sleep 2; - done; - find /opt/airflow/logs -type f -name '*.log' -print0 | xargs -0 tail -F - """, - ], - volume_mounts=[ - k8s.V1VolumeMount( - name="logs", - mount_path="/opt/airflow/logs", - ) - ], + dynamic_pod = k8s.V1Pod( + metadata=k8s.V1ObjectMeta( + namespace=namespace, + annotations=annotations, + name=pod_id, + labels=cls.build_labels_for_k8s_executor_pod( + dag_id=dag_id, + task_id=task_id, + try_number=try_number, + airflow_worker=scheduler_job_id, + map_index=map_index, + logical_date=date, + run_id=run_id, + ), + ), ) + podspec = k8s.V1PodSpec( - containers=[main_container, log_tailer_container], - volumes=[log_volume], + containers=[main_container], ) if content_json_for_volume: import shlex input_file_path = "/tmp/execute/input.json" - execute_volume = k8s.V1Volume( + execute_volume = V1Volume( name="execute-volume", - empty_dir=k8s.V1EmptyDirVolumeSource(), + empty_dir=V1EmptyDirVolumeSource(), ) - execute_volume_mount = k8s.V1VolumeMount( + + execute_volume_mount = V1VolumeMount( name="execute-volume", mount_path="/tmp/execute", read_only=False, @@ -394,30 +379,17 @@ def construct_pod( volume_mounts=[execute_volume_mount], ) - main_container.volume_mounts.append(execute_volume_mount) + main_container.volume_mounts = [execute_volume_mount] main_container.command = args[:-1] main_container.args = args[-1:] - podspec.init_containers = [init_container] - podspec.volumes.append(execute_volume) + podspec = k8s.V1PodSpec( + containers=[main_container], + volumes=[execute_volume], + init_containers=[init_container], + ) - dynamic_pod = k8s.V1Pod( - metadata=k8s.V1ObjectMeta( - namespace=namespace, - annotations=annotations, - name=pod_id, - labels=cls.build_labels_for_k8s_executor_pod( - dag_id=dag_id, - task_id=task_id, - try_number=try_number, - airflow_worker=scheduler_job_id, - map_index=map_index, - logical_date=date, - run_id=run_id, - ), - ), - spec=podspec, - ) + dynamic_pod.spec = podspec # Reconcile the pods starting with the first chronologically, # Pod from the pod_template_File -> Pod from the K8s executor -> Pod from executor_config arg diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py index 4b514208309ba..99967579bb493 100644 --- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py @@ -377,7 +377,7 @@ def _xcom_push(ti: RuntimeTaskInstance, key: str, value: Any, mapped_length: int ) -def parse(what: StartupDetails, log) -> RuntimeTaskInstance: +def parse(what: StartupDetails) -> RuntimeTaskInstance: # TODO: Task-SDK: # Using DagBag here is about 98% wrong, but it'll do for now @@ -390,10 +390,7 @@ def parse(what: StartupDetails, log) -> RuntimeTaskInstance: ) bundle_instance.initialize() - log.info("The bundle instance is", bundle_instance=bundle_instance) - dag_absolute_path = os.fspath(Path(bundle_instance.path, what.dag_rel_path)) - log.info("dag absolute path:", dag_absolute_path=dag_absolute_path, dag_id=what.ti.dag_id) bag = DagBag( dag_folder=dag_absolute_path, include_examples=False, @@ -496,7 +493,7 @@ def startup() -> tuple[RuntimeTaskInstance, Logger]: log = structlog.get_logger(logger_name="task") with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id): - ti = parse(msg, log) + ti = parse(msg) log.debug("DAG file parsed", file=msg.dag_rel_path) else: raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}") From 1f023b94dc160cd3ad18dd5f745679dc4f3adb16 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Fri, 21 Feb 2025 20:11:23 +0530 Subject: [PATCH 5/6] fixing mypy better --- .../cncf/kubernetes/executors/kubernetes_executor.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 65a7f5225038a..21b18ef45711a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -284,14 +284,18 @@ def queue_workload(self, workload: workloads.All, session: Session | None) -> No self.queued_tasks[ti.key] = workload def _process_workloads(self, workloads: list[workloads.All]) -> None: + from airflow.executors.workloads import ExecuteTask + # Airflow V3 version for w in workloads: + if not isinstance(w, ExecuteTask): + raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}") + # TODO: AIP-72 handle populating tokens once https://github.com/apache/airflow/issues/45107 is handled. command = [w] - key = w.ti.key # type: ignore[union-attr] - queue = w.ti.queue # type: ignore[union-attr] - - executor_config = w.ti.executor_config or {} # type: ignore[union-attr] + key = w.ti.key + queue = w.ti.queue + executor_config = w.ti.executor_config or {} del self.queued_tasks[key] self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) # type: ignore[arg-type] From 4cc4fa6004d5af24eec591e73e78ca0654487bda Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 24 Feb 2025 12:24:59 +0530 Subject: [PATCH 6/6] adding comment --- .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 0ef731783e62f..6a98bdfda3ab3 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -394,6 +394,8 @@ def run_next(self, next_job: KubernetesJobType) -> None: if isinstance(command[0], ExecuteTask): workload = command[0] + # `executor_config` is a k8s.V1Pod object and we do not need to pass it to the + # execute_workload module. So, we exclude it from the serialisation process. ser_input = workload.model_dump_json(exclude={"ti": {"executor_config"}}) command = [ "python",