diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 2f1e53e182a10..1f30ef525f82d 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -306,3 +306,7 @@ def __repr__(self) -> str: class TaskDeferralError(AirflowException): """Raised when a task failed during deferral for some reason.""" + + +class PodReconciliationError(AirflowException): + """Raised when an error is encountered while trying to merge pod configs.""" diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index c76cf58f418d4..e510da2b314d6 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -35,7 +35,7 @@ from kubernetes.client.rest import ApiException from urllib3.exceptions import ReadTimeoutError -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, PodReconciliationError from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType from airflow.kubernetes import pod_generator from airflow.kubernetes.kube_client import get_kube_client @@ -300,8 +300,9 @@ def run_next(self, next_job: KubernetesJobType) -> None: and store relevant info in the current_jobs map so we can track the job's status """ - self.log.info('Kubernetes job is %s', str(next_job).replace("\n", " ")) key, command, kube_executor_config, pod_template_file = next_job + self.log.info('Kubernetes job is %s', key) + dag_id, task_id, run_id, try_number, map_index = key if command[0:3] != ["airflow", "tasks", "run"]: @@ -617,6 +618,13 @@ def sync(self) -> None: task = self.task_queue.get_nowait() try: self.kube_scheduler.run_next(task) + except PodReconciliationError as e: + self.log.error( + "Pod reconciliation failed, likely due to kubernetes library upgrade. " + "Try clearing the task to re-run.", + exc_info=True, + ) + self.fail(task[0], e) except ApiException as e: # These codes indicate something is wrong with pod definition; otherwise we assume pod diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 52b45801ccabc..8a86919a65b75 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -34,7 +34,7 @@ from kubernetes.client import models as k8s from kubernetes.client.api_client import ApiClient -from airflow.exceptions import AirflowConfigException +from airflow.exceptions import AirflowConfigException, PodReconciliationError from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated from airflow.utils import yaml from airflow.version import version as airflow_version @@ -389,7 +389,10 @@ def construct_pod( # Pod from the pod_template_File -> Pod from executor_config arg -> Pod from the K8s executor pod_list = [base_worker_pod, pod_override_object, dynamic_pod] - return reduce(PodGenerator.reconcile_pods, pod_list) + try: + return reduce(PodGenerator.reconcile_pods, pod_list) + except Exception as e: + raise PodReconciliationError from e @staticmethod def serialize_pod(pod: k8s.V1Pod) -> dict: diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 0f5d49b819762..7685a53764a12 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -400,6 +400,20 @@ def key(self) -> "TaskInstanceKey": return self +def _executor_config_comparator(x, y): + """ + The TaskInstance.executor_config attribute is a pickled object that may contain + kubernetes objects. If the installed library version has changed since the + object was originally pickled, due to the underlying ``__eq__`` method on these + objects (which converts them to JSON), we may encounter attribute errors. In this + case we should replace the stored object. + """ + try: + return x == y + except AttributeError: + return False + + class TaskInstance(Base, LoggingMixin): """ Task instances store the state of a task instance. This table is the @@ -442,7 +456,7 @@ class TaskInstance(Base, LoggingMixin): queued_dttm = Column(UtcDateTime) queued_by_job_id = Column(Integer) pid = Column(Integer) - executor_config = Column(PickleType(pickler=dill)) + executor_config = Column(PickleType(pickler=dill, comparator=_executor_config_comparator)) external_executor_id = Column(String(ID_LEN, **COLLATION_ARGS)) diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 954f4f06001db..8ffeb5624b8cc 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -29,6 +29,7 @@ from urllib3 import HTTPResponse from airflow import AirflowException +from airflow.exceptions import PodReconciliationError from airflow.models.taskinstance import TaskInstanceKey from airflow.operators.bash import BashOperator from airflow.utils import timezone @@ -272,6 +273,44 @@ def test_run_next_exception_requeue( assert kubernetes_executor.task_queue.empty() assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + @pytest.mark.skipif( + AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed' + ) + @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') + @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') + def test_run_next_pod_reconciliation_error(self, mock_get_kube_client, mock_kubernetes_job_watcher): + """ + When construct_pod raises PodReconciliationError, we should fail the task. + """ + import sys + + path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml' + + mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True) + fail_msg = 'test message' + mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=PodReconciliationError(fail_msg)) + mock_get_kube_client.return_value = mock_kube_client + mock_api_client = mock.MagicMock() + mock_api_client.sanitize_for_serialization.return_value = {} + mock_kube_client.api_client = mock_api_client + config = {('kubernetes', 'pod_template_file'): path} + with conf_vars(config): + kubernetes_executor = self.kubernetes_executor + kubernetes_executor.start() + # Execute a task while the Api Throws errors + try_number = 1 + task_instance_key = TaskInstanceKey('dag', 'task', 'run_id', try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=['airflow', 'tasks', 'run', 'true', 'some_parameter'], + ) + kubernetes_executor.sync() + + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg + @mock.patch('airflow.executors.kubernetes_executor.KubeConfig') @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync') @mock.patch('airflow.executors.base_executor.BaseExecutor.trigger_tasks') diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py index d220872187cb1..df5efb0d06c9f 100644 --- a/tests/kubernetes/test_pod_generator.py +++ b/tests/kubernetes/test_pod_generator.py @@ -19,6 +19,7 @@ import sys import uuid from unittest import mock +from unittest.mock import MagicMock import pytest from dateutil import parser @@ -26,7 +27,7 @@ from parameterized import parameterized from airflow import __version__ -from airflow.exceptions import AirflowConfigException +from airflow.exceptions import AirflowConfigException, PodReconciliationError from airflow.kubernetes.pod_generator import ( PodDefaults, PodGenerator, @@ -520,6 +521,33 @@ def test_construct_pod_empty_executor_config(self, mock_uuid): worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config) assert worker_config_result == sanitized_result + @mock.patch('uuid.uuid4') + def test_construct_pod_attribute_error(self, mock_uuid): + """ + After upgrading k8s library we might get attribute error. + In this case it should raise PodReconciliationError + """ + path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml' + worker_config = PodGenerator.deserialize_model_file(path) + mock_uuid.return_value = self.static_uuid + executor_config = MagicMock() + executor_config.side_effect = AttributeError('error') + + with pytest.raises(PodReconciliationError): + PodGenerator.construct_pod( + dag_id='dag_id', + task_id='task_id', + pod_id='pod_id', + kube_image='test-image', + try_number=3, + date=self.execution_date, + args=['command'], + pod_override_object=executor_config, + base_worker_pod=worker_config, + namespace='namespace', + scheduler_job_id='uuid', + ) + @mock.patch('uuid.uuid4') def test_ensure_max_label_length(self, mock_uuid): mock_uuid.return_value = self.static_uuid diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index a1d180fa1ee56..1b17ccc773c05 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -57,7 +57,7 @@ ) from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskfail import TaskFail -from airflow.models.taskinstance import TaskInstance +from airflow.models.taskinstance import TaskInstance, _executor_config_comparator from airflow.models.taskmap import TaskMap from airflow.models.xcom import XCOM_RETURN_KEY from airflow.operators.bash import BashOperator @@ -2859,3 +2859,22 @@ def get_extra_env(): echo_task = dag.get_task("echo") assert "get_extra_env" in echo_task.upstream_task_ids + + +def test_executor_config_comparator(): + """ + When comparison raises AttributeError, return False. + This can happen when executor config contains kubernetes objects pickled + under older kubernetes library version. + """ + + class MockAttrError: + def __eq__(self, other): + raise AttributeError('hello') + + a = MockAttrError() + with pytest.raises(AttributeError): + # just verify for ourselves that this throws + assert a == a + assert _executor_config_comparator(a, a) is False + assert _executor_config_comparator('a', 'a') is True