Skip to content
19 changes: 1 addition & 18 deletions airflow/example_dags/example_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,6 @@ def non_root_task():
print_stuff()

third_task = non_root_task()

executor_config_other_ns = {
"pod_override": k8s.V1Pod(
metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={"release": "stable"})
)
}

@task(executor_config=executor_config_other_ns)
def other_namespace_task():
print_stuff()
Comment thread
amoghrajesh marked this conversation as resolved.

other_ns_task = other_namespace_task()
worker_container_repository = conf.get("kubernetes_executor", "worker_container_repository")
worker_container_tag = conf.get("kubernetes_executor", "worker_container_tag")

Expand Down Expand Up @@ -231,9 +219,4 @@ def task_with_resource_limits():

four_task = task_with_resource_limits()

(
start_task()
>> [volume_task, other_ns_task, sidecar_task]
>> third_task
>> [base_image_task, four_task]
)
(start_task() >> [volume_task, sidecar_task] >> third_task >> [base_image_task, four_task])
1 change: 1 addition & 0 deletions airflow/executors/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class TaskInstance(BaseModel):
pool_slots: int
queue: str
priority_weight: int
executor_config: dict | None = None

# TODO: Task-SDK: Can we replace TastInstanceKey with just the uuid across the codebase?
@property
Expand Down
15 changes: 0 additions & 15 deletions kubernetes_tests/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@

@pytest.mark.skipif(EXECUTOR != "KubernetesExecutor", reason="Only runs on KubernetesExecutor")
class TestKubernetesExecutor(BaseK8STest):
@pytest.mark.skip(
reason="TODO: AIP-72 Porting over executor_config not yet done. Remove once #46892 is handled"
)
@pytest.mark.execution_timeout(300)
def test_integration_run_dag(self):
dag_id = "example_kubernetes_executor"
Expand All @@ -54,9 +51,6 @@ def test_integration_run_dag(self):
)

@pytest.mark.execution_timeout(300)
@pytest.mark.skip(
reason="TODO: AIP-72 Porting over executor_config not yet done. Remove once #46892 is handled"
)
def test_integration_run_dag_with_scheduler_failure(self):
dag_id = "example_kubernetes_executor"

Expand All @@ -75,15 +69,6 @@ def test_integration_run_dag_with_scheduler_failure(self):
timeout=300,
)

self.monitor_task(
host=self.host,
dag_run_id=dag_run_id,
dag_id=dag_id,
task_id="other_namespace_task",
expected_final_state="success",
timeout=300,
)
Comment thread
amoghrajesh marked this conversation as resolved.

self.ensure_dag_expected_state(
host=self.host,
logical_date=logical_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,18 @@ def queue_workload(self, workload: workloads.All, session: Session | None) -> No
self.queued_tasks[ti.key] = workload

def _process_workloads(self, workloads: list[workloads.All]) -> None:
from airflow.executors.workloads import ExecuteTask

# Airflow V3 version
for w in workloads:
if not isinstance(w, ExecuteTask):
raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}")

# TODO: AIP-72 handle populating tokens once https://github.com/apache/airflow/issues/45107 is handled.
command = [w]
key = w.ti.key # type: ignore[union-attr]
queue = w.ti.queue # type: ignore[union-attr]

# TODO: will be handled by https://github.com/apache/airflow/issues/46892
executor_config = {} # type: ignore[var-annotated]
key = w.ti.key
queue = w.ti.queue
executor_config = w.ti.executor_config or {}

del self.queued_tasks[key]
self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config) # type: ignore[arg-type]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,9 @@ def run_next(self, next_job: KubernetesJobType) -> None:

if isinstance(command[0], ExecuteTask):
workload = command[0]
ser_input = workload.model_dump_json()
# `executor_config` is a k8s.V1Pod object and we do not need to pass it to the
# execute_workload module. So, we exclude it from the serialisation process.
ser_input = workload.model_dump_json(exclude={"ti": {"executor_config"}})
Comment thread
amoghrajesh marked this conversation as resolved.
command = [
"python",
"-m",
Expand Down