Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
c8047070343c2f31da70d1cf2fc286ec74aff921b964c5a662c9ac5eb8d07eac
1a9b0180a1d8fecd9236fc1d897636e3bbe55fd0d0d24b9c530673fa78b4dc08
3,999 changes: 2,035 additions & 1,964 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``1b2c3d4e5f6g`` (head) | ``ab6dc0c82d0e`` | ``3.2.0`` | Add length to dag_bundle_team.dag_bundle_name. |
| ``5cc8117e9285`` (head) | ``1b2c3d4e5f6g`` | ``3.2.0`` | Add Human In the Loop Detail History table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``1b2c3d4e5f6g`` | ``ab6dc0c82d0e`` | ``3.2.0`` | Add length to dag_bundle_team.dag_bundle_name. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``ab6dc0c82d0e`` | ``15d84ca19038`` | ``3.2.0`` | Change ``serialized_dag`` data column to JSONB for |
| | | | PostgreSQL. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.task_instances import TaskInstanceResponse
from airflow.sdk import Param


class UpdateHITLDetailPayload(BaseModel):
Expand All @@ -50,10 +49,8 @@ class HITLUser(BaseModel):
name: str


class HITLDetail(BaseModel):
"""Schema for Human-in-the-loop detail."""

task_instance: TaskInstanceResponse
class BaseHITLDetail(BaseModel):
"""The common part within HITLDetail and HITLDetailHisotry."""

# User Request Detail
options: list[str] = Field(min_length=1)
Expand All @@ -77,7 +74,13 @@ class HITLDetail(BaseModel):
@classmethod
def get_params(cls, params: dict[str, Any]) -> dict[str, Any]:
"""Convert params attribute to dict representation."""
return {k: v.dump() if isinstance(v, Param) else v for k, v in params.items()}
return {k: v.dump() if getattr(v, "dump", None) else v for k, v in params.items()}


class HITLDetail(BaseHITLDetail):
"""Schema for Human-in-the-loop detail."""

task_instance: TaskInstanceResponse


class HITLDetailCollection(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime
from typing import Annotated

from pydantic import (
AliasPath,
BeforeValidator,
Field,
)

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
from airflow.api_fastapi.core_api.datamodels.hitl import BaseHITLDetail
from airflow.utils.state import TaskInstanceState


class HITLDetailHisotry(BaseHITLDetail):
Comment thread
Lee-W marked this conversation as resolved.
"""Schema for Human-in-the-loop detail history."""


class TaskInstanceHistoryResponse(BaseModel):
"""TaskInstanceHistory serializer for responses."""

task_id: str
dag_id: str

# todo: this should not be aliased; it's ambiguous with dag run's "id" - airflow 3.0
run_id: str = Field(alias="dag_run_id")

map_index: int
start_date: datetime | None
end_date: datetime | None
duration: float | None
state: TaskInstanceState | None
try_number: int
max_tries: int
task_display_name: str
dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name"))
hostname: str | None
unixname: str | None
pool: str
pool_slots: int
queue: str | None
priority_weight: int | None
operator: str | None
custom_operator_name: str | None = Field(alias="operator_name")
queued_dttm: datetime | None = Field(alias="queued_when")
scheduled_dttm: datetime | None = Field(alias="scheduled_when")
pid: int | None
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]
dag_version: DagVersionResponse | None
hitl_detail: HITLDetailHisotry | None


class TaskInstanceHistoryCollectionResponse(BaseModel):
"""TaskInstanceHistory Collection serializer for responses."""

task_instances: list[TaskInstanceHistoryResponse]
total_entries: int
Original file line number Diff line number Diff line change
Expand Up @@ -141,47 +141,6 @@ class TaskInstancesBatchBody(StrictBaseModel):
order_by: str | None = None


class TaskInstanceHistoryResponse(BaseModel):
"""TaskInstanceHistory serializer for responses."""

task_id: str
dag_id: str

# todo: this should not be aliased; it's ambiguous with dag run's "id" - airflow 3.0
run_id: str = Field(alias="dag_run_id")

map_index: int
start_date: datetime | None
end_date: datetime | None
duration: float | None
state: TaskInstanceState | None
try_number: int
max_tries: int
task_display_name: str
dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name"))
hostname: str | None
unixname: str | None
pool: str
pool_slots: int
queue: str | None
priority_weight: int | None
operator: str | None
custom_operator_name: str | None = Field(alias="operator_name")
queued_dttm: datetime | None = Field(alias="queued_when")
scheduled_dttm: datetime | None = Field(alias="scheduled_when")
pid: int | None
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]
dag_version: DagVersionResponse | None


class TaskInstanceHistoryCollectionResponse(BaseModel):
"""TaskInstanceHistory Collection serializer for responses."""

