diff --git a/airflow-core/package-lock.json b/airflow-core/package-lock.json new file mode 100644 index 0000000000000..47020b70c4680 --- /dev/null +++ b/airflow-core/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "airflow-core", + "lockfileVersion": 3, + "requires": true, + "packages": {} +} diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py index f33b12e6f7e8a..2207370e0965a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py @@ -17,7 +17,10 @@ from __future__ import annotations -from fastapi import Depends, HTTPException, status +from datetime import datetime +from typing import Annotated + +from fastapi import Depends, HTTPException, Query, status from sqlalchemy import or_, select, union_all from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity @@ -59,8 +62,16 @@ def get_gantt_data( dag_id: str, run_id: str, session: SessionDep, + start_date_range: Annotated[datetime | None, Query(alias="start_date")] = None, + end_date_range: Annotated[datetime | None, Query(alias="end_date")] = None, ) -> GanttResponse: """Get all task instance tries for Gantt chart.""" + if start_date_range and end_date_range and start_date_range > end_date_range: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "start_date cannot be greater than end_date", + ) + # Exclude mapped tasks (use grid summaries) and UP_FOR_RETRY (already in history) current_tis = select( TaskInstance.task_id.label("task_id"), @@ -74,6 +85,16 @@ def get_gantt_data( TaskInstance.run_id == run_id, TaskInstance.map_index == -1, or_(TaskInstance.state != TaskInstanceState.UP_FOR_RETRY, TaskInstance.state.is_(None)), + *( + [or_(TaskInstance.end_date >= start_date_range, TaskInstance.end_date.is_(None))] + if start_date_range is not None + else [] + ), + *( + [or_(TaskInstance.start_date <= end_date_range, TaskInstance.start_date.is_(None))] + if end_date_range is not None + else [] + ), ) history_tis = select( @@ -87,6 +108,21 @@ def get_gantt_data( TaskInstanceHistory.dag_id == dag_id, TaskInstanceHistory.run_id == run_id, TaskInstanceHistory.map_index == -1, + *( + [or_(TaskInstanceHistory.end_date >= start_date_range, TaskInstanceHistory.end_date.is_(None))] + if start_date_range is not None + else [] + ), + *( + [ + or_( + TaskInstanceHistory.start_date <= end_date_range, + TaskInstanceHistory.start_date.is_(None), + ) + ] + if end_date_range is not None + else [] + ), ) combined = union_all(current_tis, history_tis).subquery() @@ -97,7 +133,7 @@ def get_gantt_data( if not results: raise HTTPException( status.HTTP_404_NOT_FOUND, - f"No task instances for dag_id={dag_id} run_id={run_id}", + detail="DAG or DagRun not found", ) task_instances = [ diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index e44f9cc3856e1..624d0e70c900a 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -96,6 +96,7 @@ "duration": "Duration", "edit": "Edit", "endDate": "End Date", + "endDateBeforeStartDate": "End date cannot be before start date", "error": { "back": "Back", "defaultMessage": "An unexpected error occurred", @@ -212,6 +213,7 @@ "sourceAssetEvent_one": "Source Asset Event", "sourceAssetEvent_other": "Source Asset Events", "startDate": "Start Date", + "startDateAfterEndDate": "Start date cannot be after end date", "state": "State", "states": { "deferred": "Deferred", diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx index 62de32cdb98bb..f994020c5d92a 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Box, useToken } from "@chakra-ui/react"; +import { Box, Field, Input, useToken } from "@chakra-ui/react"; import { Chart as ChartJS, CategoryScale, @@ -33,7 +33,7 @@ import { import "chart.js/auto"; import "chartjs-adapter-dayjs-4/dist/chartjs-adapter-dayjs-4.esm"; import annotationPlugin from "chartjs-plugin-annotation"; -import { useDeferredValue } from "react"; +import { useDeferredValue, useState } from "react"; import { Bar } from "react-chartjs-2"; import { useTranslation } from "react-i18next"; import { useParams, useNavigate, useLocation, useSearchParams } from "react-router-dom"; @@ -71,8 +71,6 @@ ChartJS.register( type Props = { readonly dagRunState?: DagRunState | undefined; readonly limit: number; - readonly runAfterGte?: string | undefined; - readonly runAfterLte?: string | undefined; readonly runType?: DagRunType | undefined; readonly triggeringUser?: string | undefined; }; @@ -81,8 +79,12 @@ const CHART_PADDING = 36; const CHART_ROW_HEIGHT = 20; const MIN_BAR_WIDTH = 10; -export const Gantt = ({ dagRunState, limit, runAfterGte, runAfterLte, runType, triggeringUser }: Props) => { +export const Gantt = ({ dagRunState, limit, runType, triggeringUser }: Props) => { const { dagId = "", groupId: selectedGroupId, runId = "", taskId: selectedTaskId } = useParams(); + const [filterStartDate, setFilterStartDate] = useState(""); + const [filterEndDate, setFilterEndDate] = useState(""); + const [dateError, setDateError] = useState(""); + const [searchParams] = useSearchParams(); const { openGroupIds } = useOpenGroups(); const deferredOpenGroupIds = useDeferredValue(openGroupIds); @@ -116,8 +118,6 @@ export const Gantt = ({ dagRunState, limit, runAfterGte, runAfterLte, runType, t const { data: gridRuns, isLoading: runsLoading } = useGridRuns({ dagRunState, limit, - runAfterGte, - runAfterLte, runType, triggeringUser, }); @@ -143,9 +143,15 @@ export const Gantt = ({ dagRunState, limit, runAfterGte, runAfterLte, runType, t const gridTiSummaries = summariesByRunId.get(runId); const summariesLoading = Boolean(runId && selectedRun && !summariesByRunId.has(runId)); - // Single fetch for all Gantt data (individual task tries) + // startDate and endDate are sent to the backend as query parameters. + // The server filters the data — NOT the browser. const { data: ganttData, isLoading: ganttLoading } = useGanttServiceGetGanttData( - { dagId, runId }, + { + dagId, + runId, + startDate: filterStartDate ? `${filterStartDate}T00:00:00Z` : undefined, + endDate: filterEndDate ? `${filterEndDate}T23:59:59Z` : undefined, + }, undefined, { enabled: Boolean(dagId) && Boolean(runId) && Boolean(selectedRun), @@ -212,7 +218,7 @@ export const Gantt = ({ dagRunState, limit, runAfterGte, runAfterLte, runType, t translate, }); - if (runId === "" || (!isLoading && !selectedRun)) { + if (runId === "") { return undefined; } @@ -228,21 +234,74 @@ export const Gantt = ({ dagRunState, limit, runAfterGte, runAfterLte, runType, t }; return ( - - - + <> + {/* Date range inputs — values are sent to backend as query params, no client-side filtering */} + + + + {translate("startDate")} + + { + const value = e.target.value; + + if (value && filterEndDate && value > filterEndDate) { + setDateError(translate("startDateAfterEndDate")); + return; + } + setDateError(""); + setFilterStartDate(value); + }} + placeholder="YYYY-MM-DD" + size="sm" + type="date" + value={filterStartDate} + /> + + + + + {translate("endDate")} + + { + const value = e.target.value; + + if (value && filterStartDate && value < filterStartDate) { + setDateError(translate("endDateBeforeStartDate")); + return; + } + setDateError(""); + setFilterEndDate(value); + }} + placeholder="YYYY-MM-DD" + size="sm" + type="date" + value={filterEndDate} + /> + {dateError ? {dateError} : undefined} + + + + + + ); }; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py index 162c82682afcf..ade423c7fb73b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py @@ -326,3 +326,65 @@ def test_should_response_404(self, test_client, dag_id, run_id): with assert_queries_count(3): response = test_client.get(f"/gantt/{dag_id}/{run_id}") assert response.status_code == 404 + + def test_gantt_with_start_date(self, test_client): + """Filtering by start_date excludes tasks that ended before the filter.""" + # Get all tasks without filter + unfiltered = test_client.get(f"/gantt/{DAG_ID}/run_1") + assert unfiltered.status_code == 200 + unfiltered_ids = [ti["task_id"] for ti in unfiltered.json()["task_instances"]] + + # Filter: only tasks with end_date >= 10:06 (or NULL end_date for running tasks) + # task ends at 10:05 → excluded + # task2 ends at 10:10 → included + # task3 has NULL end_date (running) → always included + response = test_client.get( + f"/gantt/{DAG_ID}/run_1", + params={"start_date": "2024-11-30T10:06:00Z"}, + ) + assert response.status_code == 200 + + data = response.json() + task_ids = [ti["task_id"] for ti in data["task_instances"]] + + # running task (NULL end_date) must always be included + assert "task3" in task_ids + # task2 ended after the filter start → included + assert "task2" in task_ids + # task ended before the filter start → excluded + assert "task" not in task_ids + # filtered result must have fewer tasks than unfiltered + assert len(task_ids) < len(unfiltered_ids) + + def test_gantt_with_end_date(self, test_client): + """Filtering by end_date excludes tasks that started after the filter.""" + # Filter: only tasks with start_date <= 10:04 + # task starts at 10:00 → included + # task2 starts at 10:05 → excluded + # task3 starts at 10:10 → excluded + response = test_client.get( + f"/gantt/{DAG_ID}/run_1", + params={"end_date": "2024-11-30T10:04:00Z"}, + ) + assert response.status_code == 200 + + data = response.json() + task_ids = [ti["task_id"] for ti in data["task_instances"]] + + # task started before end_date → included + assert "task" in task_ids + # task3 started after end_date → excluded + assert "task3" not in task_ids + # task2 started after end_date → excluded + assert "task2" not in task_ids + + def test_invalid_date_range_returns_400(self, test_client): + """start_date after end_date must return 400.""" + response = test_client.get( + f"/gantt/{DAG_ID}/run_1", + params={ + "start_date": "2024-12-01T00:00:00Z", + "end_date": "2024-11-01T00:00:00Z", + }, + ) + assert response.status_code == 400 diff --git a/scripts/ci/prek/check_excluded_provider_markers.py b/scripts/ci/prek/check_excluded_provider_markers.py old mode 100644 new mode 100755 diff --git a/scripts/ci/prek/check_metrics_synced_with_the_registry.py b/scripts/ci/prek/check_metrics_synced_with_the_registry.py old mode 100644 new mode 100755 diff --git a/scripts/ci/prek/check_registry_types_json_sync.py b/scripts/ci/prek/check_registry_types_json_sync.py old mode 100644 new mode 100755