Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pydantic import BaseModel


class TaskInstancesLogResponse(BaseModel):
"""Log serializer for responses."""

content: str
continuation_token: str | None
119 changes: 119 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4633,6 +4633,109 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}:
get:
tags:
- Task Instance
summary: Get Log
description: Get logs for a specific task instance.
operationId: get_log
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: try_number
in: path
required: true
schema:
type: integer
exclusiveMinimum: 0
title: Try Number
- name: full_content
in: query
required: false
schema:
type: boolean
default: false
title: Full Content
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
- name: token
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Token
- name: accept
in: header
required: false
schema:
type: string
enum:
- application/json
- text/plain
- '*/*'
default: '*/*'
title: Accept
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstancesLogResponse'
text/plain:
schema:
type: string
example: 'content

'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down Expand Up @@ -7237,6 +7340,22 @@ components:
type: object
title: TaskInstancesBatchBody
description: Task Instance body for get batch.
TaskInstancesLogResponse:
properties:
content:
type: string
title: Content
continuation_token:
anyOf:
- type: string
- type: 'null'
title: Continuation Token
type: object
required:
- content
- continuation_token
title: TaskInstancesLogResponse
description: Log serializer for responses.
TaskOutletAssetReference:
properties:
dag_id:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
from airflow.api_fastapi.core_api.routes.public.import_error import import_error_router
from airflow.api_fastapi.core_api.routes.public.log import task_instances_log_router
from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router
from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router
from airflow.api_fastapi.core_api.routes.public.pools import pools_router
Expand Down Expand Up @@ -67,6 +68,7 @@
authenticated_router.include_router(tasks_router)
authenticated_router.include_router(variables_router)
authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_log_router)

# Include authenticated router in public router
public_router.include_router(authenticated_router)
Expand Down
146 changes: 146 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import textwrap
from typing import Annotated, Any

from fastapi import Depends, HTTPException, Request, Response, status
from itsdangerous import BadSignature, URLSafeSerializer
from pydantic import PositiveInt
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql import select

Comment thread
utkarsharma2 marked this conversation as resolved.
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.datamodels.log import TaskInstancesLogResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound
from airflow.models import TaskInstance, Trigger
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.log.log_reader import TaskLogReader

task_instances_log_router = AirflowRouter(
tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
)

text_example_response_for_get_log = {
Mimetype.TEXT: {
"schema": {
"type": "string",
"example": textwrap.dedent(
"""\
content
"""
),
}
}
}


@task_instances_log_router.get(
"/{task_id}/logs/{try_number}",
responses={
**create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
status.HTTP_200_OK: {
"description": "Successful Response",
"content": text_example_response_for_get_log,
},
},
response_model=TaskInstancesLogResponse,
)
def get_log(
dag_id: str,
dag_run_id: str,
task_id: str,
try_number: PositiveInt,
accept: HeaderAcceptJsonOrText,
request: Request,
session: Annotated[Session, Depends(get_session)],
full_content: bool = False,
map_index: int = -1,
token: str | None = None,
):
"""Get logs for a specific task instance."""
if not token:
metadata = {}
else:
try:
metadata = URLSafeSerializer(request.app.state.secret_key).loads(token)
except BadSignature:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Bad Signature. Please use only the tokens provided by the API."
Comment thread
pierrejeambrun marked this conversation as resolved.
)

if metadata.get("download_logs") and metadata["download_logs"]:
full_content = True

if full_content:
metadata["download_logs"] = True
else:
metadata["download_logs"] = False
Comment thread
pierrejeambrun marked this conversation as resolved.

task_log_reader = TaskLogReader()

if not task_log_reader.supports_read:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Task log handler does not support read logs.")

query = (
select(TaskInstance)
.where(
TaskInstance.task_id == task_id,
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == dag_run_id,
TaskInstance.map_index == map_index,
)
.join(TaskInstance.dag_run)
.options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
)
ti = session.scalar(query)
if ti is None:
query = select(TaskInstanceHistory).where(
TaskInstanceHistory.task_id == task_id,
TaskInstanceHistory.dag_id == dag_id,
TaskInstanceHistory.run_id == dag_run_id,
TaskInstanceHistory.map_index == map_index,
TaskInstanceHistory.try_number == try_number,
)
ti = session.scalar(query)

if ti is None:
metadata["end_of_log"] = True
raise HTTPException(status.HTTP_404_NOT_FOUND, "TaskInstance not found")

dag = request.app.state.dag_bag.get_dag(dag_id)
if dag:
try:
ti.task = dag.get_task(ti.task_id)
except TaskNotFound:
pass

logs: Any
if accept == Mimetype.JSON or accept == Mimetype.ANY: # default
logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
# we must have token here, so we can safely ignore it
token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment]
return TaskInstancesLogResponse(continuation_token=token, content=str(logs[0])).model_dump()
# text/plain. Stream
logs = task_log_reader.read_log_stream(ti, try_number, metadata)
return Response(media_type=accept, content="".join(list(logs)))
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,50 @@ export const UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn = (
useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
];
export type TaskInstanceServiceGetLogDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getLog>
>;
export type TaskInstanceServiceGetLogQueryResult<
TData = TaskInstanceServiceGetLogDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetLogKey = "TaskInstanceServiceGetLog";
export const UseTaskInstanceServiceGetLogKeyFn = (
{
accept,
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
dagId: string;
dagRunId: string;
fullContent?: boolean;
mapIndex?: number;
taskId: string;
token?: string;
tryNumber: number;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetLogKey,
...(queryKey ?? [
{
accept,
dagId,
dagRunId,
fullContent,
mapIndex,
taskId,
token,
tryNumber,
},
]),
];
export type TaskServiceGetTasksDefaultResponse = Awaited<
ReturnType<typeof TaskService.getTasks>
>;
Expand Down
Loading