From e384e9d416098f231dcce0dac43a67710e8bd27d Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Mon, 25 Mar 2024 12:24:12 +0530 Subject: [PATCH 1/8] Add task failed dependencies to details page. --- airflow/api_connexion/openapi/v1.yaml | 16 ++++++++ .../schemas/task_instance_schema.py | 29 ++++++++++++++ .../js/dag/details/taskInstance/Details.tsx | 38 ++++++++++++++++++- airflow/www/static/js/types/api-generated.ts | 13 +++++++ 4 files changed, 95 insertions(+), 1 deletion(-) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index eddfb8ca2f5e1..0bc03cfd7bb52 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -3557,6 +3557,14 @@ components: type: integer nullable: true + TaskFailedDependency: + type: object + properties: + name: + type: string + reason: + type: string + Job: type: object nullable: true @@ -3683,6 +3691,14 @@ components: $ref: "#/components/schemas/Trigger" triggerer_job: $ref: "#/components/schemas/Job" + task_failed_deps: + description: | + Array of failed dependencies blocking task from getting scheduled. + + *New in version 2.9.0* + type: array + items: + $ref: "#/components/schemas/TaskFailedDependency" note: type: string description: | diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index f4ea4bdddf72b..0e0a66a93270a 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -28,7 +28,11 @@ from airflow.api_connexion.schemas.job_schema import JobSchema from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema from airflow.api_connexion.schemas.trigger_schema import TriggerSchema +from airflow.exceptions import TaskNotFound from airflow.models import TaskInstance +from airflow.ti_deps.dep_context import DepContext +from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS +from airflow.utils.airflow_flask_app import get_airflow_app from airflow.utils.helpers import exactly_one from airflow.utils.state import TaskInstanceState @@ -72,6 +76,7 @@ class Meta: rendered_fields = JsonObjectField(dump_default={}) trigger = fields.Nested(TriggerSchema) triggerer_job = fields.Nested(JobSchema) + task_failed_deps = fields.Method("get_task_failed_deps", dump_default=[]) def get_attribute(self, obj, attr, default): if attr == "sla_miss": @@ -84,6 +89,30 @@ def get_attribute(self, obj, attr, default): return get_value(obj[0], "rendered_task_instance_fields.rendered_fields", default) return get_value(obj[0], attr, default) + @staticmethod + def get_task_failed_deps(obj): + # Get failed deps only for tasks in None or scheduled state + if obj and (obj[0].state in [None, TaskInstanceState.SCHEDULED]): + ti = obj[0] + dag = get_airflow_app().dag_bag.get_dag(ti.dag_id) + + if dag: + try: + ti.task = dag.get_task(ti.task_id) + except TaskNotFound: + return [] + + dep_context = DepContext(SCHEDULER_QUEUED_DEPS) + return sorted( + [ + {"name": dep.dep_name, "reason": dep.reason} + for dep in ti.get_failed_dep_statuses(dep_context=dep_context) + ], + key=lambda x: x["name"], + ) + else: + return [] + class TaskInstanceCollection(NamedTuple): """List of task instances with metadata.""" diff --git a/airflow/www/static/js/dag/details/taskInstance/Details.tsx b/airflow/www/static/js/dag/details/taskInstance/Details.tsx index 80e26d79ca29e..2efbb36eda61a 100644 --- a/airflow/www/static/js/dag/details/taskInstance/Details.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/Details.tsx @@ -18,7 +18,17 @@ */ import React from "react"; -import { Text, Flex, Table, Tbody, Tr, Td, Code, Box } from "@chakra-ui/react"; +import { + Text, + Flex, + Table, + Tbody, + Tr, + Th, + Td, + Code, + Box, +} from "@chakra-ui/react"; import { snakeCase } from "lodash"; import { getGroupAndMapSummary } from "src/utils"; @@ -95,10 +105,36 @@ const Details = ({ gridInstance, taskInstance, group }: Props) => { const isStateFinal = state && ["success", "failed", "upstream_failed", "skipped"].includes(state); + const isStateScheduled = !state || (state && ["scheduled"].includes(state)); const isOverall = (isMapped || isGroup) && "Overall "; return ( + {isStateScheduled && ( + + + Task Failed Dependencies + +
+
+ Dependencies Blocking Task From Getting Scheduled + + + + + + + {taskInstance?.taskFailedDeps && + taskInstance.taskFailedDeps.map((dep) => ( + + + + + ))} + +
DependencyReason
{dep.name}{dep.reason}
+
+ )} Task Instance Details diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 5b8f33d7ee318..5e5cb9d747f72 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1394,6 +1394,10 @@ export interface components { created_date?: string; triggerer_id?: number | null; } | null; + TaskFailedDependency: { + name?: string; + reason?: string; + }; Job: { id?: number; dag_id?: string | null; @@ -1462,6 +1466,12 @@ export interface components { rendered_fields?: { [key: string]: unknown }; trigger?: components["schemas"]["Trigger"]; triggerer_job?: components["schemas"]["Job"]; + /** + * @description Array of failed dependencies blocking task from getting scheduled. + * + * *New in version 2.9.0* + */ + task_failed_deps?: components["schemas"]["TaskFailedDependency"][]; /** * @description Contains manually entered notes by the user about the TaskInstance. * @@ -5148,6 +5158,9 @@ export type SLAMiss = CamelCasedPropertiesDeep< export type Trigger = CamelCasedPropertiesDeep< components["schemas"]["Trigger"] >; +export type TaskFailedDependency = CamelCasedPropertiesDeep< + components["schemas"]["TaskFailedDependency"] +>; export type Job = CamelCasedPropertiesDeep; export type TaskInstance = CamelCasedPropertiesDeep< components["schemas"]["TaskInstance"] From 8c9eb2725a14d93dce6ae8bd37b7d521f1561f15 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Mon, 25 Mar 2024 13:09:09 +0530 Subject: [PATCH 2/8] Fix tests. --- .../api_connexion/endpoints/test_task_instance_endpoint.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 78bb91113de8c..6d8a54d18d722 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -251,6 +251,7 @@ def test_should_respond_200(self, username, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, + "task_failed_deps": [], } def test_should_respond_200_with_task_state_in_deferred(self, session): @@ -318,6 +319,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): "state": "running", "unixname": getuser(), }, + "task_failed_deps": [], } def test_should_respond_200_with_task_state_in_removed(self, session): @@ -356,6 +358,7 @@ def test_should_respond_200_with_task_state_in_removed(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, + "task_failed_deps": [], } def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): @@ -414,6 +417,7 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, + "task_failed_deps": [], } def test_should_respond_200_mapped_task_instance_with_rtif(self, session): @@ -466,6 +470,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, + "task_failed_deps": [], } def test_should_raises_401_unauthenticated(self): @@ -2391,6 +2396,7 @@ def test_should_respond_200(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, + "task_failed_deps": [], } ti = session.scalars(select(TaskInstance).where(TaskInstance.task_id == "print_the_context")).one() assert ti.task_instance_note.user_id is not None @@ -2450,6 +2456,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, + "task_failed_deps": [], } def test_should_respond_200_when_note_is_empty(self, session): From 3ae89a400ff147657799e4c4f9dd90d9d28aa695 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Mon, 25 Mar 2024 15:09:39 +0530 Subject: [PATCH 3/8] Explicitly pass attached session for reuse so that the session is available during serialization. --- airflow/api_connexion/schemas/task_instance_schema.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 0e0a66a93270a..e3c69ba58c589 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -21,6 +21,7 @@ from marshmallow import Schema, ValidationError, fields, validate, validates_schema from marshmallow.utils import get_value from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field +from sqlalchemy import inspect from airflow.api_connexion.parameters import validate_istimezone from airflow.api_connexion.schemas.common_schema import JsonObjectField @@ -94,7 +95,9 @@ def get_task_failed_deps(obj): # Get failed deps only for tasks in None or scheduled state if obj and (obj[0].state in [None, TaskInstanceState.SCHEDULED]): ti = obj[0] - dag = get_airflow_app().dag_bag.get_dag(ti.dag_id) + + session = inspect(ti).session + dag = get_airflow_app().dag_bag.get_dag(ti.dag_id, session=session) if dag: try: @@ -106,7 +109,7 @@ def get_task_failed_deps(obj): return sorted( [ {"name": dep.dep_name, "reason": dep.reason} - for dep in ti.get_failed_dep_statuses(dep_context=dep_context) + for dep in ti.get_failed_dep_statuses(dep_context=dep_context, session=session) ], key=lambda x: x["name"], ) From b5630502d411a65aeea90254fa56260949a0db75 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Tue, 26 Mar 2024 20:37:52 +0530 Subject: [PATCH 4/8] Use separate endpoint for task failed dependencies. --- .../endpoints/task_instance_endpoint.py | 63 +++++++++ airflow/api_connexion/openapi/v1.yaml | 81 ++++++++++-- .../schemas/task_instance_schema.py | 46 ++----- airflow/www/static/js/api/index.ts | 2 + .../static/js/api/useTaskFailedDependency.ts | 65 +++++++++ .../js/dag/details/taskInstance/Details.tsx | 38 +----- .../taskInstance/TaskFailedDependency.tsx | 70 ++++++++++ .../js/dag/details/taskInstance/index.tsx | 13 ++ airflow/www/static/js/types/api-generated.ts | 117 ++++++++++++++++- airflow/www/templates/airflow/dag.html | 3 + .../endpoints/test_task_instance_endpoint.py | 123 +++++++++++++++++- 11 files changed, 531 insertions(+), 90 deletions(-) create mode 100644 airflow/www/static/js/api/useTaskFailedDependency.ts create mode 100644 airflow/www/static/js/dag/details/taskInstance/TaskFailedDependency.tsx diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index c3d5de9963735..f98f37f38b35d 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -35,6 +35,7 @@ set_single_task_instance_state_form, set_task_instance_note_form_schema, set_task_instance_state_form, + task_dependencies_collection_schema, task_instance_batch_form, task_instance_collection_schema, task_instance_reference_collection_schema, @@ -685,3 +686,65 @@ def set_task_instance_note( ti.task_instance_note.user_id = current_user_id session.commit() return task_instance_schema.dump((ti, sla_miss)) + + +@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE) +@provide_session +def get_task_instance_dependencies( + *, dag_id: str, dag_run_id: str, task_id: str, map_index: int = -1, session: Session = NEW_SESSION +) -> APIResponse: + """Get dependencies blocking task from getting scheduled.""" + from airflow.exceptions import TaskNotFound + from airflow.ti_deps.dep_context import DepContext + from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS + from airflow.utils.airflow_flask_app import get_airflow_app + + query = select(TI).where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id) + + if map_index == -1: + query = query.where(or_(TI.map_index == -1, TI.map_index is None)) + else: + query = query.where(TI.map_index == map_index) + + try: + result = session.execute(query).one_or_none() + except MultipleResultsFound: + raise NotFound( + "Task instance not found", detail="Task instance is mapped, add the map_index value to the URL" + ) + + if result is None: + error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}" + raise NotFound(error_message) + + ti = result[0] + deps = [] + + if ti.state in [None, TaskInstanceState.SCHEDULED]: + dag = get_airflow_app().dag_bag.get_dag(ti.dag_id) + + if dag: + try: + ti.task = dag.get_task(ti.task_id) + except TaskNotFound: + pass + else: + dep_context = DepContext(SCHEDULER_QUEUED_DEPS) + deps = sorted( + [ + {"name": dep.dep_name, "reason": dep.reason} + for dep in ti.get_failed_dep_statuses(dep_context=dep_context, session=session) + ], + key=lambda x: x["name"], + ) + + return task_dependencies_collection_schema.dump({"dependencies": deps}) + + +def get_mapped_task_instance_dependencies( + *, dag_id: str, dag_run_id: str, task_id: str, map_index: int +) -> APIResponse: + """Get dependencies blocking mapped task instance from getting scheduled.""" + return get_task_instance_dependencies( + dag_id=dag_id, dag_run_id=dag_run_id, task_id=task_id, map_index=map_index + ) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 0bc03cfd7bb52..5e62f8ae395d7 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -679,6 +679,71 @@ paths: "404": $ref: "#/components/responses/NotFound" + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dependencies: + parameters: + - $ref: "#/components/parameters/DAGID" + - $ref: "#/components/parameters/DAGRunID" + - $ref: "#/components/parameters/TaskID" + + get: + summary: Get task dependencies blocking task from getting scheduled. + description: | + Get task dependencies blocking task from getting scheduled. + + *New in version 2.9.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint + operationId: get_task_instance_dependencies + tags: [TaskInstance] + + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/TaskInstanceDependencyCollection" + "400": + $ref: "#/components/responses/BadRequest" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + + ? /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dependencies + : parameters: + - $ref: "#/components/parameters/DAGID" + - $ref: "#/components/parameters/DAGRunID" + - $ref: "#/components/parameters/TaskID" + - $ref: "#/components/parameters/MapIndex" + + get: + summary: Get task dependencies blocking task from getting scheduled. + description: | + Get task dependencies blocking task from getting scheduled. + + *New in version 2.9.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint + operationId: get_mapped_task_instance_dependencies + tags: [TaskInstance] + + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/TaskInstanceDependencyCollection" + "400": + $ref: "#/components/responses/BadRequest" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + /dags/{dag_id}/updateTaskInstancesState: parameters: - $ref: "#/components/parameters/DAGID" @@ -3565,6 +3630,14 @@ components: reason: type: string + TaskInstanceDependencyCollection: + type: object + properties: + dependencies: + type: array + items: + $ref: "#/components/schemas/TaskFailedDependency" + Job: type: object nullable: true @@ -3691,14 +3764,6 @@ components: $ref: "#/components/schemas/Trigger" triggerer_job: $ref: "#/components/schemas/Job" - task_failed_deps: - description: | - Array of failed dependencies blocking task from getting scheduled. - - *New in version 2.9.0* - type: array - items: - $ref: "#/components/schemas/TaskFailedDependency" note: type: string description: | diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index e3c69ba58c589..04c2edc49c2bc 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -21,7 +21,6 @@ from marshmallow import Schema, ValidationError, fields, validate, validates_schema from marshmallow.utils import get_value from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field -from sqlalchemy import inspect from airflow.api_connexion.parameters import validate_istimezone from airflow.api_connexion.schemas.common_schema import JsonObjectField @@ -29,11 +28,7 @@ from airflow.api_connexion.schemas.job_schema import JobSchema from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema from airflow.api_connexion.schemas.trigger_schema import TriggerSchema -from airflow.exceptions import TaskNotFound from airflow.models import TaskInstance -from airflow.ti_deps.dep_context import DepContext -from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS -from airflow.utils.airflow_flask_app import get_airflow_app from airflow.utils.helpers import exactly_one from airflow.utils.state import TaskInstanceState @@ -77,7 +72,6 @@ class Meta: rendered_fields = JsonObjectField(dump_default={}) trigger = fields.Nested(TriggerSchema) triggerer_job = fields.Nested(JobSchema) - task_failed_deps = fields.Method("get_task_failed_deps", dump_default=[]) def get_attribute(self, obj, attr, default): if attr == "sla_miss": @@ -90,32 +84,6 @@ def get_attribute(self, obj, attr, default): return get_value(obj[0], "rendered_task_instance_fields.rendered_fields", default) return get_value(obj[0], attr, default) - @staticmethod - def get_task_failed_deps(obj): - # Get failed deps only for tasks in None or scheduled state - if obj and (obj[0].state in [None, TaskInstanceState.SCHEDULED]): - ti = obj[0] - - session = inspect(ti).session - dag = get_airflow_app().dag_bag.get_dag(ti.dag_id, session=session) - - if dag: - try: - ti.task = dag.get_task(ti.task_id) - except TaskNotFound: - return [] - - dep_context = DepContext(SCHEDULER_QUEUED_DEPS) - return sorted( - [ - {"name": dep.dep_name, "reason": dep.reason} - for dep in ti.get_failed_dep_statuses(dep_context=dep_context, session=session) - ], - key=lambda x: x["name"], - ) - else: - return [] - class TaskInstanceCollection(NamedTuple): """List of task instances with metadata.""" @@ -252,8 +220,22 @@ class SetTaskInstanceNoteFormSchema(Schema): note = fields.String(allow_none=True, validate=validate.Length(max=1000)) +class TaskDependencySchema(Schema): + """Schema for task scheduling dependencies.""" + + name = fields.String() + reason = fields.String() + + +class TaskDependencyCollectionSchema(Schema): + """Task scheduling dependencies collection schema.""" + + dependencies = fields.List(fields.Nested(TaskDependencySchema)) + + task_instance_schema = TaskInstanceSchema() task_instance_collection_schema = TaskInstanceCollectionSchema() +task_dependencies_collection_schema = TaskDependencyCollectionSchema() task_instance_batch_form = TaskInstanceBatchFormSchema() clear_task_instance_form = ClearTaskInstanceFormSchema() set_task_instance_state_form = SetTaskInstanceStateFormSchema() diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index c9bc4ed4c8cc5..96532c3903781 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -41,6 +41,7 @@ import useSetDagRunNote from "./useSetDagRunNote"; import useSetTaskInstanceNote from "./useSetTaskInstanceNote"; import useUpstreamDatasetEvents from "./useUpstreamDatasetEvents"; import useTaskInstance from "./useTaskInstance"; +import useTaskFailedDependency from "./useTaskFailedDependency"; import useDag from "./useDag"; import useDagCode from "./useDagCode"; import useDagDetails from "./useDagDetails"; @@ -100,6 +101,7 @@ export { useHistoricalMetricsData, useTaskXcomEntry, useTaskXcomCollection, + useTaskFailedDependency, useEventLogs, useCalendarData, useCreateDatasetEvent, diff --git a/airflow/www/static/js/api/useTaskFailedDependency.ts b/airflow/www/static/js/api/useTaskFailedDependency.ts new file mode 100644 index 0000000000000..db7a99afea76b --- /dev/null +++ b/airflow/www/static/js/api/useTaskFailedDependency.ts @@ -0,0 +1,65 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import axios, { AxiosResponse } from "axios"; +import type { API } from "src/types"; +import { useQuery } from "react-query"; +import { getMetaValue } from "../utils"; + +const taskDependencyURI = getMetaValue("task_dependency_api"); +const mappedTaskDependencyURI = getMetaValue("mapped_task_dependency_api"); + +export default function useTaskFailedDependency({ + dagId, + taskId, + runId, + mapIndex, +}: { + dagId: string; + taskId: string; + runId: string; + mapIndex?: number | undefined; +}) { + return useQuery( + ["taskFailedDependencies", dagId, taskId, runId, mapIndex], + async () => { + const url = ( + mapIndex && mapIndex >= 0 ? mappedTaskDependencyURI : taskDependencyURI + ) + .replace("_DAG_RUN_ID_", runId) + .replace( + "_TASK_ID_/0/dependencies", + `_TASK_ID_/${mapIndex}/dependencies` + ) + .replace("_TASK_ID_", taskId); + + try { + const datum = await axios.get< + AxiosResponse, + API.TaskInstanceDependencyCollection + >(url); + return datum; + } catch (e) { + // eslint-disable-next-line no-console + console.error(e); + return { dependencies: [] }; + } + } + ); +} diff --git a/airflow/www/static/js/dag/details/taskInstance/Details.tsx b/airflow/www/static/js/dag/details/taskInstance/Details.tsx index 2efbb36eda61a..80e26d79ca29e 100644 --- a/airflow/www/static/js/dag/details/taskInstance/Details.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/Details.tsx @@ -18,17 +18,7 @@ */ import React from "react"; -import { - Text, - Flex, - Table, - Tbody, - Tr, - Th, - Td, - Code, - Box, -} from "@chakra-ui/react"; +import { Text, Flex, Table, Tbody, Tr, Td, Code, Box } from "@chakra-ui/react"; import { snakeCase } from "lodash"; import { getGroupAndMapSummary } from "src/utils"; @@ -105,36 +95,10 @@ const Details = ({ gridInstance, taskInstance, group }: Props) => { const isStateFinal = state && ["success", "failed", "upstream_failed", "skipped"].includes(state); - const isStateScheduled = !state || (state && ["scheduled"].includes(state)); const isOverall = (isMapped || isGroup) && "Overall "; return ( - {isStateScheduled && ( - - - Task Failed Dependencies - -
-
- Dependencies Blocking Task From Getting Scheduled - - - - - - - {taskInstance?.taskFailedDeps && - taskInstance.taskFailedDeps.map((dep) => ( - - - - - ))} - -
DependencyReason
{dep.name}{dep.reason}
-
- )} Task Instance Details diff --git a/airflow/www/static/js/dag/details/taskInstance/TaskFailedDependency.tsx b/airflow/www/static/js/dag/details/taskInstance/TaskFailedDependency.tsx new file mode 100644 index 0000000000000..24802ef4b6db0 --- /dev/null +++ b/airflow/www/static/js/dag/details/taskInstance/TaskFailedDependency.tsx @@ -0,0 +1,70 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import React from "react"; +import { Text, Table, Tbody, Tr, Th, Td, Box } from "@chakra-ui/react"; + +import { useTaskFailedDependency } from "src/api"; + +interface Props { + dagId: string; + runId: string; + taskId: string; + mapIndex?: number; +} + +const TaskFailedDependency = ({ dagId, runId, taskId, mapIndex }: Props) => { + const { data: dependencies, isLoading } = useTaskFailedDependency({ + dagId, + taskId, + runId, + mapIndex, + }); + + if (isLoading) { + return null; + } + + return ( + + + Task Failed Dependencies + +
+
+ Dependencies Blocking Task From Getting Scheduled + + + + + + + {dependencies?.dependencies?.map((dep) => ( + + + + + ))} + +
DependencyReason
{dep.name}{dep.reason}
+
+ ); +}; + +export default TaskFailedDependency; diff --git a/airflow/www/static/js/dag/details/taskInstance/index.tsx b/airflow/www/static/js/dag/details/taskInstance/index.tsx index 83681cb070065..7dc3727ae4792 100644 --- a/airflow/www/static/js/dag/details/taskInstance/index.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/index.tsx @@ -30,6 +30,7 @@ import ExtraLinks from "./ExtraLinks"; import Details from "./Details"; import DatasetUpdateEvents from "./DatasetUpdateEvents"; import TriggererInfo from "./TriggererInfo"; +import TaskFailedDependency from "./TaskFailedDependency"; const dagId = getMetaValue("dag_id")!; @@ -66,6 +67,10 @@ const TaskInstance = ({ taskId, runId, mapIndex }: Props) => { enabled: (!isGroup && !isMapped) || isMapIndexDefined, }); + const showTaskSchedulingDependencies = + !isGroupOrMappedTaskSummary && + (!taskInstance?.state || taskInstance?.state === "scheduled"); + const gridInstance = group?.instances.find((ti) => ti.runId === runId); return ( @@ -112,6 +117,14 @@ const TaskInstance = ({ taskId, runId, mapIndex }: Props) => { )} + {showTaskSchedulingDependencies && ( + + )}
; +export type TaskInstanceDependencyCollection = CamelCasedPropertiesDeep< + components["schemas"]["TaskInstanceDependencyCollection"] +>; export type Job = CamelCasedPropertiesDeep; export type TaskInstance = CamelCasedPropertiesDeep< components["schemas"]["TaskInstance"] @@ -5382,6 +5480,13 @@ export type SetMappedTaskInstanceNoteVariables = CamelCasedPropertiesDeep< operations["set_mapped_task_instance_note"]["parameters"]["path"] & operations["set_mapped_task_instance_note"]["requestBody"]["content"]["application/json"] >; +export type GetTaskInstanceDependenciesVariables = CamelCasedPropertiesDeep< + operations["get_task_instance_dependencies"]["parameters"]["path"] +>; +export type GetMappedTaskInstanceDependenciesVariables = + CamelCasedPropertiesDeep< + operations["get_mapped_task_instance_dependencies"]["parameters"]["path"] + >; export type PostSetTaskInstancesStateVariables = CamelCasedPropertiesDeep< operations["post_set_task_instances_state"]["parameters"]["path"] & operations["post_set_task_instances_state"]["requestBody"]["content"]["application/json"] diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index c91d4b4454a44..05e6cc521dbcb 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -84,6 +84,9 @@ + + + diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 6d8a54d18d722..c3305e95b5ad7 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -251,7 +251,6 @@ def test_should_respond_200(self, username, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, - "task_failed_deps": [], } def test_should_respond_200_with_task_state_in_deferred(self, session): @@ -319,7 +318,6 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): "state": "running", "unixname": getuser(), }, - "task_failed_deps": [], } def test_should_respond_200_with_task_state_in_removed(self, session): @@ -358,7 +356,6 @@ def test_should_respond_200_with_task_state_in_removed(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, - "task_failed_deps": [], } def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): @@ -417,7 +414,6 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, - "task_failed_deps": [], } def test_should_respond_200_mapped_task_instance_with_rtif(self, session): @@ -470,7 +466,6 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, - "task_failed_deps": [], } def test_should_raises_401_unauthenticated(self): @@ -2396,7 +2391,6 @@ def test_should_respond_200(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, - "task_failed_deps": [], } ti = session.scalars(select(TaskInstance).where(TaskInstance.task_id == "print_the_context")).one() assert ti.task_instance_note.user_id is not None @@ -2456,7 +2450,6 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): "rendered_map_index": None, "trigger": None, "triggerer_job": None, - "task_failed_deps": [], } def test_should_respond_200_when_note_is_empty(self, session): @@ -2518,3 +2511,119 @@ def test_should_respond_404(self, session): environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 + + +class TestGetTaskDependencies(TestTaskInstanceEndpoint): + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + @provide_session + def test_should_respond_empty_non_scheduled(self, session): + self.create_task_instances(session) + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" + "print_the_context/dependencies", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200, response.text + assert response.json == {"dependencies": []} + + @pytest.mark.parametrize( + "state, dependencies", + [ + ( + State.SCHEDULED, + { + "dependencies": [ + { + "name": "Execution Date", + "reason": "The execution date is 2020-01-01T00:00:00+00:00 but this is " + "before the task's start date 2021-01-01T00:00:00+00:00.", + }, + { + "name": "Execution Date", + "reason": "The execution date is 2020-01-01T00:00:00+00:00 but this is " + "before the task's DAG's start date 2021-01-01T00:00:00+00:00.", + }, + ], + }, + ), + ( + State.NONE, + { + "dependencies": [ + { + "name": "Execution Date", + "reason": "The execution date is 2020-01-01T00:00:00+00:00 but this is before the task's start date 2021-01-01T00:00:00+00:00.", + }, + { + "name": "Execution Date", + "reason": "The execution date is 2020-01-01T00:00:00+00:00 but this is before the task's DAG's start date 2021-01-01T00:00:00+00:00.", + }, + {"name": "Task Instance State", "reason": "Task is in the 'None' state."}, + ] + }, + ), + ], + ) + @provide_session + def test_should_respond_dependencies(self, session, state, dependencies): + self.create_task_instances(session, task_instances=[{"state": state}], update_extras=True) + + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" + "print_the_context/dependencies", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200, response.text + assert response.json == dependencies + + def test_should_respond_dependencies_mapped(self, session): + tis = self.create_task_instances( + session, task_instances=[{"state": State.SCHEDULED}], update_extras=True + ) + old_ti = tis[0] + + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=0, state=old_ti.state) + session.add(ti) + session.commit() + + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" + "print_the_context/0/dependencies", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200, response.text + + def test_should_raises_401_unauthenticated(self): + for map_index in ["", "/0"]: + url = ( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" + f"print_the_context{map_index}/dependencies" + ) + response = self.client.get( + url, + ) + assert_401(response) + + def test_should_raise_403_forbidden(self): + for map_index in ["", "/0"]: + response = self.client.get( + "api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/" + f"print_the_context{map_index}/dependencies", + environ_overrides={"REMOTE_USER": "test_no_permissions"}, + ) + assert response.status_code == 403 + + def test_should_respond_404(self, session): + self.create_task_instances(session) + for map_index in ["", "/0"]: + response = self.client.get( + f"api/v1/dags/INVALID_DAG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context" + f"{map_index}/dependencies", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 From 3bff4e5ee3370f98c77faa2bde285c91cd0f1b6d Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Tue, 26 Mar 2024 21:07:08 +0530 Subject: [PATCH 5/8] Fix conditional. --- airflow/www/static/js/api/useTaskFailedDependency.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/www/static/js/api/useTaskFailedDependency.ts b/airflow/www/static/js/api/useTaskFailedDependency.ts index db7a99afea76b..b924fffccb476 100644 --- a/airflow/www/static/js/api/useTaskFailedDependency.ts +++ b/airflow/www/static/js/api/useTaskFailedDependency.ts @@ -39,8 +39,9 @@ export default function useTaskFailedDependency({ return useQuery( ["taskFailedDependencies", dagId, taskId, runId, mapIndex], async () => { + const definedMapIndex = mapIndex ?? -1; const url = ( - mapIndex && mapIndex >= 0 ? mappedTaskDependencyURI : taskDependencyURI + definedMapIndex >= 0 ? mappedTaskDependencyURI : taskDependencyURI ) .replace("_DAG_RUN_ID_", runId) .replace( From 7c81e8b926d6f86f328e7258f8c33ef28550392a Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Fri, 3 May 2024 11:32:30 +0530 Subject: [PATCH 6/8] Add spinner during loading and handle error scenario. --- .../static/js/api/useTaskFailedDependency.ts | 16 ++--- .../taskInstance/TaskFailedDependency.tsx | 67 +++++++++++++------ 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/airflow/www/static/js/api/useTaskFailedDependency.ts b/airflow/www/static/js/api/useTaskFailedDependency.ts index b924fffccb476..b1176e17eb455 100644 --- a/airflow/www/static/js/api/useTaskFailedDependency.ts +++ b/airflow/www/static/js/api/useTaskFailedDependency.ts @@ -50,17 +50,11 @@ export default function useTaskFailedDependency({ ) .replace("_TASK_ID_", taskId); - try { - const datum = await axios.get< - AxiosResponse, - API.TaskInstanceDependencyCollection - >(url); - return datum; - } catch (e) { - // eslint-disable-next-line no-console - console.error(e); - return { dependencies: [] }; - } + const datum = await axios.get< + AxiosResponse, + API.TaskInstanceDependencyCollection + >(url); + return datum; } ); } diff --git a/airflow/www/static/js/dag/details/taskInstance/TaskFailedDependency.tsx b/airflow/www/static/js/dag/details/taskInstance/TaskFailedDependency.tsx index 24802ef4b6db0..3db51f1bbce1a 100644 --- a/airflow/www/static/js/dag/details/taskInstance/TaskFailedDependency.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/TaskFailedDependency.tsx @@ -18,7 +18,18 @@ */ import React from "react"; -import { Text, Table, Tbody, Tr, Th, Td, Box } from "@chakra-ui/react"; +import { + Alert, + AlertIcon, + Box, + Spinner, + Text, + Table, + Tbody, + Tr, + Th, + Td, +} from "@chakra-ui/react"; import { useTaskFailedDependency } from "src/api"; @@ -30,39 +41,51 @@ interface Props { } const TaskFailedDependency = ({ dagId, runId, taskId, mapIndex }: Props) => { - const { data: dependencies, isLoading } = useTaskFailedDependency({ + const { + data: dependencies, + isLoading, + error, + } = useTaskFailedDependency({ dagId, taskId, runId, mapIndex, }); - if (isLoading) { - return null; - } - return ( Task Failed Dependencies
-
- Dependencies Blocking Task From Getting Scheduled - - - - - - - {dependencies?.dependencies?.map((dep) => ( - - - - - ))} - -
DependencyReason
{dep.name}{dep.reason}
+ + {isLoading && } + {!!error && ( + + + An error occurred while fetching task dependencies. + + )} + + {dependencies && ( + + Dependencies Blocking Task From Getting Scheduled + + + + + + + {dependencies.dependencies?.map((dep) => ( + + + + + ))} + +
DependencyReason
{dep.name}{dep.reason}
+
+ )}
); }; From 469872fba5518dd596c1a57e9f479c2db3951bbe Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Thu, 9 May 2024 19:03:02 +0530 Subject: [PATCH 7/8] Update version number in API. --- airflow/api_connexion/openapi/v1.yaml | 4 ++-- airflow/www/static/js/types/api-generated.ts | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 5e62f8ae395d7..4e25341ace491 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -690,7 +690,7 @@ paths: description: | Get task dependencies blocking task from getting scheduled. - *New in version 2.9.0* + *New in version 2.10.0* x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint operationId: get_task_instance_dependencies tags: [TaskInstance] @@ -723,7 +723,7 @@ paths: description: | Get task dependencies blocking task from getting scheduled. - *New in version 2.9.0* + *New in version 2.10.0* x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint operationId: get_mapped_task_instance_dependencies tags: [TaskInstance] diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index b8cddda22af42..c6cc5daddd716 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -143,7 +143,7 @@ export interface paths { /** * Get task dependencies blocking task from getting scheduled. * - * *New in version 2.9.0* + * *New in version 2.10.0* */ get: operations["get_task_instance_dependencies"]; parameters: { @@ -161,7 +161,7 @@ export interface paths { /** * Get task dependencies blocking task from getting scheduled. * - * *New in version 2.9.0* + * *New in version 2.10.0* */ get: operations["get_mapped_task_instance_dependencies"]; parameters: { @@ -3064,7 +3064,7 @@ export interface operations { /** * Get task dependencies blocking task from getting scheduled. * - * *New in version 2.9.0* + * *New in version 2.10.0* */ get_task_instance_dependencies: { parameters: { @@ -3093,7 +3093,7 @@ export interface operations { /** * Get task dependencies blocking task from getting scheduled. * - * *New in version 2.9.0* + * *New in version 2.10.0* */ get_mapped_task_instance_dependencies: { parameters: { From f446844b3923eb4fb4c7d10eaf7cf1f922172111 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Thu, 16 May 2024 18:09:22 +0530 Subject: [PATCH 8/8] Auto refresh dependencies. --- airflow/www/static/js/api/useTaskFailedDependency.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/www/static/js/api/useTaskFailedDependency.ts b/airflow/www/static/js/api/useTaskFailedDependency.ts index b1176e17eb455..08168deebffaf 100644 --- a/airflow/www/static/js/api/useTaskFailedDependency.ts +++ b/airflow/www/static/js/api/useTaskFailedDependency.ts @@ -18,6 +18,7 @@ */ import axios, { AxiosResponse } from "axios"; +import { useAutoRefresh } from "src/context/autorefresh"; import type { API } from "src/types"; import { useQuery } from "react-query"; import { getMetaValue } from "../utils"; @@ -36,6 +37,7 @@ export default function useTaskFailedDependency({ runId: string; mapIndex?: number | undefined; }) { + const { isRefreshOn } = useAutoRefresh(); return useQuery( ["taskFailedDependencies", dagId, taskId, runId, mapIndex], async () => { @@ -55,6 +57,7 @@ export default function useTaskFailedDependency({ API.TaskInstanceDependencyCollection >(url); return datum; - } + }, + { refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000 } ); }