task_instances: list[TaskInstanceHistoryResponse]
total_entries: int


class ClearTaskInstancesBody(StrictBaseModel):
"""Request body for Clear Task Instances endpoint."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2005,8 +2005,6 @@ components:
description: DAG Run model for the Grid UI.
HITLDetail:
properties:
task_instance:
$ref: '#/components/schemas/TaskInstanceResponse'
options:
items:
type: string
Expand Down Expand Up @@ -2070,12 +2068,14 @@ components:
type: boolean
title: Response Received
default: false
task_instance:
$ref: '#/components/schemas/TaskInstanceResponse'
type: object
required:
- task_instance
- options
- subject
- created_at
- task_instance
title: HITLDetail
description: Schema for Human-in-the-loop detail.
HITLUser:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11170,8 +11170,6 @@ components:
description: Serializer for Plugin FastAPI root middleware responses.
HITLDetail:
properties:
task_instance:
$ref: '#/components/schemas/TaskInstanceResponse'
options:
items:
type: string
Expand Down Expand Up @@ -11235,12 +11233,14 @@ components:
type: boolean
title: Response Received
default: false
task_instance:
$ref: '#/components/schemas/TaskInstanceResponse'
type: object
required:
- task_instance
- options
- subject
- created_at
- task_instance
title: HITLDetail
description: Schema for Human-in-the-loop detail.
HITLDetailCollection:
Expand All @@ -11259,6 +11259,78 @@ components:
- total_entries
title: HITLDetailCollection
description: Schema for a collection of Human-in-the-loop details.
HITLDetailHisotry:
properties:
options:
items:
type: string
type: array
minItems: 1
title: Options
subject:
type: string
title: Subject
body:
anyOf:
- type: string
- type: 'null'
title: Body
defaults:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Defaults
multiple:
type: boolean
title: Multiple
default: false
params:
additionalProperties: true
type: object
title: Params
assigned_users:
items:
$ref: '#/components/schemas/HITLUser'
type: array
title: Assigned Users
created_at:
type: string
format: date-time
title: Created At
responded_by_user:
anyOf:
- $ref: '#/components/schemas/HITLUser'
- type: 'null'
responded_at:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Responded At
chosen_options:
anyOf:
- items:
type: string
type: array
- type: 'null'
title: Chosen Options
params_input:
additionalProperties: true
type: object
title: Params Input
response_received:
type: boolean
title: Response Received
default: false
type: object
required:
- options
- subject
- created_at
title: HITLDetailHisotry
description: Schema for Human-in-the-loop detail history.
HITLDetailResponse:
properties:
responded_by:
Expand Down Expand Up @@ -12146,6 +12218,10 @@ components:
anyOf:
- $ref: '#/components/schemas/DagVersionResponse'
- type: 'null'
hitl_detail:
anyOf:
- $ref: '#/components/schemas/HITLDetailHisotry'
- type: 'null'
type: object
required:
- task_id
Expand Down Expand Up @@ -12174,6 +12250,7 @@ components:
- executor
- executor_config
- dag_version
- hitl_detail
title: TaskInstanceHistoryResponse
description: TaskInstanceHistory serializer for responses.
TaskInstanceResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.common import BulkBody, BulkResponse
from airflow.api_fastapi.core_api.datamodels.task_instance_history import (
TaskInstanceHistoryCollectionResponse,
TaskInstanceHistoryResponse,
)
from airflow.api_fastapi.core_api.datamodels.task_instances import (
BulkTaskInstanceBody,
ClearTaskInstancesBody,
PatchTaskInstanceBody,
TaskDependencyCollectionResponse,
TaskInstanceCollectionResponse,
TaskInstanceHistoryCollectionResponse,
TaskInstanceHistoryResponse,
TaskInstanceResponse,
TaskInstancesBatchBody,
)
Expand Down Expand Up @@ -321,6 +323,7 @@ def _query(orm_object: Base) -> Select:
)
.options(joinedload(orm_object.dag_version))
.options(joinedload(orm_object.dag_run).options(joinedload(DagRun.dag_model)))
.options(joinedload(orm_object.hitl_detail))
)
return query

Expand Down Expand Up @@ -644,12 +647,16 @@ def get_task_instance_try_details(
"""Get task instance details by try number."""

def _query(orm_object: Base) -> TI | TIH | None:
query = select(orm_object).where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.try_number == task_try_number,
orm_object.map_index == map_index,
query = (
select(orm_object)
.where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.try_number == task_try_number,
orm_object.map_index == map_index,
)
.options(joinedload(orm_object.hitl_detail))
)

task_instance = session.scalar(query)
Expand Down
Loading
Loading