From 8fb5f5bab01143251a3fd22a074f3428f3bb9d1d Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 21 Nov 2024 12:54:05 +0530 Subject: [PATCH 01/15] Migrate log endpoint --- .../core_api/datamodels/task_instances.py | 8 + .../core_api/openapi/v1-generated.yaml | 84 ++++ .../core_api/routes/public/task_instances.py | 102 ++++- airflow/ui/openapi-gen/queries/common.ts | 33 ++ airflow/ui/openapi-gen/queries/prefetch.ts | 55 +++ airflow/ui/openapi-gen/queries/queries.ts | 56 +++ airflow/ui/openapi-gen/queries/suspense.ts | 56 +++ .../ui/openapi-gen/requests/services.gen.ts | 40 ++ airflow/ui/openapi-gen/requests/types.gen.ts | 39 ++ .../routes/public/test_task_instances.py | 363 ++++++++++++++++++ 10 files changed, 832 insertions(+), 4 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py index 4712df3273a4e..5428a7c6e51fb 100644 --- a/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -150,3 +150,11 @@ class TaskInstanceHistoryCollectionResponse(BaseModel): task_instances: list[TaskInstanceHistoryResponse] total_entries: int + + +# Response Models +class TaskInstancesLogResponseObject(BaseModel): + """Log serializer for responses.""" + + content: str + continuation_token: str | None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 2f74f2268928f..8b0cbae02696a 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -4181,6 +4181,90 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}: + get: + tags: + - Task Instance + summary: Get Log + description: Get logs for specific task instance. + operationId: get_log + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: task_try_number + in: path + required: true + schema: + type: integer + title: Task Try Number + - name: full_content + in: query + required: false + schema: + type: boolean + default: false + title: Full Content + - name: map_index + in: query + required: false + schema: + type: integer + default: -1 + title: Map Index + - name: token + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Token + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/tasks: get: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index f4769a981b882..fd753928e1491 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -17,9 +17,11 @@ from __future__ import annotations -from typing import Annotated, Literal +from typing import Annotated, Any, Literal -from fastapi import Depends, HTTPException, Request, status +# from flask import Response +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer from sqlalchemy.orm import Session, joinedload from sqlalchemy.sql import select @@ -53,15 +55,17 @@ TaskInstanceHistoryResponse, TaskInstanceResponse, TaskInstancesBatchBody, + TaskInstancesLogResponseObject, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.exceptions import TaskNotFound -from airflow.models import Base, DagRun +from airflow.models import Base, DagRun, TaskInstance, Trigger from airflow.models.taskinstance import TaskInstance as TI -from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH +from airflow.models.taskinstancehistory import TaskInstanceHistory, TaskInstanceHistory as TIH from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS from airflow.utils.db import get_query_count +from airflow.utils.log.log_reader import TaskLogReader from airflow.utils.state import TaskInstanceState task_instances_router = AirflowRouter( @@ -482,3 +486,93 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) + + +@task_instances_router.get( + "/{task_id}/logs/{task_try_number}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + response_model=None, +) +def get_log( + *, + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + request: Request, + session: Annotated[Session, Depends(get_session)], + full_content: bool = False, + map_index: int = -1, + token: str | None = None, +) -> Response | dict: + """Get logs for specific task instance.""" + if not token: + metadata = {} + else: + try: + metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) + except BadSignature: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." + ) + + if metadata.get("download_logs") and metadata["download_logs"]: + full_content = True + + if full_content: + metadata["download_logs"] = True + else: + metadata["download_logs"] = False + + task_log_reader = TaskLogReader() + + print("\n\ntask_log_reader : ", task_log_reader) + + if not task_log_reader.supports_read: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") + + query = ( + select(TaskInstance) + .where( + TaskInstance.task_id == task_id, + TaskInstance.dag_id == dag_id, + TaskInstance.run_id == dag_run_id, + TaskInstance.map_index == map_index, + ) + .join(TaskInstance.dag_run) + .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) + ) + ti = session.scalar(query) + if ti is None: + query = select(TaskInstanceHistory).where( + TaskInstanceHistory.task_id == task_id, + TaskInstanceHistory.dag_id == dag_id, + TaskInstanceHistory.run_id == dag_run_id, + TaskInstanceHistory.map_index == map_index, + TaskInstanceHistory.try_number == task_try_number, + ) + ti = session.scalar(query) + + if ti is None: + metadata["end_of_log"] = True + raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found") + + dag = request.app.state.dag_bag.get_dag(dag_id) + if dag: + try: + ti.task = dag.get_task(ti.task_id) + except TaskNotFound: + pass + + return_type = request.headers["accept"] + # return_type would be either the above two or None + logs: Any + if return_type == "application/json" or return_type is None: # default + logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) + logs = logs[0] if task_try_number is not None else logs + # we must have token here, so we can safely ignore it + token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] + return TaskInstancesLogResponseObject(continuation_token=token, content=str(logs)).model_dump() + # text/plain. Stream + logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) + return Response(media_type="text/plain", content="".join(list(logs))) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 736425218022a..8a45206dadb6e 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1180,6 +1180,39 @@ export const UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn = ( useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]), ]; +export type TaskInstanceServiceGetLogDefaultResponse = Awaited< + ReturnType +>; +export type TaskInstanceServiceGetLogQueryResult< + TData = TaskInstanceServiceGetLogDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useTaskInstanceServiceGetLogKey = "TaskInstanceServiceGetLog"; +export const UseTaskInstanceServiceGetLogKeyFn = ( + { + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }: { + dagId: string; + dagRunId: string; + fullContent?: boolean; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + token?: string; + }, + queryKey?: Array, +) => [ + useTaskInstanceServiceGetLogKey, + ...(queryKey ?? [ + { dagId, dagRunId, fullContent, mapIndex, taskId, taskTryNumber, token }, + ]), +]; export type TaskServiceGetTasksDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index d57dfe3d24d98..52385034ab6bb 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1598,6 +1598,61 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = ( taskTryNumber, }), }); +/** + * Get Log + * Get logs for specific task instance. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.fullContent + * @param data.mapIndex + * @param data.token + * @returns unknown Successful Response + * @throws ApiError + */ +export const prefetchUseTaskInstanceServiceGetLog = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }: { + dagId: string; + dagRunId: string; + fullContent?: boolean; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + token?: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseTaskInstanceServiceGetLogKeyFn({ + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }), + queryFn: () => + TaskInstanceService.getLog({ + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }), + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 2ca159f465f51..d77353e8abb82 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1905,6 +1905,62 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = < }) as TData, ...options, }); +/** + * Get Log + * Get logs for specific task instance. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.fullContent + * @param data.mapIndex + * @param data.token + * @returns unknown Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetLog = < + TData = Common.TaskInstanceServiceGetLogDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }: { + dagId: string; + dagRunId: string; + fullContent?: boolean; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + token?: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseTaskInstanceServiceGetLogKeyFn( + { dagId, dagRunId, fullContent, mapIndex, taskId, taskTryNumber, token }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getLog({ + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 50ccc8a3c6820..2ba420ed82d8a 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1887,6 +1887,62 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsSuspense = < }) as TData, ...options, }); +/** + * Get Log + * Get logs for specific task instance. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.fullContent + * @param data.mapIndex + * @param data.token + * @returns unknown Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetLogSuspense = < + TData = Common.TaskInstanceServiceGetLogDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }: { + dagId: string; + dagRunId: string; + fullContent?: boolean; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + token?: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseTaskInstanceServiceGetLogKeyFn( + { dagId, dagRunId, fullContent, mapIndex, taskId, taskTryNumber, token }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getLog({ + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 63272c5e0c470..ad988fbda7e8d 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -129,6 +129,8 @@ import type { GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, + GetLogData, + GetLogResponse, GetTasksData, GetTasksResponse, GetTaskData, @@ -2186,6 +2188,44 @@ export class TaskInstanceService { }, }); } + + /** + * Get Log + * Get logs for specific task instance. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.fullContent + * @param data.mapIndex + * @param data.token + * @returns unknown Successful Response + * @throws ApiError + */ + public static getLog(data: GetLogData): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + task_try_number: data.taskTryNumber, + }, + query: { + full_content: data.fullContent, + map_index: data.mapIndex, + token: data.token, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class TaskService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 926932379b350..b603afb6dd19d 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1632,6 +1632,18 @@ export type GetMappedTaskInstanceTryDetailsData = { export type GetMappedTaskInstanceTryDetailsResponse = TaskInstanceHistoryResponse; +export type GetLogData = { + dagId: string; + dagRunId: string; + fullContent?: boolean; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + token?: string | null; +}; + +export type GetLogResponse = unknown; + export type GetTasksData = { dagId: string; orderBy?: string; @@ -3380,6 +3392,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}": { + get: { + req: GetLogData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/{dag_id}/tasks": { get: { req: GetTasksData; diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index f8e75600171b3..76b1aa4b36ee0 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -17,23 +17,35 @@ from __future__ import annotations +import copy import datetime as dt import itertools +import logging.config import os +import sys from unittest import mock +from unittest.mock import PropertyMock import pendulum import pytest +from fastapi.testclient import TestClient +from itsdangerous.url_safe import URLSafeSerializer +from airflow.api_fastapi.app import create_app +from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.decorators import task from airflow.jobs.job import Job from airflow.jobs.triggerer_job_runner import TriggererJobRunner from airflow.models import DagRun, TaskInstance from airflow.models.baseoperator import BaseOperator +from airflow.models.dag import DAG from airflow.models.dagbag import DagBag from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskmap import TaskMap from airflow.models.trigger import Trigger +from airflow.operators.empty import EmptyOperator +from airflow.utils import timezone from airflow.utils.platform import getuser from airflow.utils.state import State, TaskInstanceState from airflow.utils.timezone import datetime @@ -1663,3 +1675,354 @@ def test_raises_404_for_nonexistent_task_instance(self, test_client, session): assert response.json() == { "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `nonexistent_task`, try_number: `0` and map_index: `-1` was not found" } + + +class TestTaskInstancesLog: + DAG_ID = "dag_for_testing_log_endpoint" + RUN_ID = "dag_run_id_for_testing_log_endpoint" + TASK_ID = "task_for_testing_log_endpoint" + MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint" + TRY_NUMBER = 1 + + default_time = "2020-06-10T20:00:00+00:00" + + @pytest.fixture(autouse=True) + def setup_attrs(self, configure_loggers, dag_maker, session) -> None: + self.app = create_app() + self.client = TestClient(self.app) + # Make sure that the configure_logging is not cached + self.old_modules = dict(sys.modules) + + with dag_maker(self.DAG_ID, start_date=timezone.parse(self.default_time), session=session) as dag: + EmptyOperator(task_id=self.TASK_ID) + + @task(task_id=self.MAPPED_TASK_ID) + def add_one(x: int): + return x + 1 + + add_one.expand(x=[1, 2, 3]) + + dr = dag_maker.create_dagrun( + run_id=self.RUN_ID, + run_type=DagRunType.SCHEDULED, + logical_date=timezone.parse(self.default_time), + start_date=timezone.parse(self.default_time), + ) + + self.app.state.dag_bag.bag_dag(dag) + + # Add dummy dag for checking picking correct log with same task_id and different dag_id case. + with dag_maker( + f"{self.DAG_ID}_copy", start_date=timezone.parse(self.default_time), session=session + ) as dummy_dag: + EmptyOperator(task_id=self.TASK_ID) + dr2 = dag_maker.create_dagrun( + run_id=self.RUN_ID, + run_type=DagRunType.SCHEDULED, + logical_date=timezone.parse(self.default_time), + start_date=timezone.parse(self.default_time), + ) + self.app.state.dag_bag.bag_dag(dummy_dag) + + for ti in dr.task_instances: + ti.try_number = 1 + ti.hostname = "localhost" + session.merge(ti) + for ti in dr2.task_instances: + ti.try_number = 1 + ti.hostname = "localhost" + session.merge(ti) + session.flush() + dag.clear() + dummy_dag.clear() + for ti in dr.task_instances: + ti.try_number = 2 + ti.hostname = "localhost" + session.merge(ti) + for ti in dr2.task_instances: + ti.try_number = 2 + ti.hostname = "localhost" + session.merge(ti) + session.flush() + + @pytest.fixture + def configure_loggers(self, tmp_path, create_log_template): + self.log_dir = tmp_path + + # TASK_ID + dir_path = tmp_path / f"dag_id={self.DAG_ID}" / f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}" + dir_path.mkdir(parents=True) + + log = dir_path / "attempt=1.log" + log.write_text("Log for testing.") + + # try number 2 + log = dir_path / "attempt=2.log" + log.write_text("Log for testing 2.") + + # MAPPED_TASK_ID + for map_index in range(3): + dir_path = ( + tmp_path + / f"dag_id={self.DAG_ID}" + / f"run_id={self.RUN_ID}" + / f"task_id={self.MAPPED_TASK_ID}" + / f"map_index={map_index}" + ) + + dir_path.mkdir(parents=True) + + log = dir_path / "attempt=1.log" + log.write_text("Log for testing.") + + # try number 2 + log = dir_path / "attempt=2.log" + log.write_text("Log for testing 2.") + + # Create a custom logging configuration + logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) + logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir + + logging.config.dictConfig(logging_config) + + yield + + logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) + + def teardown_method(self): + clear_db_runs() + + @pytest.mark.parametrize("try_number", [1, 2]) + def test_should_respond_200_json(self, try_number): + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": False}) + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", + params={"token": token}, + headers={"Accept": "application/json"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" + log_content = "Log for testing." if try_number == 1 else "Log for testing 2." + print("\n\n\n response.json()", response.json()) + assert "[('localhost'," in response.json()["content"] + assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json()["content"] + assert f"{log_content}')]" in response.json()["content"] + + info = serializer.loads(response.json()["continuation_token"]) + assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 else 18} + assert 200 == response.status_code + + @pytest.mark.parametrize( + "request_url, expected_filename, extra_query_string, try_number", + [ + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log", + {}, + 1, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log", + {"map_index": 0}, + 1, + ), + # try_number 2 + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log", + {}, + 2, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log", + {"map_index": 0}, + 2, + ), + ], + ) + def test_should_respond_200_text_plain( + self, request_url, expected_filename, extra_query_string, try_number + ): + expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) + + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + request_url, + params={"token": token, **extra_query_string}, + headers={"Accept": "text/plain"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + assert 200 == response.status_code + + log_content = "Log for testing." if try_number == 1 else "Log for testing 2." + assert "localhost\n" in response.content.decode("utf-8") + assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") + assert f"{log_content}\n" in response.content.decode("utf-8") + + @pytest.mark.parametrize( + "request_url, expected_filename, extra_query_string, try_number", + [ + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log", + {}, + 1, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log", + {"map_index": 0}, + 1, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log", + {}, + 2, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log", + {"map_index": 0}, + 2, + ), + ], + ) + def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_query_string, try_number): + expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) + + # Recreate DAG without tasks + dagbag = self.app.state.dag_bag + dag = DAG(self.DAG_ID, schedule=None, start_date=timezone.parse(self.default_time)) + del dagbag.dags[self.DAG_ID] + dagbag.bag_dag(dag=dag) + + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + request_url, + params={"token": token, **extra_query_string}, + headers={"Accept": "text/plain"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + + assert 200 == response.status_code + + log_content = "Log for testing." if try_number == 1 else "Log for testing 2." + assert "localhost\n" in response.content.decode("utf-8") + assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") + assert f"{log_content}\n" in response.content.decode("utf-8") + + @pytest.mark.parametrize("try_number", [1, 2]) + def test_get_logs_response_with_ti_equal_to_none(self, try_number): + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/Invalid-Task-ID/logs/{try_number}", + params={"token": token}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + print("response.json(): ", response.json()) + assert response.status_code == 404 + assert response.json() == {"detail": "TaskInstance not found"} + + @pytest.mark.parametrize("try_number", [1, 2]) + def test_get_logs_with_metadata_as_download_large_file(self, try_number): + with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock: + first_return = ([[("", "1st line")]], [{}]) + second_return = ([[("", "2nd line")]], [{"end_of_log": False}]) + third_return = ([[("", "3rd line")]], [{"end_of_log": True}]) + fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) + read_mock.side_effect = [first_return, second_return, third_return, fourth_return] + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/" + f"taskInstances/{self.TASK_ID}/logs/{try_number}?full_content=True", + headers={"Accept": "text/plain"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + + assert "1st line" in response.content.decode("utf-8") + assert "2nd line" in response.content.decode("utf-8") + assert "3rd line" in response.content.decode("utf-8") + assert "should never be read" not in response.content.decode("utf-8") + + @pytest.mark.parametrize("try_number", [1, 2]) + @mock.patch("airflow.api_fastapi.core_api.routes.public.task_instances.TaskLogReader") + def test_get_logs_for_handler_without_read_method(self, mock_log_reader, try_number): + type(mock_log_reader.return_value).supports_read = PropertyMock(return_value=False) + + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": False}) + + # check guessing + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", + params={"token": token}, + headers={"Content-Type": "application/jso"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + assert 400 == response.status_code + assert "Task log handler does not support read logs." in response.content.decode("utf-8") + + def test_bad_signature_raises(self): + token = {"download_logs": False} + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", + params={"token": token}, + headers={"Accept": "application/json"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + # assert response.status_code == 400 + assert response.json() == {"detail": "Bad Signature. Please use only the tokens provided by the API."} + + def test_raises_404_for_invalid_dag_run_id(self): + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/NO_DAG_RUN/" # invalid run_id + f"taskInstances/{self.TASK_ID}/logs/1?", + headers={"Accept": "application/json"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 + assert response.json() == {"detail": "TaskInstance not found"} + + def test_should_raise_404_when_missing_map_index_param_for_mapped_task(self): + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.MAPPED_TASK_ID}/logs/1", + params={"token": token}, + headers={"Accept": "text/plain"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "TaskInstance not found" + + def test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self): + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", + params={"token": token, "map_index": 0}, + headers={"Accept": "text/plain"}, + # environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "TaskInstance not found" From 6996c71d93ff214e219708e437a32ef49877fcd1 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 21 Nov 2024 14:48:58 +0530 Subject: [PATCH 02/15] Remove debug commit --- airflow/api_fastapi/core_api/routes/public/task_instances.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index fd753928e1491..a7ef8887c0480 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -526,8 +526,6 @@ def get_log( task_log_reader = TaskLogReader() - print("\n\ntask_log_reader : ", task_log_reader) - if not task_log_reader.supports_read: raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") From 7dcc42f0f8ff594b55d3c533266f627f290d3bbc Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Thu, 21 Nov 2024 15:59:06 +0530 Subject: [PATCH 03/15] Update tests/api_fastapi/core_api/routes/public/test_task_instances.py Co-authored-by: Pierre Jeambrun --- tests/api_fastapi/core_api/routes/public/test_task_instances.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 97422209978e0..8c3ec7f912077 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1941,7 +1941,6 @@ def test_get_logs_response_with_ti_equal_to_none(self, try_number): params={"token": token}, # environ_overrides={"REMOTE_USER": "test"}, ) - print("response.json(): ", response.json()) assert response.status_code == 404 assert response.json() == {"detail": "TaskInstance not found"} From fdbc27d8eb0f33bec676e6d451cc786a5a487cce Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Thu, 21 Nov 2024 15:59:22 +0530 Subject: [PATCH 04/15] Update tests/api_fastapi/core_api/routes/public/test_task_instances.py Co-authored-by: Pierre Jeambrun --- tests/api_fastapi/core_api/routes/public/test_task_instances.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 8c3ec7f912077..9659f35575f7e 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1813,7 +1813,6 @@ def test_should_respond_200_json(self, try_number): ) expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" log_content = "Log for testing." if try_number == 1 else "Log for testing 2." - print("\n\n\n response.json()", response.json()) assert "[('localhost'," in response.json()["content"] assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json()["content"] assert f"{log_content}')]" in response.json()["content"] From c158b20ef3223b5c0c700843d0b3b34d25573375 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Thu, 21 Nov 2024 15:59:30 +0530 Subject: [PATCH 05/15] Update airflow/api_fastapi/core_api/datamodels/task_instances.py Co-authored-by: Pierre Jeambrun --- airflow/api_fastapi/core_api/datamodels/task_instances.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py index 5428a7c6e51fb..294f540579050 100644 --- a/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -152,7 +152,6 @@ class TaskInstanceHistoryCollectionResponse(BaseModel): total_entries: int -# Response Models class TaskInstancesLogResponseObject(BaseModel): """Log serializer for responses.""" From d0813ca2881945db55c00d128754c6b8b86e864d Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Thu, 21 Nov 2024 15:59:45 +0530 Subject: [PATCH 06/15] Update tests/api_fastapi/core_api/routes/public/test_task_instances.py Co-authored-by: Pierre Jeambrun --- tests/api_fastapi/core_api/routes/public/test_task_instances.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 9659f35575f7e..59ad235ad818c 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1809,7 +1809,6 @@ def test_should_respond_200_json(self, try_number): f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", params={"token": token}, headers={"Accept": "application/json"}, - # environ_overrides={"REMOTE_USER": "test"}, ) expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" log_content = "Log for testing." if try_number == 1 else "Log for testing 2." From 2732061056219e7fd14611c9782ef67c0bbce9ca Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 21 Nov 2024 16:01:41 +0530 Subject: [PATCH 07/15] Remove code comments --- .../core_api/routes/public/test_task_instances.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 59ad235ad818c..b652a72eeb5ca 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1863,7 +1863,6 @@ def test_should_respond_200_text_plain( request_url, params={"token": token, **extra_query_string}, headers={"Accept": "text/plain"}, - # environ_overrides={"REMOTE_USER": "test"}, ) assert 200 == response.status_code @@ -1918,7 +1917,6 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu request_url, params={"token": token, **extra_query_string}, headers={"Accept": "text/plain"}, - # environ_overrides={"REMOTE_USER": "test"}, ) assert 200 == response.status_code @@ -1937,7 +1935,6 @@ def test_get_logs_response_with_ti_equal_to_none(self, try_number): response = self.client.get( f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/Invalid-Task-ID/logs/{try_number}", params={"token": token}, - # environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 assert response.json() == {"detail": "TaskInstance not found"} @@ -1955,7 +1952,6 @@ def test_get_logs_with_metadata_as_download_large_file(self, try_number): f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/" f"taskInstances/{self.TASK_ID}/logs/{try_number}?full_content=True", headers={"Accept": "text/plain"}, - # environ_overrides={"REMOTE_USER": "test"}, ) assert "1st line" in response.content.decode("utf-8") @@ -1977,7 +1973,6 @@ def test_get_logs_for_handler_without_read_method(self, mock_log_reader, try_num f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", params={"token": token}, headers={"Content-Type": "application/jso"}, - # environ_overrides={"REMOTE_USER": "test"}, ) assert 400 == response.status_code assert "Task log handler does not support read logs." in response.content.decode("utf-8") @@ -1989,7 +1984,6 @@ def test_bad_signature_raises(self): f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", params={"token": token}, headers={"Accept": "application/json"}, - # environ_overrides={"REMOTE_USER": "test"}, ) # assert response.status_code == 400 assert response.json() == {"detail": "Bad Signature. Please use only the tokens provided by the API."} @@ -1999,7 +1993,6 @@ def test_raises_404_for_invalid_dag_run_id(self): f"public/dags/{self.DAG_ID}/dagRuns/NO_DAG_RUN/" # invalid run_id f"taskInstances/{self.TASK_ID}/logs/1?", headers={"Accept": "application/json"}, - # environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 assert response.json() == {"detail": "TaskInstance not found"} @@ -2013,7 +2006,6 @@ def test_should_raise_404_when_missing_map_index_param_for_mapped_task(self): f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.MAPPED_TASK_ID}/logs/1", params={"token": token}, headers={"Accept": "text/plain"}, - # environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 assert response.json()["detail"] == "TaskInstance not found" @@ -2027,7 +2019,6 @@ def test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self): f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", params={"token": token, "map_index": 0}, headers={"Accept": "text/plain"}, - # environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 404 assert response.json()["detail"] == "TaskInstance not found" From c3d4a22d413270cf7bd027fdbddafb4a4d98bf24 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 21 Nov 2024 17:19:04 +0530 Subject: [PATCH 08/15] Address PR comments --- .../api_fastapi/core_api/datamodels/log.py | 31 ++ .../core_api/openapi/v1-generated.yaml | 196 +++++---- .../core_api/routes/public/__init__.py | 2 + .../api_fastapi/core_api/routes/public/log.py | 127 ++++++ .../core_api/routes/public/task_instances.py | 99 +---- airflow/ui/openapi-gen/queries/common.ts | 13 +- airflow/ui/openapi-gen/queries/prefetch.ts | 7 +- airflow/ui/openapi-gen/queries/queries.ts | 17 +- airflow/ui/openapi-gen/queries/suspense.ts | 17 +- .../ui/openapi-gen/requests/schemas.gen.ts | 24 ++ .../ui/openapi-gen/requests/services.gen.ts | 6 +- airflow/ui/openapi-gen/requests/types.gen.ts | 13 +- .../core_api/routes/public/test_log.py | 377 ++++++++++++++++++ 13 files changed, 741 insertions(+), 188 deletions(-) create mode 100644 airflow/api_fastapi/core_api/datamodels/log.py create mode 100644 airflow/api_fastapi/core_api/routes/public/log.py create mode 100644 tests/api_fastapi/core_api/routes/public/test_log.py diff --git a/airflow/api_fastapi/core_api/datamodels/log.py b/airflow/api_fastapi/core_api/datamodels/log.py new file mode 100644 index 0000000000000..282dc81972ed9 --- /dev/null +++ b/airflow/api_fastapi/core_api/datamodels/log.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from pydantic import BaseModel + + +class TaskInstancesLogResponse(BaseModel): + """Log serializer for responses.""" + + content: str + continuation_token: str | None + + @property + def text_format(self): + # convert all config sections to text + return "".join(list(self.content)) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 8b0cbae02696a..d061f2659eef8 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -4181,90 +4181,6 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}: - get: - tags: - - Task Instance - summary: Get Log - description: Get logs for specific task instance. - operationId: get_log - parameters: - - name: dag_id - in: path - required: true - schema: - type: string - title: Dag Id - - name: dag_run_id - in: path - required: true - schema: - type: string - title: Dag Run Id - - name: task_id - in: path - required: true - schema: - type: string - title: Task Id - - name: task_try_number - in: path - required: true - schema: - type: integer - title: Task Try Number - - name: full_content - in: query - required: false - schema: - type: boolean - default: false - title: Full Content - - name: map_index - in: query - required: false - schema: - type: integer - default: -1 - title: Map Index - - name: token - in: query - required: false - schema: - anyOf: - - type: string - - type: 'null' - title: Token - responses: - '200': - description: Successful Response - content: - application/json: - schema: {} - '401': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Unauthorized - '403': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Forbidden - '404': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Not Found - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/tasks: get: tags: @@ -4717,6 +4633,102 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}: + get: + tags: + - Task Instance + summary: Get Log + description: Get logs for specific task instance. + operationId: get_log + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: task_try_number + in: path + required: true + schema: + type: integer + title: Task Try Number + - name: full_content + in: query + required: false + schema: + type: boolean + default: false + title: Full Content + - name: map_index + in: query + required: false + schema: + type: integer + default: -1 + title: Map Index + - name: token + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Token + - name: accept + in: header + required: false + schema: + type: string + enum: + - application/json + - text/plain + - '*/*' + default: '*/*' + title: Accept + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskInstancesLogResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/monitor/health: get: tags: @@ -7321,6 +7333,22 @@ components: type: object title: TaskInstancesBatchBody description: Task Instance body for get batch. + TaskInstancesLogResponse: + properties: + content: + type: string + title: Content + continuation_token: + anyOf: + - type: string + - type: 'null' + title: Continuation Token + type: object + required: + - content + - continuation_token + title: TaskInstancesLogResponse + description: Log serializer for responses. TaskOutletAssetReference: properties: dag_id: diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py b/airflow/api_fastapi/core_api/routes/public/__init__.py index ae4035f6d3532..2b194fafae595 100644 --- a/airflow/api_fastapi/core_api/routes/public/__init__.py +++ b/airflow/api_fastapi/core_api/routes/public/__init__.py @@ -32,6 +32,7 @@ from airflow.api_fastapi.core_api.routes.public.dags import dags_router from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router from airflow.api_fastapi.core_api.routes.public.import_error import import_error_router +from airflow.api_fastapi.core_api.routes.public.log import task_instances_log_router from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router from airflow.api_fastapi.core_api.routes.public.pools import pools_router @@ -67,6 +68,7 @@ authenticated_router.include_router(tasks_router) authenticated_router.include_router(variables_router) authenticated_router.include_router(xcom_router) +authenticated_router.include_router(task_instances_log_router) # Include authenticated router in public router public_router.include_router(authenticated_router) diff --git a/airflow/api_fastapi/core_api/routes/public/log.py b/airflow/api_fastapi/core_api/routes/public/log.py new file mode 100644 index 0000000000000..1a0fa458ee2db --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/public/log.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import Annotated, Any + +from fastapi import Depends, HTTPException, Request, Response, status +from itsdangerous import BadSignature, URLSafeSerializer +from sqlalchemy.orm import Session, joinedload +from sqlalchemy.sql import select + +from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.types import Mimetype +from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.exceptions import TaskNotFound +from airflow.models import TaskInstance, Trigger +from airflow.models.taskinstancehistory import TaskInstanceHistory +from airflow.utils.log.log_reader import TaskLogReader + +task_instances_log_router = AirflowRouter( + tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" +) + + +@task_instances_log_router.get( + "/{task_id}/logs/{task_try_number}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + response_model=TaskInstancesLogResponse, +) +def get_log( + *, + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + accept: HeaderAcceptJsonOrText, + request: Request, + session: Annotated[Session, Depends(get_session)], + full_content: bool = False, + map_index: int = -1, + token: str | None = None, +): + """Get logs for specific task instance.""" + if not token: + metadata = {} + else: + try: + metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) + except BadSignature: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." + ) + + if metadata.get("download_logs") and metadata["download_logs"]: + full_content = True + + if full_content: + metadata["download_logs"] = True + else: + metadata["download_logs"] = False + + task_log_reader = TaskLogReader() + + if not task_log_reader.supports_read: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") + + query = ( + select(TaskInstance) + .where( + TaskInstance.task_id == task_id, + TaskInstance.dag_id == dag_id, + TaskInstance.run_id == dag_run_id, + TaskInstance.map_index == map_index, + ) + .join(TaskInstance.dag_run) + .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) + ) + ti = session.scalar(query) + if ti is None: + query = select(TaskInstanceHistory).where( + TaskInstanceHistory.task_id == task_id, + TaskInstanceHistory.dag_id == dag_id, + TaskInstanceHistory.run_id == dag_run_id, + TaskInstanceHistory.map_index == map_index, + TaskInstanceHistory.try_number == task_try_number, + ) + ti = session.scalar(query) + + if ti is None: + metadata["end_of_log"] = True + raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found") + + dag = request.app.state.dag_bag.get_dag(dag_id) + if dag: + try: + ti.task = dag.get_task(ti.task_id) + except TaskNotFound: + pass + + logs: Any + if accept == Mimetype.JSON or accept == Mimetype.ANY: # default + logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) + logs = logs[0] if task_try_number is not None else logs + # we must have token here, so we can safely ignore it + token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] + return TaskInstancesLogResponse(continuation_token=token, content=str(logs)).model_dump() + # text/plain. Stream + logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) + return Response(media_type=accept, content="".join(list(logs))) diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index a7ef8887c0480..b879663052ebb 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -17,11 +17,10 @@ from __future__ import annotations -from typing import Annotated, Any, Literal +from typing import Annotated, Literal # from flask import Response -from fastapi import Depends, HTTPException, Request, Response, status -from itsdangerous import BadSignature, URLSafeSerializer +from fastapi import Depends, HTTPException, Request, status from sqlalchemy.orm import Session, joinedload from sqlalchemy.sql import select @@ -55,17 +54,15 @@ TaskInstanceHistoryResponse, TaskInstanceResponse, TaskInstancesBatchBody, - TaskInstancesLogResponseObject, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.exceptions import TaskNotFound -from airflow.models import Base, DagRun, TaskInstance, Trigger +from airflow.models import Base, DagRun from airflow.models.taskinstance import TaskInstance as TI -from airflow.models.taskinstancehistory import TaskInstanceHistory, TaskInstanceHistory as TIH +from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS from airflow.utils.db import get_query_count -from airflow.utils.log.log_reader import TaskLogReader from airflow.utils.state import TaskInstanceState task_instances_router = AirflowRouter( @@ -486,91 +483,3 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) - - -@task_instances_router.get( - "/{task_id}/logs/{task_try_number}", - responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), - response_model=None, -) -def get_log( - *, - dag_id: str, - dag_run_id: str, - task_id: str, - task_try_number: int, - request: Request, - session: Annotated[Session, Depends(get_session)], - full_content: bool = False, - map_index: int = -1, - token: str | None = None, -) -> Response | dict: - """Get logs for specific task instance.""" - if not token: - metadata = {} - else: - try: - metadata = URLSafeSerializer(request.app.state.secret_key).loads(token) - except BadSignature: - raise HTTPException( - status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." - ) - - if metadata.get("download_logs") and metadata["download_logs"]: - full_content = True - - if full_content: - metadata["download_logs"] = True - else: - metadata["download_logs"] = False - - task_log_reader = TaskLogReader() - - if not task_log_reader.supports_read: - raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.") - - query = ( - select(TaskInstance) - .where( - TaskInstance.task_id == task_id, - TaskInstance.dag_id == dag_id, - TaskInstance.run_id == dag_run_id, - TaskInstance.map_index == map_index, - ) - .join(TaskInstance.dag_run) - .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job)) - ) - ti = session.scalar(query) - if ti is None: - query = select(TaskInstanceHistory).where( - TaskInstanceHistory.task_id == task_id, - TaskInstanceHistory.dag_id == dag_id, - TaskInstanceHistory.run_id == dag_run_id, - TaskInstanceHistory.map_index == map_index, - TaskInstanceHistory.try_number == task_try_number, - ) - ti = session.scalar(query) - - if ti is None: - metadata["end_of_log"] = True - raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found") - - dag = request.app.state.dag_bag.get_dag(dag_id) - if dag: - try: - ti.task = dag.get_task(ti.task_id) - except TaskNotFound: - pass - - return_type = request.headers["accept"] - # return_type would be either the above two or None - logs: Any - if return_type == "application/json" or return_type is None: # default - logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) - logs = logs[0] if task_try_number is not None else logs - # we must have token here, so we can safely ignore it - token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] - return TaskInstancesLogResponseObject(continuation_token=token, content=str(logs)).model_dump() - # text/plain. Stream - logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) - return Response(media_type="text/plain", content="".join(list(logs))) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 8a45206dadb6e..fa3052dd0aa00 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1190,6 +1190,7 @@ export type TaskInstanceServiceGetLogQueryResult< export const useTaskInstanceServiceGetLogKey = "TaskInstanceServiceGetLog"; export const UseTaskInstanceServiceGetLogKeyFn = ( { + accept, dagId, dagRunId, fullContent, @@ -1198,6 +1199,7 @@ export const UseTaskInstanceServiceGetLogKeyFn = ( taskTryNumber, token, }: { + accept?: "application/json" | "text/plain" | "*/*"; dagId: string; dagRunId: string; fullContent?: boolean; @@ -1210,7 +1212,16 @@ export const UseTaskInstanceServiceGetLogKeyFn = ( ) => [ useTaskInstanceServiceGetLogKey, ...(queryKey ?? [ - { dagId, dagRunId, fullContent, mapIndex, taskId, taskTryNumber, token }, + { + accept, + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }, ]), ]; export type TaskServiceGetTasksDefaultResponse = Awaited< diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 52385034ab6bb..580419d8cf043 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1609,12 +1609,14 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = ( * @param data.fullContent * @param data.mapIndex * @param data.token - * @returns unknown Successful Response + * @param data.accept + * @returns TaskInstancesLogResponse Successful Response * @throws ApiError */ export const prefetchUseTaskInstanceServiceGetLog = ( queryClient: QueryClient, { + accept, dagId, dagRunId, fullContent, @@ -1623,6 +1625,7 @@ export const prefetchUseTaskInstanceServiceGetLog = ( taskTryNumber, token, }: { + accept?: "application/json" | "text/plain" | "*/*"; dagId: string; dagRunId: string; fullContent?: boolean; @@ -1634,6 +1637,7 @@ export const prefetchUseTaskInstanceServiceGetLog = ( ) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetLogKeyFn({ + accept, dagId, dagRunId, fullContent, @@ -1644,6 +1648,7 @@ export const prefetchUseTaskInstanceServiceGetLog = ( }), queryFn: () => TaskInstanceService.getLog({ + accept, dagId, dagRunId, fullContent, diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index d77353e8abb82..74136f42ce468 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1916,7 +1916,8 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = < * @param data.fullContent * @param data.mapIndex * @param data.token - * @returns unknown Successful Response + * @param data.accept + * @returns TaskInstancesLogResponse Successful Response * @throws ApiError */ export const useTaskInstanceServiceGetLog = < @@ -1925,6 +1926,7 @@ export const useTaskInstanceServiceGetLog = < TQueryKey extends Array = unknown[], >( { + accept, dagId, dagRunId, fullContent, @@ -1933,6 +1935,7 @@ export const useTaskInstanceServiceGetLog = < taskTryNumber, token, }: { + accept?: "application/json" | "text/plain" | "*/*"; dagId: string; dagRunId: string; fullContent?: boolean; @@ -1946,11 +1949,21 @@ export const useTaskInstanceServiceGetLog = < ) => useQuery({ queryKey: Common.UseTaskInstanceServiceGetLogKeyFn( - { dagId, dagRunId, fullContent, mapIndex, taskId, taskTryNumber, token }, + { + accept, + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }, queryKey, ), queryFn: () => TaskInstanceService.getLog({ + accept, dagId, dagRunId, fullContent, diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 2ba420ed82d8a..e84ae408555a2 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1898,7 +1898,8 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsSuspense = < * @param data.fullContent * @param data.mapIndex * @param data.token - * @returns unknown Successful Response + * @param data.accept + * @returns TaskInstancesLogResponse Successful Response * @throws ApiError */ export const useTaskInstanceServiceGetLogSuspense = < @@ -1907,6 +1908,7 @@ export const useTaskInstanceServiceGetLogSuspense = < TQueryKey extends Array = unknown[], >( { + accept, dagId, dagRunId, fullContent, @@ -1915,6 +1917,7 @@ export const useTaskInstanceServiceGetLogSuspense = < taskTryNumber, token, }: { + accept?: "application/json" | "text/plain" | "*/*"; dagId: string; dagRunId: string; fullContent?: boolean; @@ -1928,11 +1931,21 @@ export const useTaskInstanceServiceGetLogSuspense = < ) => useSuspenseQuery({ queryKey: Common.UseTaskInstanceServiceGetLogKeyFn( - { dagId, dagRunId, fullContent, mapIndex, taskId, taskTryNumber, token }, + { + accept, + dagId, + dagRunId, + fullContent, + mapIndex, + taskId, + taskTryNumber, + token, + }, queryKey, ), queryFn: () => TaskInstanceService.getLog({ + accept, dagId, dagRunId, fullContent, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index a0bb85ace80ad..9f8dffff841c1 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3942,6 +3942,30 @@ export const $TaskInstancesBatchBody = { description: "Task Instance body for get batch.", } as const; +export const $TaskInstancesLogResponse = { + properties: { + content: { + type: "string", + title: "Content", + }, + continuation_token: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Continuation Token", + }, + }, + type: "object", + required: ["content", "continuation_token"], + title: "TaskInstancesLogResponse", + description: "Log serializer for responses.", +} as const; + export const $TaskOutletAssetReference = { properties: { dag_id: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index ad988fbda7e8d..ef3551098a475 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -2200,7 +2200,8 @@ export class TaskInstanceService { * @param data.fullContent * @param data.mapIndex * @param data.token - * @returns unknown Successful Response + * @param data.accept + * @returns TaskInstancesLogResponse Successful Response * @throws ApiError */ public static getLog(data: GetLogData): CancelablePromise { @@ -2213,6 +2214,9 @@ export class TaskInstanceService { task_id: data.taskId, task_try_number: data.taskTryNumber, }, + headers: { + accept: data.accept, + }, query: { full_content: data.fullContent, map_index: data.mapIndex, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index b603afb6dd19d..ef17f755e7f21 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -935,6 +935,14 @@ export type TaskInstancesBatchBody = { order_by?: string | null; }; +/** + * Log serializer for responses. + */ +export type TaskInstancesLogResponse = { + content: string; + continuation_token: string | null; +}; + /** * Task outlet reference serializer for assets. */ @@ -1633,6 +1641,7 @@ export type GetMappedTaskInstanceTryDetailsResponse = TaskInstanceHistoryResponse; export type GetLogData = { + accept?: "application/json" | "text/plain" | "*/*"; dagId: string; dagRunId: string; fullContent?: boolean; @@ -1642,7 +1651,7 @@ export type GetLogData = { token?: string | null; }; -export type GetLogResponse = unknown; +export type GetLogResponse = TaskInstancesLogResponse; export type GetTasksData = { dagId: string; @@ -3399,7 +3408,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: unknown; + 200: TaskInstancesLogResponse; /** * Unauthorized */ diff --git a/tests/api_fastapi/core_api/routes/public/test_log.py b/tests/api_fastapi/core_api/routes/public/test_log.py new file mode 100644 index 0000000000000..f638b2f709a78 --- /dev/null +++ b/tests/api_fastapi/core_api/routes/public/test_log.py @@ -0,0 +1,377 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import copy +import logging.config +import sys +from unittest import mock +from unittest.mock import PropertyMock + +import pytest +from itsdangerous.url_safe import URLSafeSerializer + +from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.decorators import task +from airflow.models.dag import DAG +from airflow.operators.empty import EmptyOperator +from airflow.utils import timezone +from airflow.utils.types import DagRunType + +from tests_common.test_utils.db import clear_db_runs + +pytestmark = pytest.mark.db_test + + +class TestTaskInstancesLog: + DAG_ID = "dag_for_testing_log_endpoint" + RUN_ID = "dag_run_id_for_testing_log_endpoint" + TASK_ID = "task_for_testing_log_endpoint" + MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint" + TRY_NUMBER = 1 + + default_time = "2020-06-10T20:00:00+00:00" + + @pytest.fixture(autouse=True) + def setup_attrs(self, test_client, configure_loggers, dag_maker, session) -> None: + self.app = test_client.app + self.client = test_client + # Make sure that the configure_logging is not cached + self.old_modules = dict(sys.modules) + + with dag_maker(self.DAG_ID, start_date=timezone.parse(self.default_time), session=session) as dag: + EmptyOperator(task_id=self.TASK_ID) + + @task(task_id=self.MAPPED_TASK_ID) + def add_one(x: int): + return x + 1 + + add_one.expand(x=[1, 2, 3]) + + dr = dag_maker.create_dagrun( + run_id=self.RUN_ID, + run_type=DagRunType.SCHEDULED, + logical_date=timezone.parse(self.default_time), + start_date=timezone.parse(self.default_time), + ) + + self.app.state.dag_bag.bag_dag(dag) + + # Add dummy dag for checking picking correct log with same task_id and different dag_id case. + with dag_maker( + f"{self.DAG_ID}_copy", start_date=timezone.parse(self.default_time), session=session + ) as dummy_dag: + EmptyOperator(task_id=self.TASK_ID) + dr2 = dag_maker.create_dagrun( + run_id=self.RUN_ID, + run_type=DagRunType.SCHEDULED, + logical_date=timezone.parse(self.default_time), + start_date=timezone.parse(self.default_time), + ) + self.app.state.dag_bag.bag_dag(dummy_dag) + + for ti in dr.task_instances: + ti.try_number = 1 + ti.hostname = "localhost" + session.merge(ti) + for ti in dr2.task_instances: + ti.try_number = 1 + ti.hostname = "localhost" + session.merge(ti) + session.flush() + dag.clear() + dummy_dag.clear() + for ti in dr.task_instances: + ti.try_number = 2 + ti.hostname = "localhost" + session.merge(ti) + for ti in dr2.task_instances: + ti.try_number = 2 + ti.hostname = "localhost" + session.merge(ti) + session.flush() + + @pytest.fixture + def configure_loggers(self, tmp_path, create_log_template): + self.log_dir = tmp_path + + # TASK_ID + dir_path = tmp_path / f"dag_id={self.DAG_ID}" / f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}" + dir_path.mkdir(parents=True) + + log = dir_path / "attempt=1.log" + log.write_text("Log for testing.") + + # try number 2 + log = dir_path / "attempt=2.log" + log.write_text("Log for testing 2.") + + # MAPPED_TASK_ID + for map_index in range(3): + dir_path = ( + tmp_path + / f"dag_id={self.DAG_ID}" + / f"run_id={self.RUN_ID}" + / f"task_id={self.MAPPED_TASK_ID}" + / f"map_index={map_index}" + ) + + dir_path.mkdir(parents=True) + + log = dir_path / "attempt=1.log" + log.write_text("Log for testing.") + + # try number 2 + log = dir_path / "attempt=2.log" + log.write_text("Log for testing 2.") + + # Create a custom logging configuration + logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) + logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir + + logging.config.dictConfig(logging_config) + + yield + + logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) + + def teardown_method(self): + clear_db_runs() + + @pytest.mark.parametrize("try_number", [1, 2]) + def test_should_respond_200_json(self, try_number): + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": False}) + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", + params={"token": token}, + headers={"Accept": "application/json"}, + ) + expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" + log_content = "Log for testing." if try_number == 1 else "Log for testing 2." + assert "[('localhost'," in response.json()["content"] + assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json()["content"] + assert f"{log_content}')]" in response.json()["content"] + + info = serializer.loads(response.json()["continuation_token"]) + assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 else 18} + assert 200 == response.status_code + + @pytest.mark.parametrize( + "request_url, expected_filename, extra_query_string, try_number", + [ + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log", + {}, + 1, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log", + {"map_index": 0}, + 1, + ), + # try_number 2 + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log", + {}, + 2, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log", + {"map_index": 0}, + 2, + ), + ], + ) + def test_should_respond_200_text_plain( + self, request_url, expected_filename, extra_query_string, try_number + ): + expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) + + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + request_url, + params={"token": token, **extra_query_string}, + headers={"Accept": "text/plain"}, + ) + assert 200 == response.status_code + + log_content = "Log for testing." if try_number == 1 else "Log for testing 2." + assert "localhost\n" in response.content.decode("utf-8") + assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") + assert f"{log_content}\n" in response.content.decode("utf-8") + + @pytest.mark.parametrize( + "request_url, expected_filename, extra_query_string, try_number", + [ + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log", + {}, + 1, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log", + {"map_index": 0}, + 1, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log", + {}, + 2, + ), + ( + f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2", + f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log", + {"map_index": 0}, + 2, + ), + ], + ) + def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_query_string, try_number): + expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) + + # Recreate DAG without tasks + dagbag = self.app.state.dag_bag + dag = DAG(self.DAG_ID, schedule=None, start_date=timezone.parse(self.default_time)) + del dagbag.dags[self.DAG_ID] + dagbag.bag_dag(dag=dag) + + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + request_url, + params={"token": token, **extra_query_string}, + headers={"Accept": "text/plain"}, + ) + + assert 200 == response.status_code + + log_content = "Log for testing." if try_number == 1 else "Log for testing 2." + assert "localhost\n" in response.content.decode("utf-8") + assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") + assert f"{log_content}\n" in response.content.decode("utf-8") + + @pytest.mark.parametrize("try_number", [1, 2]) + def test_get_logs_response_with_ti_equal_to_none(self, try_number): + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/Invalid-Task-ID/logs/{try_number}", + params={"token": token}, + ) + assert response.status_code == 404 + assert response.json() == {"detail": "TaskInstance not found"} + + @pytest.mark.parametrize("try_number", [1, 2]) + def test_get_logs_with_metadata_as_download_large_file(self, try_number): + with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock: + first_return = ([[("", "1st line")]], [{}]) + second_return = ([[("", "2nd line")]], [{"end_of_log": False}]) + third_return = ([[("", "3rd line")]], [{"end_of_log": True}]) + fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) + read_mock.side_effect = [first_return, second_return, third_return, fourth_return] + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/" + f"taskInstances/{self.TASK_ID}/logs/{try_number}?full_content=True", + headers={"Accept": "text/plain"}, + ) + + assert "1st line" in response.content.decode("utf-8") + assert "2nd line" in response.content.decode("utf-8") + assert "3rd line" in response.content.decode("utf-8") + assert "should never be read" not in response.content.decode("utf-8") + + @pytest.mark.parametrize("try_number", [1, 2]) + @mock.patch("airflow.api_fastapi.core_api.routes.public.log.TaskLogReader") + def test_get_logs_for_handler_without_read_method(self, mock_log_reader, try_number): + type(mock_log_reader.return_value).supports_read = PropertyMock(return_value=False) + + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": False}) + + # check guessing + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", + params={"token": token}, + headers={"Content-Type": "application/jso"}, + ) + assert 400 == response.status_code + assert "Task log handler does not support read logs." in response.content.decode("utf-8") + + def test_bad_signature_raises(self): + token = {"download_logs": False} + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", + params={"token": token}, + headers={"Accept": "application/json"}, + ) + # assert response.status_code == 400 + assert response.json() == {"detail": "Bad Signature. Please use only the tokens provided by the API."} + + def test_raises_404_for_invalid_dag_run_id(self): + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/NO_DAG_RUN/" # invalid run_id + f"taskInstances/{self.TASK_ID}/logs/1?", + headers={"Accept": "application/json"}, + ) + assert response.status_code == 404 + assert response.json() == {"detail": "TaskInstance not found"} + + def test_should_raise_404_when_missing_map_index_param_for_mapped_task(self): + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.MAPPED_TASK_ID}/logs/1", + params={"token": token}, + headers={"Accept": "text/plain"}, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "TaskInstance not found" + + def test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self): + key = self.app.state.secret_key + serializer = URLSafeSerializer(key) + token = serializer.dumps({"download_logs": True}) + + response = self.client.get( + f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", + params={"token": token, "map_index": 0}, + headers={"Accept": "text/plain"}, + ) + assert response.status_code == 404 + assert response.json()["detail"] == "TaskInstance not found" From 613db8538d5ca83b8204c982eb16e99ea25554e2 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 21 Nov 2024 17:23:56 +0530 Subject: [PATCH 09/15] Remove unwanted code --- .../core_api/datamodels/task_instances.py | 7 - .../core_api/routes/public/task_instances.py | 1 - .../routes/public/test_task_instances.py | 351 ------------------ 3 files changed, 359 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py index 294f540579050..4712df3273a4e 100644 --- a/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -150,10 +150,3 @@ class TaskInstanceHistoryCollectionResponse(BaseModel): task_instances: list[TaskInstanceHistoryResponse] total_entries: int - - -class TaskInstancesLogResponseObject(BaseModel): - """Log serializer for responses.""" - - content: str - continuation_token: str | None diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index b879663052ebb..f4769a981b882 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -19,7 +19,6 @@ from typing import Annotated, Literal -# from flask import Response from fastapi import Depends, HTTPException, Request, status from sqlalchemy.orm import Session, joinedload from sqlalchemy.sql import select diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index b652a72eeb5ca..b3b4d0ffb1b50 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -17,35 +17,23 @@ from __future__ import annotations -import copy import datetime as dt import itertools -import logging.config import os -import sys from unittest import mock -from unittest.mock import PropertyMock import pendulum import pytest -from fastapi.testclient import TestClient -from itsdangerous.url_safe import URLSafeSerializer -from airflow.api_fastapi.app import create_app -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG -from airflow.decorators import task from airflow.jobs.job import Job from airflow.jobs.triggerer_job_runner import TriggererJobRunner from airflow.models import DagRun, TaskInstance from airflow.models.baseoperator import BaseOperator -from airflow.models.dag import DAG from airflow.models.dagbag import DagBag from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskmap import TaskMap from airflow.models.trigger import Trigger -from airflow.operators.empty import EmptyOperator -from airflow.utils import timezone from airflow.utils.platform import getuser from airflow.utils.state import State, TaskInstanceState from airflow.utils.timezone import datetime @@ -1683,342 +1671,3 @@ def test_raises_404_for_nonexistent_task_instance(self, test_client, session): assert response.json() == { "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `nonexistent_task`, try_number: `0` and map_index: `-1` was not found" } - - -class TestTaskInstancesLog: - DAG_ID = "dag_for_testing_log_endpoint" - RUN_ID = "dag_run_id_for_testing_log_endpoint" - TASK_ID = "task_for_testing_log_endpoint" - MAPPED_TASK_ID = "mapped_task_for_testing_log_endpoint" - TRY_NUMBER = 1 - - default_time = "2020-06-10T20:00:00+00:00" - - @pytest.fixture(autouse=True) - def setup_attrs(self, configure_loggers, dag_maker, session) -> None: - self.app = create_app() - self.client = TestClient(self.app) - # Make sure that the configure_logging is not cached - self.old_modules = dict(sys.modules) - - with dag_maker(self.DAG_ID, start_date=timezone.parse(self.default_time), session=session) as dag: - EmptyOperator(task_id=self.TASK_ID) - - @task(task_id=self.MAPPED_TASK_ID) - def add_one(x: int): - return x + 1 - - add_one.expand(x=[1, 2, 3]) - - dr = dag_maker.create_dagrun( - run_id=self.RUN_ID, - run_type=DagRunType.SCHEDULED, - logical_date=timezone.parse(self.default_time), - start_date=timezone.parse(self.default_time), - ) - - self.app.state.dag_bag.bag_dag(dag) - - # Add dummy dag for checking picking correct log with same task_id and different dag_id case. - with dag_maker( - f"{self.DAG_ID}_copy", start_date=timezone.parse(self.default_time), session=session - ) as dummy_dag: - EmptyOperator(task_id=self.TASK_ID) - dr2 = dag_maker.create_dagrun( - run_id=self.RUN_ID, - run_type=DagRunType.SCHEDULED, - logical_date=timezone.parse(self.default_time), - start_date=timezone.parse(self.default_time), - ) - self.app.state.dag_bag.bag_dag(dummy_dag) - - for ti in dr.task_instances: - ti.try_number = 1 - ti.hostname = "localhost" - session.merge(ti) - for ti in dr2.task_instances: - ti.try_number = 1 - ti.hostname = "localhost" - session.merge(ti) - session.flush() - dag.clear() - dummy_dag.clear() - for ti in dr.task_instances: - ti.try_number = 2 - ti.hostname = "localhost" - session.merge(ti) - for ti in dr2.task_instances: - ti.try_number = 2 - ti.hostname = "localhost" - session.merge(ti) - session.flush() - - @pytest.fixture - def configure_loggers(self, tmp_path, create_log_template): - self.log_dir = tmp_path - - # TASK_ID - dir_path = tmp_path / f"dag_id={self.DAG_ID}" / f"run_id={self.RUN_ID}" / f"task_id={self.TASK_ID}" - dir_path.mkdir(parents=True) - - log = dir_path / "attempt=1.log" - log.write_text("Log for testing.") - - # try number 2 - log = dir_path / "attempt=2.log" - log.write_text("Log for testing 2.") - - # MAPPED_TASK_ID - for map_index in range(3): - dir_path = ( - tmp_path - / f"dag_id={self.DAG_ID}" - / f"run_id={self.RUN_ID}" - / f"task_id={self.MAPPED_TASK_ID}" - / f"map_index={map_index}" - ) - - dir_path.mkdir(parents=True) - - log = dir_path / "attempt=1.log" - log.write_text("Log for testing.") - - # try number 2 - log = dir_path / "attempt=2.log" - log.write_text("Log for testing 2.") - - # Create a custom logging configuration - logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) - logging_config["handlers"]["task"]["base_log_folder"] = self.log_dir - - logging.config.dictConfig(logging_config) - - yield - - logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) - - def teardown_method(self): - clear_db_runs() - - @pytest.mark.parametrize("try_number", [1, 2]) - def test_should_respond_200_json(self, try_number): - key = self.app.state.secret_key - serializer = URLSafeSerializer(key) - token = serializer.dumps({"download_logs": False}) - response = self.client.get( - f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", - params={"token": token}, - headers={"Accept": "application/json"}, - ) - expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" - log_content = "Log for testing." if try_number == 1 else "Log for testing 2." - assert "[('localhost'," in response.json()["content"] - assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json()["content"] - assert f"{log_content}')]" in response.json()["content"] - - info = serializer.loads(response.json()["continuation_token"]) - assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 else 18} - assert 200 == response.status_code - - @pytest.mark.parametrize( - "request_url, expected_filename, extra_query_string, try_number", - [ - ( - f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1", - f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log", - {}, - 1, - ), - ( - f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1", - f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log", - {"map_index": 0}, - 1, - ), - # try_number 2 - ( - f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2", - f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log", - {}, - 2, - ), - ( - f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2", - f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log", - {"map_index": 0}, - 2, - ), - ], - ) - def test_should_respond_200_text_plain( - self, request_url, expected_filename, extra_query_string, try_number - ): - expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) - - key = self.app.state.secret_key - serializer = URLSafeSerializer(key) - token = serializer.dumps({"download_logs": True}) - - response = self.client.get( - request_url, - params={"token": token, **extra_query_string}, - headers={"Accept": "text/plain"}, - ) - assert 200 == response.status_code - - log_content = "Log for testing." if try_number == 1 else "Log for testing 2." - assert "localhost\n" in response.content.decode("utf-8") - assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") - assert f"{log_content}\n" in response.content.decode("utf-8") - - @pytest.mark.parametrize( - "request_url, expected_filename, extra_query_string, try_number", - [ - ( - f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/1", - f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=1.log", - {}, - 1, - ), - ( - f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/1", - f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=1.log", - {"map_index": 0}, - 1, - ), - ( - f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/logs/2", - f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={TASK_ID}/attempt=2.log", - {}, - 2, - ), - ( - f"public/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{MAPPED_TASK_ID}/logs/2", - f"LOG_DIR/dag_id={DAG_ID}/run_id={RUN_ID}/task_id={MAPPED_TASK_ID}/map_index=0/attempt=2.log", - {"map_index": 0}, - 2, - ), - ], - ) - def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_query_string, try_number): - expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) - - # Recreate DAG without tasks - dagbag = self.app.state.dag_bag - dag = DAG(self.DAG_ID, schedule=None, start_date=timezone.parse(self.default_time)) - del dagbag.dags[self.DAG_ID] - dagbag.bag_dag(dag=dag) - - key = self.app.state.secret_key - serializer = URLSafeSerializer(key) - token = serializer.dumps({"download_logs": True}) - - response = self.client.get( - request_url, - params={"token": token, **extra_query_string}, - headers={"Accept": "text/plain"}, - ) - - assert 200 == response.status_code - - log_content = "Log for testing." if try_number == 1 else "Log for testing 2." - assert "localhost\n" in response.content.decode("utf-8") - assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") - assert f"{log_content}\n" in response.content.decode("utf-8") - - @pytest.mark.parametrize("try_number", [1, 2]) - def test_get_logs_response_with_ti_equal_to_none(self, try_number): - key = self.app.state.secret_key - serializer = URLSafeSerializer(key) - token = serializer.dumps({"download_logs": True}) - - response = self.client.get( - f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/Invalid-Task-ID/logs/{try_number}", - params={"token": token}, - ) - assert response.status_code == 404 - assert response.json() == {"detail": "TaskInstance not found"} - - @pytest.mark.parametrize("try_number", [1, 2]) - def test_get_logs_with_metadata_as_download_large_file(self, try_number): - with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock: - first_return = ([[("", "1st line")]], [{}]) - second_return = ([[("", "2nd line")]], [{"end_of_log": False}]) - third_return = ([[("", "3rd line")]], [{"end_of_log": True}]) - fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) - read_mock.side_effect = [first_return, second_return, third_return, fourth_return] - - response = self.client.get( - f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/" - f"taskInstances/{self.TASK_ID}/logs/{try_number}?full_content=True", - headers={"Accept": "text/plain"}, - ) - - assert "1st line" in response.content.decode("utf-8") - assert "2nd line" in response.content.decode("utf-8") - assert "3rd line" in response.content.decode("utf-8") - assert "should never be read" not in response.content.decode("utf-8") - - @pytest.mark.parametrize("try_number", [1, 2]) - @mock.patch("airflow.api_fastapi.core_api.routes.public.task_instances.TaskLogReader") - def test_get_logs_for_handler_without_read_method(self, mock_log_reader, try_number): - type(mock_log_reader.return_value).supports_read = PropertyMock(return_value=False) - - key = self.app.state.secret_key - serializer = URLSafeSerializer(key) - token = serializer.dumps({"download_logs": False}) - - # check guessing - response = self.client.get( - f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/{try_number}", - params={"token": token}, - headers={"Content-Type": "application/jso"}, - ) - assert 400 == response.status_code - assert "Task log handler does not support read logs." in response.content.decode("utf-8") - - def test_bad_signature_raises(self): - token = {"download_logs": False} - - response = self.client.get( - f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", - params={"token": token}, - headers={"Accept": "application/json"}, - ) - # assert response.status_code == 400 - assert response.json() == {"detail": "Bad Signature. Please use only the tokens provided by the API."} - - def test_raises_404_for_invalid_dag_run_id(self): - response = self.client.get( - f"public/dags/{self.DAG_ID}/dagRuns/NO_DAG_RUN/" # invalid run_id - f"taskInstances/{self.TASK_ID}/logs/1?", - headers={"Accept": "application/json"}, - ) - assert response.status_code == 404 - assert response.json() == {"detail": "TaskInstance not found"} - - def test_should_raise_404_when_missing_map_index_param_for_mapped_task(self): - key = self.app.state.secret_key - serializer = URLSafeSerializer(key) - token = serializer.dumps({"download_logs": True}) - - response = self.client.get( - f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.MAPPED_TASK_ID}/logs/1", - params={"token": token}, - headers={"Accept": "text/plain"}, - ) - assert response.status_code == 404 - assert response.json()["detail"] == "TaskInstance not found" - - def test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self): - key = self.app.state.secret_key - serializer = URLSafeSerializer(key) - token = serializer.dumps({"download_logs": True}) - - response = self.client.get( - f"public/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", - params={"token": token, "map_index": 0}, - headers={"Accept": "text/plain"}, - ) - assert response.status_code == 404 - assert response.json()["detail"] == "TaskInstance not found" From 2a1715a85e87e2f8b5ec8837c7b5b3ab1a4879c0 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 21 Nov 2024 17:27:22 +0530 Subject: [PATCH 10/15] Address PR comments --- airflow/api_fastapi/core_api/routes/public/log.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/api_fastapi/core_api/routes/public/log.py b/airflow/api_fastapi/core_api/routes/public/log.py index 1a0fa458ee2db..93503efa25ac6 100644 --- a/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow/api_fastapi/core_api/routes/public/log.py @@ -46,7 +46,6 @@ response_model=TaskInstancesLogResponse, ) def get_log( - *, dag_id: str, dag_run_id: str, task_id: str, From b0b2dc94569b4a436b8ae124cb3af100559187d1 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Thu, 21 Nov 2024 17:51:22 +0530 Subject: [PATCH 11/15] Update airflow/api_fastapi/core_api/routes/public/log.py Co-authored-by: Kalyan R --- airflow/api_fastapi/core_api/routes/public/log.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/core_api/routes/public/log.py b/airflow/api_fastapi/core_api/routes/public/log.py index 93503efa25ac6..35e7feffe5b58 100644 --- a/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow/api_fastapi/core_api/routes/public/log.py @@ -57,7 +57,7 @@ def get_log( map_index: int = -1, token: str | None = None, ): - """Get logs for specific task instance.""" + """Get logs for a specific task instance.""" if not token: metadata = {} else: From c374d767bae66e04a9a2341e1c6f2c1692caa0ab Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Thu, 21 Nov 2024 17:51:57 +0530 Subject: [PATCH 12/15] Update airflow/api_fastapi/core_api/datamodels/log.py --- airflow/api_fastapi/core_api/datamodels/log.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/log.py b/airflow/api_fastapi/core_api/datamodels/log.py index 282dc81972ed9..f3de173896d77 100644 --- a/airflow/api_fastapi/core_api/datamodels/log.py +++ b/airflow/api_fastapi/core_api/datamodels/log.py @@ -25,7 +25,3 @@ class TaskInstancesLogResponse(BaseModel): content: str continuation_token: str | None - @property - def text_format(self): - # convert all config sections to text - return "".join(list(self.content)) From b65270dd72982f3a55bbd20dc9601e71779bf0a3 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 21 Nov 2024 18:10:28 +0530 Subject: [PATCH 13/15] Fix static check --- airflow/api_fastapi/core_api/datamodels/log.py | 1 - airflow/api_fastapi/core_api/openapi/v1-generated.yaml | 2 +- airflow/ui/openapi-gen/queries/prefetch.ts | 2 +- airflow/ui/openapi-gen/queries/queries.ts | 2 +- airflow/ui/openapi-gen/queries/suspense.ts | 2 +- airflow/ui/openapi-gen/requests/services.gen.ts | 2 +- 6 files changed, 5 insertions(+), 6 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/log.py b/airflow/api_fastapi/core_api/datamodels/log.py index f3de173896d77..79b341d6e3808 100644 --- a/airflow/api_fastapi/core_api/datamodels/log.py +++ b/airflow/api_fastapi/core_api/datamodels/log.py @@ -24,4 +24,3 @@ class TaskInstancesLogResponse(BaseModel): content: str continuation_token: str | None - diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index d061f2659eef8..ce39d2b81cbd0 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -4638,7 +4638,7 @@ paths: tags: - Task Instance summary: Get Log - description: Get logs for specific task instance. + description: Get logs for a specific task instance. operationId: get_log parameters: - name: dag_id diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 580419d8cf043..35f0b63e9ecc1 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1600,7 +1600,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = ( }); /** * Get Log - * Get logs for specific task instance. + * Get logs for a specific task instance. * @param data The data for the request. * @param data.dagId * @param data.dagRunId diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 74136f42ce468..1140f3941f952 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1907,7 +1907,7 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = < }); /** * Get Log - * Get logs for specific task instance. + * Get logs for a specific task instance. * @param data The data for the request. * @param data.dagId * @param data.dagRunId diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index e84ae408555a2..bd8feeba3b1c0 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1889,7 +1889,7 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsSuspense = < }); /** * Get Log - * Get logs for specific task instance. + * Get logs for a specific task instance. * @param data The data for the request. * @param data.dagId * @param data.dagRunId diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index ef3551098a475..3ead5a093003a 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -2191,7 +2191,7 @@ export class TaskInstanceService { /** * Get Log - * Get logs for specific task instance. + * Get logs for a specific task instance. * @param data The data for the request. * @param data.dagId * @param data.dagRunId From 320b0227afb54c0acae29b1806e63589363aa25a Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Thu, 21 Nov 2024 20:24:50 +0530 Subject: [PATCH 14/15] Address PR comments --- airflow/api_fastapi/core_api/routes/public/log.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/log.py b/airflow/api_fastapi/core_api/routes/public/log.py index 35e7feffe5b58..156230c6da0c7 100644 --- a/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow/api_fastapi/core_api/routes/public/log.py @@ -68,6 +68,9 @@ def get_log( status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." ) + if task_try_number <= 0: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "task_try_number must be a positive integer") + if metadata.get("download_logs") and metadata["download_logs"]: full_content = True @@ -117,10 +120,9 @@ def get_log( logs: Any if accept == Mimetype.JSON or accept == Mimetype.ANY: # default logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) - logs = logs[0] if task_try_number is not None else logs # we must have token here, so we can safely ignore it token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] - return TaskInstancesLogResponse(continuation_token=token, content=str(logs)).model_dump() + return TaskInstancesLogResponse(continuation_token=token, content=str(logs[0])).model_dump() # text/plain. Stream logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) return Response(media_type=accept, content="".join(list(logs))) From f9bdf6e771080f428c7bb686aedf78ccfe145843 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Fri, 22 Nov 2024 12:24:51 +0530 Subject: [PATCH 15/15] Address PR comments --- .../core_api/openapi/v1-generated.yaml | 13 +++++-- .../api_fastapi/core_api/routes/public/log.py | 36 ++++++++++++++----- airflow/ui/openapi-gen/queries/common.ts | 6 ++-- airflow/ui/openapi-gen/queries/prefetch.ts | 10 +++--- airflow/ui/openapi-gen/queries/queries.ts | 10 +++--- airflow/ui/openapi-gen/queries/suspense.ts | 10 +++--- .../ui/openapi-gen/requests/services.gen.ts | 6 ++-- airflow/ui/openapi-gen/requests/types.gen.ts | 4 +-- 8 files changed, 60 insertions(+), 35 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index ce39d2b81cbd0..969057d1034fc 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -4633,7 +4633,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}: + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}: get: tags: - Task Instance @@ -4659,12 +4659,13 @@ paths: schema: type: string title: Task Id - - name: task_try_number + - name: try_number in: path required: true schema: type: integer - title: Task Try Number + exclusiveMinimum: 0 + title: Try Number - name: full_content in: query required: false @@ -4705,6 +4706,12 @@ paths: application/json: schema: $ref: '#/components/schemas/TaskInstancesLogResponse' + text/plain: + schema: + type: string + example: 'content + + ' '401': content: application/json: diff --git a/airflow/api_fastapi/core_api/routes/public/log.py b/airflow/api_fastapi/core_api/routes/public/log.py index 156230c6da0c7..bddf9c85692a1 100644 --- a/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow/api_fastapi/core_api/routes/public/log.py @@ -17,10 +17,12 @@ from __future__ import annotations +import textwrap from typing import Annotated, Any from fastapi import Depends, HTTPException, Request, Response, status from itsdangerous import BadSignature, URLSafeSerializer +from pydantic import PositiveInt from sqlalchemy.orm import Session, joinedload from sqlalchemy.sql import select @@ -39,17 +41,36 @@ tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" ) +text_example_response_for_get_log = { + Mimetype.TEXT: { + "schema": { + "type": "string", + "example": textwrap.dedent( + """\ + content + """ + ), + } + } +} + @task_instances_log_router.get( - "/{task_id}/logs/{task_try_number}", - responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + "/{task_id}/logs/{try_number}", + responses={ + **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + status.HTTP_200_OK: { + "description": "Successful Response", + "content": text_example_response_for_get_log, + }, + }, response_model=TaskInstancesLogResponse, ) def get_log( dag_id: str, dag_run_id: str, task_id: str, - task_try_number: int, + try_number: PositiveInt, accept: HeaderAcceptJsonOrText, request: Request, session: Annotated[Session, Depends(get_session)], @@ -68,9 +89,6 @@ def get_log( status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API." ) - if task_try_number <= 0: - raise HTTPException(status.HTTP_400_BAD_REQUEST, "task_try_number must be a positive integer") - if metadata.get("download_logs") and metadata["download_logs"]: full_content = True @@ -102,7 +120,7 @@ def get_log( TaskInstanceHistory.dag_id == dag_id, TaskInstanceHistory.run_id == dag_run_id, TaskInstanceHistory.map_index == map_index, - TaskInstanceHistory.try_number == task_try_number, + TaskInstanceHistory.try_number == try_number, ) ti = session.scalar(query) @@ -119,10 +137,10 @@ def get_log( logs: Any if accept == Mimetype.JSON or accept == Mimetype.ANY: # default - logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) + logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata) # we must have token here, so we can safely ignore it token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] return TaskInstancesLogResponse(continuation_token=token, content=str(logs[0])).model_dump() # text/plain. Stream - logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) + logs = task_log_reader.read_log_stream(ti, try_number, metadata) return Response(media_type=accept, content="".join(list(logs))) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index fa3052dd0aa00..21dd3c4efd270 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1196,8 +1196,8 @@ export const UseTaskInstanceServiceGetLogKeyFn = ( fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }: { accept?: "application/json" | "text/plain" | "*/*"; dagId: string; @@ -1205,8 +1205,8 @@ export const UseTaskInstanceServiceGetLogKeyFn = ( fullContent?: boolean; mapIndex?: number; taskId: string; - taskTryNumber: number; token?: string; + tryNumber: number; }, queryKey?: Array, ) => [ @@ -1219,8 +1219,8 @@ export const UseTaskInstanceServiceGetLogKeyFn = ( fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }, ]), ]; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 35f0b63e9ecc1..ede67e5b80dbb 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1605,7 +1605,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = ( * @param data.dagId * @param data.dagRunId * @param data.taskId - * @param data.taskTryNumber + * @param data.tryNumber * @param data.fullContent * @param data.mapIndex * @param data.token @@ -1622,8 +1622,8 @@ export const prefetchUseTaskInstanceServiceGetLog = ( fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }: { accept?: "application/json" | "text/plain" | "*/*"; dagId: string; @@ -1631,8 +1631,8 @@ export const prefetchUseTaskInstanceServiceGetLog = ( fullContent?: boolean; mapIndex?: number; taskId: string; - taskTryNumber: number; token?: string; + tryNumber: number; }, ) => queryClient.prefetchQuery({ @@ -1643,8 +1643,8 @@ export const prefetchUseTaskInstanceServiceGetLog = ( fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }), queryFn: () => TaskInstanceService.getLog({ @@ -1654,8 +1654,8 @@ export const prefetchUseTaskInstanceServiceGetLog = ( fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }), }); /** diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 1140f3941f952..dfde42df7322e 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1912,7 +1912,7 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = < * @param data.dagId * @param data.dagRunId * @param data.taskId - * @param data.taskTryNumber + * @param data.tryNumber * @param data.fullContent * @param data.mapIndex * @param data.token @@ -1932,8 +1932,8 @@ export const useTaskInstanceServiceGetLog = < fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }: { accept?: "application/json" | "text/plain" | "*/*"; dagId: string; @@ -1941,8 +1941,8 @@ export const useTaskInstanceServiceGetLog = < fullContent?: boolean; mapIndex?: number; taskId: string; - taskTryNumber: number; token?: string; + tryNumber: number; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, @@ -1956,8 +1956,8 @@ export const useTaskInstanceServiceGetLog = < fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }, queryKey, ), @@ -1969,8 +1969,8 @@ export const useTaskInstanceServiceGetLog = < fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }) as TData, ...options, }); diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index bd8feeba3b1c0..ec5f27408005f 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1894,7 +1894,7 @@ export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsSuspense = < * @param data.dagId * @param data.dagRunId * @param data.taskId - * @param data.taskTryNumber + * @param data.tryNumber * @param data.fullContent * @param data.mapIndex * @param data.token @@ -1914,8 +1914,8 @@ export const useTaskInstanceServiceGetLogSuspense = < fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }: { accept?: "application/json" | "text/plain" | "*/*"; dagId: string; @@ -1923,8 +1923,8 @@ export const useTaskInstanceServiceGetLogSuspense = < fullContent?: boolean; mapIndex?: number; taskId: string; - taskTryNumber: number; token?: string; + tryNumber: number; }, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, @@ -1938,8 +1938,8 @@ export const useTaskInstanceServiceGetLogSuspense = < fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }, queryKey, ), @@ -1951,8 +1951,8 @@ export const useTaskInstanceServiceGetLogSuspense = < fullContent, mapIndex, taskId, - taskTryNumber, token, + tryNumber, }) as TData, ...options, }); diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 3ead5a093003a..ef1b569364c1c 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -2196,7 +2196,7 @@ export class TaskInstanceService { * @param data.dagId * @param data.dagRunId * @param data.taskId - * @param data.taskTryNumber + * @param data.tryNumber * @param data.fullContent * @param data.mapIndex * @param data.token @@ -2207,12 +2207,12 @@ export class TaskInstanceService { public static getLog(data: GetLogData): CancelablePromise { return __request(OpenAPI, { method: "GET", - url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}", path: { dag_id: data.dagId, dag_run_id: data.dagRunId, task_id: data.taskId, - task_try_number: data.taskTryNumber, + try_number: data.tryNumber, }, headers: { accept: data.accept, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index ef17f755e7f21..11bcdda75dac6 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1647,8 +1647,8 @@ export type GetLogData = { fullContent?: boolean; mapIndex?: number; taskId: string; - taskTryNumber: number; token?: string | null; + tryNumber: number; }; export type GetLogResponse = TaskInstancesLogResponse; @@ -3401,7 +3401,7 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}": { + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}": { get: { req: GetLogData; res: {