From 6962b21cd7db0cc49a1190f5a85e13d6a6429599 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 26 Jun 2024 16:23:40 +0100 Subject: [PATCH 1/5] AIP-64: Add REST API endpoints for TI try level details This PR adds REST API endpoints for task instance/mapped task instance try level details. --- .../endpoints/task_instance_endpoint.py | 75 +++++++++++++++++ airflow/api_connexion/openapi/v1.yaml | 59 +++++++++++++ airflow/models/taskinstancehistory.py | 26 ++++++ airflow/www/static/js/types/api-generated.ts | 84 +++++++++++++++++++ 4 files changed, 244 insertions(+) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index de127492998e9..57f2db29b4e2c 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -754,3 +754,78 @@ def get_mapped_task_instance_dependencies( return get_task_instance_dependencies( dag_id=dag_id, dag_run_id=dag_run_id, task_id=task_id, map_index=map_index ) + + +@provide_session +def get_task_instance_try_details( + *, + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + map_index: int = -1, + session: Session = NEW_SESSION, +) -> APIResponse: + """Get details of a task instance try.""" + from airflow.models.taskinstancehistory import TaskInstanceHistory + + def _query(orm_object): + query = select(orm_object).where( + orm_object.dag_id == dag_id, + orm_object.run_id == dag_run_id, + orm_object.task_id == task_id, + orm_object.try_number == task_try_number, + ) + query = query.where(orm_object.map_index == map_index) + + query = ( + query.join(orm_object.dag_run) + .outerjoin( + SlaMiss, + and_( + SlaMiss.dag_id == orm_object.dag_id, + SlaMiss.execution_date == DR.execution_date, + SlaMiss.task_id == orm_object.task_id, + ), + ) + .add_columns(SlaMiss) + .options(joinedload(orm_object.rendered_task_instance_fields)) + ) + try: + result = session.execute(query).one_or_none() + except MultipleResultsFound: + raise NotFound( + "Task instance not found", + detail="Task instance is mapped, add the map_index value to the URL", + ) + return result + + result = _query(TI) + + if not result: + result = _query(TaskInstanceHistory) + if result is None: + error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}, map_index={map_index}, try_number={task_try_number}." + raise NotFound(error_message) + return task_instance_schema.dump(result) + + +@provide_session +def get_mapped_task_instance_try_details( + *, + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + map_index: int, + session: Session = NEW_SESSION, +) -> APIResponse: + """Get details of a mapped task instance try.""" + return get_task_instance_try_details( + dag_id=dag_id, + dag_run_id=dag_run_id, + task_id=task_id, + task_try_number=task_try_number, + map_index=map_index, + session=session, + ) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index cefb9faa7c0aa..31c5ee7df26c3 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1722,6 +1722,65 @@ paths: "404": $ref: "#/components/responses/NotFound" + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}: + get: + summary: get taskinstance try + description: | + Get details of a task instance try. + + *New in version 2.10.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint + operationId: get_task_instance_try_details + tags: [TaskInstance] + parameters: + - $ref: "#/components/parameters/DAGID" + - $ref: "#/components/parameters/DAGRunID" + - $ref: "#/components/parameters/TaskID" + - $ref: "#/components/parameters/TaskTryNumber" + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/TaskInstance" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}/{map_index}: + get: + summary: get mapped taskinstance try + description: | + Get details of a mapped task instance try. + + *New in version 2.10.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint + operationId: get_mapped_task_instance_try_details + tags: [TaskInstance] + parameters: + - $ref: "#/components/parameters/DAGID" + - $ref: "#/components/parameters/DAGRunID" + - $ref: "#/components/parameters/TaskID" + - $ref: "#/components/parameters/MapIndex" + - $ref: "#/components/parameters/TaskTryNumber" + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/TaskInstance" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + /variables: get: summary: List variables diff --git a/airflow/models/taskinstancehistory.py b/airflow/models/taskinstancehistory.py index ccdca700af6e9..5b77fd4b02689 100644 --- a/airflow/models/taskinstancehistory.py +++ b/airflow/models/taskinstancehistory.py @@ -33,6 +33,7 @@ text, ) from sqlalchemy.ext.mutable import MutableDict +from sqlalchemy.orm import relationship from airflow.models.base import Base, StringID from airflow.utils import timezone @@ -93,6 +94,31 @@ class TaskInstanceHistory(Base): task_display_name = Column("task_display_name", String(2000), nullable=True) + dag_run = relationship( + "DagRun", + primaryjoin="and_(DagRun.run_id==TaskInstanceHistory.run_id,DagRun.dag_id==TaskInstanceHistory.dag_id)", + foreign_keys=[run_id, dag_id], + viewonly=True, + lazy="joined", + ) + + rendered_task_instance_fields = relationship( + "RenderedTaskInstanceFields", + primaryjoin="and_(RenderedTaskInstanceFields.task_id==TaskInstanceHistory.task_id, RenderedTaskInstanceFields.run_id==TaskInstanceHistory.run_id," + "RenderedTaskInstanceFields.dag_id==TaskInstanceHistory.dag_id, RenderedTaskInstanceFields.map_index==TaskInstanceHistory.map_index)", + uselist=False, + foreign_keys=[dag_id, task_id, run_id, map_index], + viewonly=True, + lazy="noload", + ) + trigger = relationship( + "Trigger", + uselist=False, + primaryjoin="Trigger.id==TaskInstanceHistory.trigger_id", + viewonly=True, + foreign_keys=trigger_id, + ) + def __init__( self, ti: TaskInstance | TaskInstancePydantic, diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index ad5368299d626..7500ec899d525 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -547,6 +547,22 @@ export interface paths { */ post: operations["get_task_instances_batch"]; }; + "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}": { + /** + * Get details of a task instance try. + * + * *New in version 2.10.0* + */ + get: operations["get_task_instance_try_details"]; + }; + "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}/{map_index}": { + /** + * Get details of a mapped task instance try. + * + * *New in version 2.10.0* + */ + get: operations["get_mapped_task_instance_try_details"]; + }; "/variables": { /** The collection does not contain data. To get data, you must get a single entity. */ get: operations["get_variables"]; @@ -4264,6 +4280,68 @@ export interface operations { }; }; }; + /** + * Get details of a task instance try. + * + * *New in version 2.10.0* + */ + get_task_instance_try_details: { + parameters: { + path: { + /** The DAG ID. */ + dag_id: components["parameters"]["DAGID"]; + /** The DAG run ID. */ + dag_run_id: components["parameters"]["DAGRunID"]; + /** The task ID. */ + task_id: components["parameters"]["TaskID"]; + /** The task try number. */ + task_try_number: components["parameters"]["TaskTryNumber"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["TaskInstance"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + }; + }; + /** + * Get details of a mapped task instance try. + * + * *New in version 2.10.0* + */ + get_mapped_task_instance_try_details: { + parameters: { + path: { + /** The DAG ID. */ + dag_id: components["parameters"]["DAGID"]; + /** The DAG run ID. */ + dag_run_id: components["parameters"]["DAGRunID"]; + /** The task ID. */ + task_id: components["parameters"]["TaskID"]; + /** The map index. */ + map_index: components["parameters"]["MapIndex"]; + /** The task try number. */ + task_try_number: components["parameters"]["TaskTryNumber"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["TaskInstance"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + }; + }; /** The collection does not contain data. To get data, you must get a single entity. */ get_variables: { parameters: { @@ -5640,6 +5718,12 @@ export type GetMappedTaskInstancesVariables = CamelCasedPropertiesDeep< export type GetTaskInstancesBatchVariables = CamelCasedPropertiesDeep< operations["get_task_instances_batch"]["requestBody"]["content"]["application/json"] >; +export type GetTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep< + operations["get_task_instance_try_details"]["parameters"]["path"] +>; +export type GetMappedTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep< + operations["get_mapped_task_instance_try_details"]["parameters"]["path"] +>; export type GetVariablesVariables = CamelCasedPropertiesDeep< operations["get_variables"]["parameters"]["query"] >; From bc33145a0aff93eca01109d9e542dca8e0220263 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 27 Jun 2024 20:08:49 +0100 Subject: [PATCH 2/5] Add tests --- .../endpoints/task_instance_endpoint.py | 3 +- airflow/models/taskinstancehistory.py | 18 +- .../endpoints/test_task_instance_endpoint.py | 340 ++++++++++++++++++ 3 files changed, 359 insertions(+), 2 deletions(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 57f2db29b4e2c..be5096a60ba75 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -756,6 +756,7 @@ def get_mapped_task_instance_dependencies( ) +@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE) @provide_session def get_task_instance_try_details( *, @@ -806,7 +807,7 @@ def _query(orm_object): result = _query(TaskInstanceHistory) if result is None: error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}, map_index={map_index}, try_number={task_try_number}." - raise NotFound(error_message) + raise NotFound("Task instance not found", detail=error_message) return task_instance_schema.dump(result) diff --git a/airflow/models/taskinstancehistory.py b/airflow/models/taskinstancehistory.py index 5b77fd4b02689..0e80a24cd16e2 100644 --- a/airflow/models/taskinstancehistory.py +++ b/airflow/models/taskinstancehistory.py @@ -33,6 +33,7 @@ text, ) from sqlalchemy.ext.mutable import MutableDict +from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import relationship from airflow.models.base import Base, StringID @@ -102,6 +103,8 @@ class TaskInstanceHistory(Base): lazy="joined", ) + execution_date = association_proxy("dag_run", "execution_date") + rendered_task_instance_fields = relationship( "RenderedTaskInstanceFields", primaryjoin="and_(RenderedTaskInstanceFields.task_id==TaskInstanceHistory.task_id, RenderedTaskInstanceFields.run_id==TaskInstanceHistory.run_id," @@ -109,7 +112,7 @@ class TaskInstanceHistory(Base): uselist=False, foreign_keys=[dag_id, task_id, run_id, map_index], viewonly=True, - lazy="noload", + lazy="joined", ) trigger = relationship( "Trigger", @@ -117,7 +120,20 @@ class TaskInstanceHistory(Base): primaryjoin="Trigger.id==TaskInstanceHistory.trigger_id", viewonly=True, foreign_keys=trigger_id, + lazy="joined", + ) + + triggerer_job = association_proxy("trigger", "triggerer_job") + + task_instance_note = relationship( + "TaskInstanceNote", + primaryjoin="and_(TaskInstanceNote.dag_id==TaskInstanceHistory.dag_id, TaskInstanceNote.task_id==TaskInstanceHistory.task_id," + "TaskInstanceNote.run_id==TaskInstanceHistory.run_id, TaskInstanceNote.map_index==TaskInstanceHistory.map_index)", + uselist=False, + foreign_keys=[dag_id, task_id, run_id, map_index], + viewonly=True, ) + note = association_proxy("task_instance_note", "content") def __init__( self, diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 62ae45c1bd077..09f43a9b72b49 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -30,6 +30,7 @@ from airflow.jobs.triggerer_job_runner import TriggererJobRunner from airflow.models import DagRun, SlaMiss, TaskInstance, Trigger from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF +from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.security import permissions from airflow.utils.platform import getuser from airflow.utils.session import provide_session @@ -150,6 +151,7 @@ def create_task_instances( update_extras: bool = True, task_instances=None, dag_run_state=State.RUNNING, + with_ti_history=False, ): """Method to create task instances using kwargs and default arguments""" @@ -196,6 +198,16 @@ def create_task_instances( tis.append(ti) session.commit() + if with_ti_history: + for ti in tis: + ti.try_number = 1 + session.merge(ti) + session.commit() + dag.clear() + for ti in tis: + ti.try_number = 2 + session.merge(ti) + session.commit() return tis @@ -2671,3 +2683,331 @@ def test_should_respond_404(self, session): environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 + + +class TestGetTaskInstanceTry(TestTaskInstanceEndpoint): + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + @pytest.mark.parametrize("username", ["test", "test_dag_read_only", "test_task_read_only"]) + @provide_session + def test_should_respond_200(self, username, session): + self.create_task_instances(session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True) + + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + environ_overrides={"REMOTE_USER": username}, + ) + assert response.status_code == 200 + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "execution_date": "2020-01-01T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "sla_miss": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "success", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + "rendered_fields": {}, + "rendered_map_index": None, + "trigger": None, + "triggerer_job": None, + } + + def test_should_respond_200_with_task_state_in_deferred(self, session): + now = pendulum.now("UTC") + ti = self.create_task_instances( + session, + task_instances=[{"state": State.DEFERRED}], + update_extras=True, + )[0] + ti.trigger = Trigger("none", {}) + ti.trigger.created_date = now + ti.triggerer_job = Job() + TriggererJobRunner(job=ti.triggerer_job) + ti.triggerer_job.state = "running" + ti.try_number = 1 + session.merge(ti) + session.flush() + # Record the TaskInstanceHistory + TaskInstanceHistory.record_ti(ti, session=session) + session.flush() + # Change TaskInstance try_number to 2, ensuring api checks TIHistory + ti = session.query(TaskInstance).one_or_none() + ti.try_number = 2 + session.merge(ti) + # Set duration and end_date in TaskInstanceHistory for easy testing + tih = session.query(TaskInstanceHistory).all()[0] + tih.duration = 10000 + tih.end_date = self.default_time + dt.timedelta(days=2) + session.merge(tih) + session.flush() + # Get the task instance details from TIHistory: + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + data = response.json + + # this logic in effect replicates mock.ANY for these values + values_to_ignore = { + "trigger": ["created_date", "id", "triggerer_id"], + "triggerer_job": ["executor_class", "hostname", "id", "latest_heartbeat", "start_date"], + } + for k, v in values_to_ignore.items(): + for elem in v: + del data[k][elem] + + assert response.status_code == 200 + assert data == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "execution_date": "2020-01-01T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "sla_miss": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "failed", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + "rendered_fields": {}, + "rendered_map_index": None, + "trigger": { + "classpath": "none", + "kwargs": "{}", + }, + "triggerer_job": { + "dag_id": None, + "end_date": None, + "job_type": "TriggererJob", + "state": "running", + "unixname": getuser(), + }, + } + + def test_should_respond_200_with_task_state_in_removed(self, session): + self.create_task_instances( + session, task_instances=[{"state": State.REMOVED}], update_extras=True, with_ti_history=True + ) + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "execution_date": "2020-01-01T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "sla_miss": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "removed", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + "rendered_fields": {}, + "rendered_map_index": None, + "trigger": None, + "triggerer_job": None, + } + + def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): + tis = self.create_task_instances( + session, task_instances=[{"state": State.FAILED}], with_ti_history=True + ) + session.query() + sla_miss = SlaMiss( + task_id="print_the_context", + dag_id="example_python_operator", + execution_date=self.default_time, + timestamp=self.default_time, + ) + session.add(sla_miss) + rendered_fields = RTIF(tis[0], render_templates=False) + session.add(rendered_fields) + session.commit() + # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "execution_date": "2020-01-01T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "sla_miss": { + "dag_id": "example_python_operator", + "description": None, + "email_sent": False, + "execution_date": "2020-01-01T00:00:00+00:00", + "notification_sent": False, + "task_id": "print_the_context", + "timestamp": "2020-01-01T00:00:00+00:00", + }, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "failed", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, + "rendered_map_index": None, + "trigger": None, + "triggerer_job": None, + } + + def test_should_respond_200_mapped_task_instance_with_rtif(self, session): + """Verify we don't duplicate rows through join to RTIF""" + tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + ti.try_number = 1 + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "note"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) + session.commit() + tis = session.query(TaskInstance).all() + + # Record the task instance history + from airflow.models.taskinstance import clear_task_instances + + clear_task_instances(tis, session) + # Simulate the try_number increasing to new values in TI + for ti in tis: + if ti.map_index > 0: + ti.try_number += 1 + session.merge(ti) + session.commit() + + # in each loop, we should get the right mapped TI back + for map_index in (1, 2): + # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" + f"/print_the_context/tries/1/{map_index}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "execution_date": "2020-01-01T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": map_index, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "sla_miss": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "failed", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, + "rendered_map_index": None, + "trigger": None, + "triggerer_job": None, + } + + def test_should_raises_401_unauthenticated(self): + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/0", + ) + assert_401(response) + + def test_should_raise_403_forbidden(self): + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/0", + environ_overrides={"REMOTE_USER": "test_no_permissions"}, + ) + assert response.status_code == 403 + + def test_raises_404_for_nonexistent_task_instance(self): + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/nonexistent_task/tries/0", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 + assert response.json["title"] == "Task instance not found" From 81a2b943c359efea520fe0e7b99b251d4a1b35ef Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 9 Jul 2024 10:35:07 +0100 Subject: [PATCH 3/5] Fix query --- airflow/api_connexion/endpoints/task_instance_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index be5096a60ba75..5617935b4296e 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -776,8 +776,8 @@ def _query(orm_object): orm_object.run_id == dag_run_id, orm_object.task_id == task_id, orm_object.try_number == task_try_number, + orm_object.map_index == map_index, ) - query = query.where(orm_object.map_index == map_index) query = ( query.join(orm_object.dag_run) From 37d1e6d568878578c1d1924563bda86d449a7c34 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 11 Jul 2024 10:14:34 +0100 Subject: [PATCH 4/5] add more tests --- .../endpoints/task_instance_endpoint.py | 5 +- airflow/api_connexion/openapi/v1.yaml | 2 +- airflow/www/static/js/types/api-generated.ts | 2 +- .../endpoints/test_task_instance_endpoint.py | 114 +++++++++++++++++- 4 files changed, 116 insertions(+), 7 deletions(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 5617935b4296e..86e747058ee5f 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -801,10 +801,7 @@ def _query(orm_object): ) return result - result = _query(TI) - - if not result: - result = _query(TaskInstanceHistory) + result = _query(TI) or _query(TaskInstanceHistory) if result is None: error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}, map_index={map_index}, try_number={task_try_number}." raise NotFound("Task instance not found", detail=error_message) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 31c5ee7df26c3..0f99d145e1b72 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1751,7 +1751,7 @@ paths: "404": $ref: "#/components/responses/NotFound" - /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}/{map_index}: + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}: get: summary: get mapped taskinstance try description: | diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 7500ec899d525..427e62205b5dd 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -555,7 +555,7 @@ export interface paths { */ get: operations["get_task_instance_try_details"]; }; - "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}/{map_index}": { + "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}": { /** * Get details of a mapped task instance try. * diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 09f43a9b72b49..77c2901024fc4 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -206,6 +206,7 @@ def create_task_instances( dag.clear() for ti in tis: ti.try_number = 2 + ti.queue = "default_queue" session.merge(ti) session.commit() return tis @@ -2734,6 +2735,117 @@ def test_should_respond_200(self, username, session): "triggerer_job": None, } + @pytest.mark.parametrize("try_number", [1, 2]) + @provide_session + def test_should_respond_200_with_different_try_numbers(self, try_number, session): + self.create_task_instances(session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True) + + response = self.client.get( + f"/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/{try_number}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "execution_date": "2020-01-01T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0 if try_number == 1 else 1, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "sla_miss": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "success" if try_number == 1 else None, + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": try_number, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + "rendered_fields": {}, + "rendered_map_index": None, + "trigger": None, + "triggerer_job": None, + } + + @pytest.mark.parametrize("try_number", [1, 2]) + @provide_session + def test_should_respond_200_with_mapped_task_at_different_try_numbers(self, try_number, session): + tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + ti.try_number = 1 + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "note"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) + session.commit() + tis = session.query(TaskInstance).all() + + # Record the task instance history + from airflow.models.taskinstance import clear_task_instances + + clear_task_instances(tis, session) + # Simulate the try_number increasing to new values in TI + for ti in tis: + if ti.map_index > 0: + ti.try_number += 1 + ti.queue = "default_queue" + session.merge(ti) + session.commit() + + # in each loop, we should get the right mapped TI back + for map_index in (1, 2): + # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" + f"/print_the_context/{map_index}/tries/{try_number}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + + assert response.json == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00+00:00", + "execution_date": "2020-01-01T00:00:00+00:00", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": map_index, + "max_tries": 0 if try_number == 1 else 1, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "sla_miss": None, + "start_date": "2020-01-02T00:00:00+00:00", + "state": "failed" if try_number == 1 else None, + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": try_number, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, + "rendered_map_index": None, + "trigger": None, + "triggerer_job": None, + } + def test_should_respond_200_with_task_state_in_deferred(self, session): now = pendulum.now("UTC") ti = self.create_task_instances( @@ -2954,7 +3066,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) response = self.client.get( "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" - f"/print_the_context/tries/1/{map_index}", + f"/print_the_context/{map_index}/tries/1", environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 200 From cf1a4379d9428c4cb90e2aa855fcc87ca6e7d05e Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 19 Jul 2024 20:06:34 +0100 Subject: [PATCH 5/5] Remove relationships --- .../endpoints/task_instance_endpoint.py | 17 +- .../schemas/task_instance_schema.py | 34 ++++ airflow/models/taskinstancehistory.py | 42 ---- .../endpoints/test_task_instance_endpoint.py | 183 +----------------- 4 files changed, 37 insertions(+), 239 deletions(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 86e747058ee5f..8094c1863e290 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -38,6 +38,7 @@ task_dependencies_collection_schema, task_instance_batch_form, task_instance_collection_schema, + task_instance_history_schema, task_instance_reference_collection_schema, task_instance_reference_schema, task_instance_schema, @@ -778,20 +779,6 @@ def _query(orm_object): orm_object.try_number == task_try_number, orm_object.map_index == map_index, ) - - query = ( - query.join(orm_object.dag_run) - .outerjoin( - SlaMiss, - and_( - SlaMiss.dag_id == orm_object.dag_id, - SlaMiss.execution_date == DR.execution_date, - SlaMiss.task_id == orm_object.task_id, - ), - ) - .add_columns(SlaMiss) - .options(joinedload(orm_object.rendered_task_instance_fields)) - ) try: result = session.execute(query).one_or_none() except MultipleResultsFound: @@ -805,7 +792,7 @@ def _query(orm_object): if result is None: error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}, map_index={map_index}, try_number={task_try_number}." raise NotFound("Task instance not found", detail=error_message) - return task_instance_schema.dump(result) + return task_instance_history_schema.dump(result[0]) @provide_session diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 8ef2987c88d49..811f92e0a2a1f 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -29,6 +29,7 @@ from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema from airflow.api_connexion.schemas.trigger_schema import TriggerSchema from airflow.models import TaskInstance +from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.utils.helpers import exactly_one from airflow.utils.state import TaskInstanceState @@ -86,6 +87,38 @@ def get_attribute(self, obj, attr, default): return get_value(obj[0], attr, default) +class TaskInstanceHistorySchema(SQLAlchemySchema): + """Task instance schema.""" + + class Meta: + """Meta.""" + + model = TaskInstanceHistory + + task_id = auto_field() + dag_id = auto_field() + run_id = auto_field(data_key="dag_run_id") + map_index = auto_field() + start_date = auto_field() + end_date = auto_field() + duration = auto_field() + state = TaskInstanceStateField() + try_number = auto_field() + max_tries = auto_field() + task_display_name = fields.String(attribute="task_display_name", dump_only=True) + hostname = auto_field() + unixname = auto_field() + pool = auto_field() + pool_slots = auto_field() + queue = auto_field() + priority_weight = auto_field() + operator = auto_field() + queued_dttm = auto_field(data_key="queued_when") + pid = auto_field() + executor = auto_field() + executor_config = auto_field() + + class TaskInstanceCollection(NamedTuple): """List of task instances with metadata.""" @@ -245,3 +278,4 @@ class TaskDependencyCollectionSchema(Schema): task_instance_reference_schema = TaskInstanceReferenceSchema() task_instance_reference_collection_schema = TaskInstanceReferenceCollectionSchema() set_task_instance_note_form_schema = SetTaskInstanceNoteFormSchema() +task_instance_history_schema = TaskInstanceHistorySchema() diff --git a/airflow/models/taskinstancehistory.py b/airflow/models/taskinstancehistory.py index 0e80a24cd16e2..ccdca700af6e9 100644 --- a/airflow/models/taskinstancehistory.py +++ b/airflow/models/taskinstancehistory.py @@ -33,8 +33,6 @@ text, ) from sqlalchemy.ext.mutable import MutableDict -from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.orm import relationship from airflow.models.base import Base, StringID from airflow.utils import timezone @@ -95,46 +93,6 @@ class TaskInstanceHistory(Base): task_display_name = Column("task_display_name", String(2000), nullable=True) - dag_run = relationship( - "DagRun", - primaryjoin="and_(DagRun.run_id==TaskInstanceHistory.run_id,DagRun.dag_id==TaskInstanceHistory.dag_id)", - foreign_keys=[run_id, dag_id], - viewonly=True, - lazy="joined", - ) - - execution_date = association_proxy("dag_run", "execution_date") - - rendered_task_instance_fields = relationship( - "RenderedTaskInstanceFields", - primaryjoin="and_(RenderedTaskInstanceFields.task_id==TaskInstanceHistory.task_id, RenderedTaskInstanceFields.run_id==TaskInstanceHistory.run_id," - "RenderedTaskInstanceFields.dag_id==TaskInstanceHistory.dag_id, RenderedTaskInstanceFields.map_index==TaskInstanceHistory.map_index)", - uselist=False, - foreign_keys=[dag_id, task_id, run_id, map_index], - viewonly=True, - lazy="joined", - ) - trigger = relationship( - "Trigger", - uselist=False, - primaryjoin="Trigger.id==TaskInstanceHistory.trigger_id", - viewonly=True, - foreign_keys=trigger_id, - lazy="joined", - ) - - triggerer_job = association_proxy("trigger", "triggerer_job") - - task_instance_note = relationship( - "TaskInstanceNote", - primaryjoin="and_(TaskInstanceNote.dag_id==TaskInstanceHistory.dag_id, TaskInstanceNote.task_id==TaskInstanceHistory.task_id," - "TaskInstanceNote.run_id==TaskInstanceHistory.run_id, TaskInstanceNote.map_index==TaskInstanceHistory.map_index)", - uselist=False, - foreign_keys=[dag_id, task_id, run_id, map_index], - viewonly=True, - ) - note = association_proxy("task_instance_note", "content") - def __init__( self, ti: TaskInstance | TaskInstancePydantic, diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 77c2901024fc4..e77f75c3038a9 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -2707,13 +2707,11 @@ def test_should_respond_200(self, username, session): "dag_id": "example_python_operator", "duration": 10000.0, "end_date": "2020-01-03T00:00:00+00:00", - "execution_date": "2020-01-01T00:00:00+00:00", "executor": None, "executor_config": "{}", "hostname": "", "map_index": -1, "max_tries": 0, - "note": "placeholder-note", "operator": "PythonOperator", "pid": 100, "pool": "default_pool", @@ -2721,7 +2719,6 @@ def test_should_respond_200(self, username, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "success", "task_id": "print_the_context", @@ -2729,10 +2726,6 @@ def test_should_respond_200(self, username, session): "try_number": 1, "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {}, - "rendered_map_index": None, - "trigger": None, - "triggerer_job": None, } @pytest.mark.parametrize("try_number", [1, 2]) @@ -2749,13 +2742,11 @@ def test_should_respond_200_with_different_try_numbers(self, try_number, session "dag_id": "example_python_operator", "duration": 10000.0, "end_date": "2020-01-03T00:00:00+00:00", - "execution_date": "2020-01-01T00:00:00+00:00", "executor": None, "executor_config": "{}", "hostname": "", "map_index": -1, "max_tries": 0 if try_number == 1 else 1, - "note": "placeholder-note", "operator": "PythonOperator", "pid": 100, "pool": "default_pool", @@ -2763,7 +2754,6 @@ def test_should_respond_200_with_different_try_numbers(self, try_number, session "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "success" if try_number == 1 else None, "task_id": "print_the_context", @@ -2771,10 +2761,6 @@ def test_should_respond_200_with_different_try_numbers(self, try_number, session "try_number": try_number, "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {}, - "rendered_map_index": None, - "trigger": None, - "triggerer_job": None, } @pytest.mark.parametrize("try_number", [1, 2]) @@ -2818,13 +2804,11 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers(self, try_ "dag_id": "example_python_operator", "duration": 10000.0, "end_date": "2020-01-03T00:00:00+00:00", - "execution_date": "2020-01-01T00:00:00+00:00", "executor": None, "executor_config": "{}", "hostname": "", "map_index": map_index, "max_tries": 0 if try_number == 1 else 1, - "note": "placeholder-note", "operator": "PythonOperator", "pid": 100, "pool": "default_pool", @@ -2832,7 +2816,6 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers(self, try_ "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "failed" if try_number == 1 else None, "task_id": "print_the_context", @@ -2840,10 +2823,6 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers(self, try_ "try_number": try_number, "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, - "rendered_map_index": None, - "trigger": None, - "triggerer_job": None, } def test_should_respond_200_with_task_state_in_deferred(self, session): @@ -2882,27 +2861,16 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): assert response.status_code == 200 data = response.json - # this logic in effect replicates mock.ANY for these values - values_to_ignore = { - "trigger": ["created_date", "id", "triggerer_id"], - "triggerer_job": ["executor_class", "hostname", "id", "latest_heartbeat", "start_date"], - } - for k, v in values_to_ignore.items(): - for elem in v: - del data[k][elem] - assert response.status_code == 200 assert data == { "dag_id": "example_python_operator", "duration": 10000.0, "end_date": "2020-01-03T00:00:00+00:00", - "execution_date": "2020-01-01T00:00:00+00:00", "executor": None, "executor_config": "{}", "hostname": "", "map_index": -1, "max_tries": 0, - "note": "placeholder-note", "operator": "PythonOperator", "pid": 100, "pool": "default_pool", @@ -2910,7 +2878,6 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": None, "start_date": "2020-01-02T00:00:00+00:00", "state": "failed", "task_id": "print_the_context", @@ -2918,19 +2885,6 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): "try_number": 1, "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {}, - "rendered_map_index": None, - "trigger": { - "classpath": "none", - "kwargs": "{}", - }, - "triggerer_job": { - "dag_id": None, - "end_date": None, - "job_type": "TriggererJob", - "state": "running", - "unixname": getuser(), - }, } def test_should_respond_200_with_task_state_in_removed(self, session): @@ -2942,71 +2896,16 @@ def test_should_respond_200_with_task_state_in_removed(self, session): environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 200 - assert response.json == { - "dag_id": "example_python_operator", - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00+00:00", - "execution_date": "2020-01-01T00:00:00+00:00", - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": -1, - "max_tries": 0, - "note": "placeholder-note", - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "sla_miss": None, - "start_date": "2020-01-02T00:00:00+00:00", - "state": "removed", - "task_id": "print_the_context", - "task_display_name": "print_the_context", - "try_number": 1, - "unixname": getuser(), - "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {}, - "rendered_map_index": None, - "trigger": None, - "triggerer_job": None, - } - - def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): - tis = self.create_task_instances( - session, task_instances=[{"state": State.FAILED}], with_ti_history=True - ) - session.query() - sla_miss = SlaMiss( - task_id="print_the_context", - dag_id="example_python_operator", - execution_date=self.default_time, - timestamp=self.default_time, - ) - session.add(sla_miss) - rendered_fields = RTIF(tis[0], render_templates=False) - session.add(rendered_fields) - session.commit() - # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) - response = self.client.get( - "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", - environ_overrides={"REMOTE_USER": "test"}, - ) - assert response.status_code == 200 assert response.json == { "dag_id": "example_python_operator", "duration": 10000.0, "end_date": "2020-01-03T00:00:00+00:00", - "execution_date": "2020-01-01T00:00:00+00:00", "executor": None, "executor_config": "{}", "hostname": "", "map_index": -1, "max_tries": 0, - "note": "placeholder-note", "operator": "PythonOperator", "pid": 100, "pool": "default_pool", @@ -3014,95 +2913,15 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): "priority_weight": 9, "queue": "default_queue", "queued_when": None, - "sla_miss": { - "dag_id": "example_python_operator", - "description": None, - "email_sent": False, - "execution_date": "2020-01-01T00:00:00+00:00", - "notification_sent": False, - "task_id": "print_the_context", - "timestamp": "2020-01-01T00:00:00+00:00", - }, "start_date": "2020-01-02T00:00:00+00:00", - "state": "failed", + "state": "removed", "task_id": "print_the_context", "task_display_name": "print_the_context", "try_number": 1, "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, - "rendered_map_index": None, - "trigger": None, - "triggerer_job": None, } - def test_should_respond_200_mapped_task_instance_with_rtif(self, session): - """Verify we don't duplicate rows through join to RTIF""" - tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) - old_ti = tis[0] - for idx in (1, 2): - ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) - ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) - ti.try_number = 1 - for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "note"]: - setattr(ti, attr, getattr(old_ti, attr)) - session.add(ti) - session.commit() - tis = session.query(TaskInstance).all() - - # Record the task instance history - from airflow.models.taskinstance import clear_task_instances - - clear_task_instances(tis, session) - # Simulate the try_number increasing to new values in TI - for ti in tis: - if ti.map_index > 0: - ti.try_number += 1 - session.merge(ti) - session.commit() - - # in each loop, we should get the right mapped TI back - for map_index in (1, 2): - # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) - response = self.client.get( - "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" - f"/print_the_context/{map_index}/tries/1", - environ_overrides={"REMOTE_USER": "test"}, - ) - assert response.status_code == 200 - - assert response.json == { - "dag_id": "example_python_operator", - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00+00:00", - "execution_date": "2020-01-01T00:00:00+00:00", - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": map_index, - "max_tries": 0, - "note": "placeholder-note", - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "sla_miss": None, - "start_date": "2020-01-02T00:00:00+00:00", - "state": "failed", - "task_id": "print_the_context", - "task_display_name": "print_the_context", - "try_number": 1, - "unixname": getuser(), - "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, - "rendered_map_index": None, - "trigger": None, - "triggerer_job": None, - } - def test_should_raises_401_unauthenticated(self): response = self.client.get( "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/0",