diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index de127492998e9..8094c1863e290 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -38,6 +38,7 @@ task_dependencies_collection_schema, task_instance_batch_form, task_instance_collection_schema, + task_instance_history_schema, task_instance_reference_collection_schema, task_instance_reference_schema, task_instance_schema, @@ -754,3 +755,62 @@ def get_mapped_task_instance_dependencies( return get_task_instance_dependencies( dag_id=dag_id, dag_run_id=dag_run_id, task_id=task_id, map_index=map_index ) + + +@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE) +@provide_session +def get_task_instance_try_details( + *, + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + map_index: int = -1, + session: Session = NEW_SESSION, +) -> APIResponse: + """Get details of one try of a task instance, from TaskInstance if its try_number matches, else from TaskInstanceHistory.""" + from airflow.models.taskinstancehistory import TaskInstanceHistory + + def _query(orm_object): # Return the single row of orm_object matching all five identifiers, or None + query = select(orm_object).where( + orm_object.dag_id == dag_id, + orm_object.run_id == dag_run_id, + orm_object.task_id == task_id, + orm_object.try_number == task_try_number, + orm_object.map_index == map_index, + ) + try: + result = session.execute(query).one_or_none() + except MultipleResultsFound: # NOTE(review): map_index is already part of the filter, so this looks unreachable - confirm + raise NotFound( + "Task instance not found", + detail="Task instance is mapped, add the map_index value to the URL", + ) + return result + + result = _query(TI) or _query(TaskInstanceHistory) # the history table is only queried when TI has no matching row + if result is None: + error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}, map_index={map_index}, try_number={task_try_number}." 
+ raise NotFound("Task instance not found", detail=error_message) + return task_instance_history_schema.dump(result[0]) # result is a Row; element 0 is the selected ORM object + + +@provide_session +def get_mapped_task_instance_try_details( + *, + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + map_index: int, + session: Session = NEW_SESSION, +) -> APIResponse: + """Get details of a mapped task instance try by delegating to get_task_instance_try_details with an explicit map_index.""" + return get_task_instance_try_details( + dag_id=dag_id, + dag_run_id=dag_run_id, + task_id=task_id, + task_try_number=task_try_number, + map_index=map_index, + session=session, + ) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index cefb9faa7c0aa..0f99d145e1b72 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1722,6 +1722,65 @@ paths: "404": $ref: "#/components/responses/NotFound" + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}: + get: + summary: get taskinstance try + description: | + Get details of a task instance try. + + *New in version 2.10.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint + operationId: get_task_instance_try_details + tags: [TaskInstance] + parameters: + - $ref: "#/components/parameters/DAGID" + - $ref: "#/components/parameters/DAGRunID" + - $ref: "#/components/parameters/TaskID" + - $ref: "#/components/parameters/TaskTryNumber" + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/TaskInstance" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}: + get: + summary: get mapped taskinstance try + description: | + Get details of a mapped task instance try. 
+ + *New in version 2.10.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint + operationId: get_mapped_task_instance_try_details + tags: [TaskInstance] + parameters: + - $ref: "#/components/parameters/DAGID" + - $ref: "#/components/parameters/DAGRunID" + - $ref: "#/components/parameters/TaskID" + - $ref: "#/components/parameters/MapIndex" + - $ref: "#/components/parameters/TaskTryNumber" + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/TaskInstance" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + /variables: get: summary: List variables diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 8ef2987c88d49..811f92e0a2a1f 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -29,6 +29,7 @@ from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema from airflow.api_connexion.schemas.trigger_schema import TriggerSchema from airflow.models import TaskInstance +from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.utils.helpers import exactly_one from airflow.utils.state import TaskInstanceState @@ -86,6 +87,38 @@ def get_attribute(self, obj, attr, default): return get_value(obj[0], attr, default) +class TaskInstanceHistorySchema(SQLAlchemySchema): + """Task instance history schema.""" + + class Meta: + """Meta.""" + + model = TaskInstanceHistory + + task_id = auto_field() + dag_id = auto_field() + run_id = auto_field(data_key="dag_run_id") # serialized under the key "dag_run_id" + map_index = auto_field() + start_date = auto_field() + end_date = auto_field() + duration = auto_field() + state = TaskInstanceStateField() + try_number = auto_field() + max_tries = auto_field() + task_display_name = 
fields.String(attribute="task_display_name", dump_only=True) + hostname = auto_field() + unixname = auto_field() + pool = auto_field() + pool_slots = auto_field() + queue = auto_field() + priority_weight = auto_field() + operator = auto_field() + queued_dttm = auto_field(data_key="queued_when") # serialized under the key "queued_when" + pid = auto_field() + executor = auto_field() + executor_config = auto_field() + + class TaskInstanceCollection(NamedTuple): """List of task instances with metadata.""" @@ -245,3 +278,4 @@ class TaskDependencyCollectionSchema(Schema): task_instance_reference_schema = TaskInstanceReferenceSchema() task_instance_reference_collection_schema = TaskInstanceReferenceCollectionSchema() set_task_instance_note_form_schema = SetTaskInstanceNoteFormSchema() +task_instance_history_schema = TaskInstanceHistorySchema() diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index ad5368299d626..427e62205b5dd 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -547,6 +547,22 @@ export interface paths { */ post: operations["get_task_instances_batch"]; }; + "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}": { + /** + * Get details of a task instance try. + * + * *New in version 2.10.0* + */ + get: operations["get_task_instance_try_details"]; + }; + "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}": { + /** + * Get details of a mapped task instance try. + * + * *New in version 2.10.0* + */ + get: operations["get_mapped_task_instance_try_details"]; + }; "/variables": { /** The collection does not contain data. To get data, you must get a single entity. */ get: operations["get_variables"]; @@ -4264,6 +4280,68 @@ export interface operations { }; }; }; + /** + * Get details of a task instance try. 
+ * + * *New in version 2.10.0* + */ + get_task_instance_try_details: { + parameters: { + path: { + /** The DAG ID. */ + dag_id: components["parameters"]["DAGID"]; + /** The DAG run ID. */ + dag_run_id: components["parameters"]["DAGRunID"]; + /** The task ID. */ + task_id: components["parameters"]["TaskID"]; + /** The task try number. */ + task_try_number: components["parameters"]["TaskTryNumber"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["TaskInstance"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + }; + }; + /** + * Get details of a mapped task instance try. + * + * *New in version 2.10.0* + */ + get_mapped_task_instance_try_details: { + parameters: { + path: { + /** The DAG ID. */ + dag_id: components["parameters"]["DAGID"]; + /** The DAG run ID. */ + dag_run_id: components["parameters"]["DAGRunID"]; + /** The task ID. */ + task_id: components["parameters"]["TaskID"]; + /** The map index. */ + map_index: components["parameters"]["MapIndex"]; + /** The task try number. */ + task_try_number: components["parameters"]["TaskTryNumber"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["TaskInstance"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + }; + }; /** The collection does not contain data. To get data, you must get a single entity. 
*/ get_variables: { parameters: { @@ -5640,6 +5718,12 @@ export type GetMappedTaskInstancesVariables = CamelCasedPropertiesDeep< export type GetTaskInstancesBatchVariables = CamelCasedPropertiesDeep< operations["get_task_instances_batch"]["requestBody"]["content"]["application/json"] >; +export type GetTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep< + operations["get_task_instance_try_details"]["parameters"]["path"] +>; +export type GetMappedTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep< + operations["get_mapped_task_instance_try_details"]["parameters"]["path"] +>; export type GetVariablesVariables = CamelCasedPropertiesDeep< operations["get_variables"]["parameters"]["query"] >; diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 62ae45c1bd077..e77f75c3038a9 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -30,6 +30,7 @@ from airflow.jobs.triggerer_job_runner import TriggererJobRunner from airflow.models import DagRun, SlaMiss, TaskInstance, Trigger from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF +from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.security import permissions from airflow.utils.platform import getuser from airflow.utils.session import provide_session @@ -150,6 +151,7 @@ def create_task_instances( update_extras: bool = True, task_instances=None, dag_run_state=State.RUNNING, + with_ti_history=False, ): """Method to create task instances using kwargs and default arguments""" @@ -196,6 +198,17 @@ def create_task_instances( tis.append(ti) session.commit() + if with_ti_history: + for ti in tis: + ti.try_number = 1 + session.merge(ti) + session.commit() + dag.clear() + for ti in tis: + ti.try_number = 2 + ti.queue = "default_queue" + session.merge(ti) + session.commit() return tis @@ 
-2671,3 +2684,261 @@ def test_should_respond_404(self, session): environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 + + +class TestGetTaskInstanceTry(TestTaskInstanceEndpoint): # Tests for the ".../tries/{task_try_number}" endpoints (plain and mapped) + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + @pytest.mark.parametrize("username", ["test", "test_dag_read_only", "test_task_read_only"]) + @provide_session + def test_should_respond_200(self, username, session): + self.create_task_instances(session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True) + + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + environ_overrides={"REMOTE_USER": username}, + ) + assert response.status_code == 200 + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "success", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + @pytest.mark.parametrize("try_number", [1, 2]) + @provide_session + def test_should_respond_200_with_different_try_numbers(self, try_number, session): + self.create_task_instances(session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True) + + response = self.client.get( + f"/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/{try_number}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "dag_id": "example_python_operator", + 
"duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0 if try_number == 1 else 1, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "success" if try_number == 1 else None, + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": try_number, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + @pytest.mark.parametrize("try_number", [1, 2]) + @provide_session + def test_should_respond_200_with_mapped_task_at_different_try_numbers(self, try_number, session): + tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + ti.try_number = 1 + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "note"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) + session.commit() + tis = session.query(TaskInstance).all() + + # Record the task instance history + from airflow.models.taskinstance import clear_task_instances + + clear_task_instances(tis, session) + # Simulate the try_number increasing to new values in TI + for ti in tis: + if ti.map_index > 0: + ti.try_number += 1 + ti.queue = "default_queue" + session.merge(ti) + session.commit() + + # In each iteration, we should get the right mapped TI back + for map_index in (1, 2): + # try_number 1 is served from TaskInstanceHistory; try_number 2 is the latest try, served from the TaskInstance table + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" + f"/print_the_context/{map_index}/tries/{try_number}", + environ_overrides={"REMOTE_USER": "test"}, + 
) + assert response.status_code == 200 + + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": map_index, + "max_tries": 0 if try_number == 1 else 1, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "failed" if try_number == 1 else None, + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": try_number, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + def test_should_respond_200_with_task_state_in_deferred(self, session): + now = pendulum.now("UTC") + ti = self.create_task_instances( + session, + task_instances=[{"state": State.DEFERRED}], + update_extras=True, + )[0] + ti.trigger = Trigger("none", {}) + ti.trigger.created_date = now + ti.triggerer_job = Job() + TriggererJobRunner(job=ti.triggerer_job) + ti.triggerer_job.state = "running" + ti.try_number = 1 + session.merge(ti) + session.flush() + # Record the TaskInstanceHistory + TaskInstanceHistory.record_ti(ti, session=session) + session.flush() + # Change TaskInstance try_number to 2 so that try 1 can only be served from TaskInstanceHistory + ti = session.query(TaskInstance).one_or_none() + ti.try_number = 2 + session.merge(ti) + # Set duration and end_date in TaskInstanceHistory for easy testing + tih = session.query(TaskInstanceHistory).all()[0] + tih.duration = 10000 + tih.end_date = self.default_time + dt.timedelta(days=2) + session.merge(tih) + session.flush() + # Get the task instance details from TIHistory: + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + data = response.json 
+ + assert response.status_code == 200 # NOTE(review): duplicates the status assertion made just above + assert data == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "failed", # NOTE(review): the TI was created as DEFERRED; presumably record_ti stores it as failed - confirm + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + def test_should_respond_200_with_task_state_in_removed(self, session): + self.create_task_instances( + session, task_instances=[{"state": State.REMOVED}], update_extras=True, with_ti_history=True + ) + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "removed", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + def test_should_raises_401_unauthenticated(self): + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/0", + ) + assert_401(response) + + def test_should_raise_403_forbidden(self): + response = 
self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/0", + environ_overrides={"REMOTE_USER": "test_no_permissions"}, + ) + assert response.status_code == 403 + + def test_raises_404_for_nonexistent_task_instance(self): + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/nonexistent_task/tries/0", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 + assert response.json["title"] == "Task instance not found"