diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py index 3c7d4c8070f34..3930fd945ea45 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py @@ -19,14 +19,19 @@ from uuid import UUID -from fastapi import APIRouter, status +from fastapi import APIRouter, Security, status from sqlalchemy import select from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.common.types import UtcDateTime +from airflow.api_fastapi.execution_api.security import require_auth from airflow.models.taskreschedule import TaskReschedule router = APIRouter( + dependencies=[ + # Validates that the JWT sub matches the task_instance_id path parameter. + Security(require_auth, scopes=["ti:self"]), + ], responses={ status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"}, status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"}, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py index 0a71e65525505..2f5a264f4c634 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py @@ -87,6 +87,49 @@ def test_ct_self_routes_have_connection_test_id_param(client): ) +# Execution-API routes that expose a {task_instance_id} path parameter but intentionally do NOT +# enforce the ti:self scope. Add a route here only with a clear justification. +TI_ID_ROUTES_WITHOUT_TI_SELF: set[str] = set() + + +def test_routes_with_task_instance_id_param_enforce_ti_self(client): + """Dual of :func:`test_ti_self_routes_have_task_instance_id_param`. + + Every operation that exposes a ``{task_instance_id}`` path parameter must require the + ``ti:self`` scope, so a caller can only act on its own task instance -- unless the path is + explicitly listed in ``TI_ID_ROUTES_WITHOUT_TI_SELF``. This guards against a new endpoint + accepting a caller-supplied ``task_instance_id`` while silently skipping the ownership check + (the bug fixed alongside this test, where ``/task-reschedules/{task_instance_id}/start_date`` + lacked it). Checked against the served OpenAPI spec of every API version, since the execution + API assembles its routes per version. + """ + http_methods = {"get", "put", "post", "delete", "patch", "options", "head", "trace"} + offenders = [] + checked = 0 + for version in bundle.versions: + spec = client.get(f"/execution/openapi.json?version={version.value}").json() + for path, operations in spec.get("paths", {}).items(): + if "{task_instance_id}" not in path or path in TI_ID_ROUTES_WITHOUT_TI_SELF: + continue + for method, operation in operations.items(): + if method.lower() not in http_methods or not isinstance(operation, dict): + continue + checked += 1 + requirements = operation.get("security") or [] + has_ti_self = any( + "ti:self" in scopes for requirement in requirements for scopes in requirement.values() + ) + if not has_ti_self: + offenders.append(f"[{version.value}] {method.upper()} {path}") + + assert checked, "Found no {task_instance_id} operations in any API version -- the test is vacuous." + assert not offenders, ( + "These execution-API operations expose a {task_instance_id} path parameter without the " + "ti:self scope. Add `Security(require_auth, scopes=['ti:self'])` to the router, or add the " + "path to TI_ID_ROUTES_WITHOUT_TI_SELF with a justification:\n" + "\n".join(sorted(offenders)) + ) + + class TestCorrelationIdMiddleware: def test_correlation_id_echoed_in_response_headers(self, client): """Test that correlation-id from request is echoed back in response headers."""