Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from collections.abc import Iterable
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from pydantic import AliasPath, AwareDatetime, Field, NonNegativeInt, model_validator

Expand Down Expand Up @@ -55,11 +55,23 @@ class DAGRunClearBody(StrictBaseModel):

dry_run: bool = True
only_failed: bool = False
only_new: bool = Field(
default=False,
description="Only queue newly added tasks in the latest DAG version without clearing existing tasks.",
)
run_on_latest_version: bool = Field(
default=False,
description="(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run.",
)

@model_validator(mode="before")
@classmethod
def validate_model(cls, data: Any) -> Any:
"""Validate clear DAG run form."""
if data.get("only_new") and data.get("only_failed"):
raise ValueError("only_new and only_failed are mutually exclusive")
return data


class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
AliasPath,
AwareDatetime,
BeforeValidator,
Discriminator,
Field,
NonNegativeInt,
StringConstraints,
Tag,
ValidationError,
field_validator,
model_validator,
Expand All @@ -40,6 +42,13 @@
from airflow.utils.state import TaskInstanceState


class NewTaskResponse(BaseModel):
"""Lightweight response for new tasks that don't have TaskInstances yet."""

task_id: str
task_display_name: str


class TaskInstanceResponse(BaseModel):
"""TaskInstance serializer for responses."""

Expand Down Expand Up @@ -89,6 +98,28 @@ class TaskInstanceCollectionResponse(BaseModel):
total_entries: int


def _task_instance_discriminator(v: Any) -> str:
"""Discriminate between TaskInstanceResponse and NewTaskResponse in the union."""
Comment thread
pierrejeambrun marked this conversation as resolved.
if isinstance(v, NewTaskResponse):
return "new"
if isinstance(v, dict):
return "new" if "id" not in v else "full"
# ORM objects and TaskInstanceResponse instances
return "full"


class ClearTaskInstanceCollectionResponse(BaseModel):
"""Response for clear dag run dry run, which may contain new tasks without full TaskInstance data."""

task_instances: Iterable[
Annotated[
Annotated[TaskInstanceResponse, Tag("full")] | Annotated[NewTaskResponse, Tag("new")],
Discriminator(_task_instance_discriminator),
]
]
total_entries: int


class TaskDependencyResponse(BaseModel):
"""Task Dependency serializer for responses."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1987,7 +1987,7 @@ paths:
application/json:
schema:
anyOf:
- $ref: '#/components/schemas/TaskInstanceCollectionResponse'
- $ref: '#/components/schemas/ClearTaskInstanceCollectionResponse'
- $ref: '#/components/schemas/DAGRunResponse'
title: Response Clear Dag Run
'401':
Expand Down Expand Up @@ -9834,6 +9834,25 @@ components:
- action
- entities
title: BulkUpdateAction[VariableBody]
ClearTaskInstanceCollectionResponse:
properties:
task_instances:
items:
oneOf:
- $ref: '#/components/schemas/TaskInstanceResponse'
- $ref: '#/components/schemas/NewTaskResponse'
type: array
title: Task Instances
total_entries:
type: integer
title: Total Entries
type: object
required:
- task_instances
- total_entries
title: ClearTaskInstanceCollectionResponse
description: Response for clear dag run dry run, which may contain new tasks
without full TaskInstance data.
ClearTaskInstancesBody:
properties:
dry_run:
Expand Down Expand Up @@ -10620,6 +10639,12 @@ components:
type: boolean
title: Only Failed
default: false
only_new:
type: boolean
title: Only New
description: Only queue newly added tasks in the latest DAG version without
clearing existing tasks.
default: false
run_on_latest_version:
type: boolean
title: Run On Latest Version
Expand Down Expand Up @@ -12008,6 +12033,21 @@ components:
type: object
title: MaterializeAssetBody
description: Materialize asset request.
NewTaskResponse:
properties:
task_id:
type: string
title: Task Id
task_display_name:
type: string
title: Task Display Name
type: object
required:
- task_id
- task_display_name
title: NewTaskResponse
description: Lightweight response for new tasks that don't have TaskInstances
yet.
PatchTaskInstanceBody:
properties:
new_state:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@
TriggerDAGRunPostBody,
)
from airflow.api_fastapi.core_api.datamodels.task_instances import (
TaskInstanceCollectionResponse,
ClearTaskInstanceCollectionResponse,
NewTaskResponse,
TaskInstanceResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
Expand Down Expand Up @@ -285,7 +286,7 @@ def clear_dag_run(
body: DAGRunClearBody,
dag_bag: DagBagDep,
session: SessionDep,
) -> TaskInstanceCollectionResponse | DAGRunResponse:
) -> ClearTaskInstanceCollectionResponse | DAGRunResponse:
dag_run = session.scalar(
select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id).options(joinedload(DagRun.dag_model))
)
Expand All @@ -297,27 +298,39 @@ def clear_dag_run(

dag = dag_bag.get_dag_for_run(dag_run, session=session)

if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")

if body.dry_run:
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
task_instances = dag.clear(
task_instances_or_ids = dag.clear(
run_id=dag_run_id,
task_ids=None,
only_new=body.only_new,
only_failed=body.only_failed,
run_on_latest_version=body.run_on_latest_version,
dry_run=True,
session=session,
)

return TaskInstanceCollectionResponse(
task_instances=cast("list[TaskInstanceResponse]", task_instances),
if body.only_new:
# Create lightweight NewTaskResponse objects for new tasks
new_task_ids = cast("set[str]", task_instances_or_ids)
task_instances: list[TaskInstanceResponse | NewTaskResponse] = [
NewTaskResponse(task_id=task_id, task_display_name=task_id)
for task_id in sorted(new_task_ids)
]
else:
task_instances = cast("list[TaskInstanceResponse | NewTaskResponse]", task_instances_or_ids)

return ClearTaskInstanceCollectionResponse(
task_instances=task_instances,
total_entries=len(task_instances),
)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")

dag.clear(
run_id=dag_run_id,
task_ids=None,
only_new=body.only_new,
only_failed=body.only_failed,
run_on_latest_version=body.run_on_latest_version,
session=session,
Expand Down
9 changes: 8 additions & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,14 @@ def _get_new_task_ids(
if not latest_dag:
raise ValueError(f"Latest DAG version for '{dag_id}' not found")

current_dag = scheduler_dagbag.get_dag_for_run(dag_run=dag_run, session=session)
# Use created_dag_version_id directly to get the DAG version the run was
# originally created with. We cannot use get_dag_for_run here because it
# falls back to the latest version when bundle_version is not set (e.g.
# LocalDagBundle), which would make current_dag == latest_dag and the diff
# always empty.
current_dag = None
if dag_run.created_dag_version_id:
current_dag = scheduler_dagbag.get_dag(version_id=dag_run.created_dag_version_id, session=session)
new_task_ids = set(latest_dag.task_ids) - set(current_dag.task_ids) if current_dag else set()

return list(new_task_ids)
Expand Down
17 changes: 17 additions & 0 deletions airflow-core/src/airflow/serialization/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,23 @@ def clear(
run_on_latest_version: bool = False,
) -> list[TaskInstance]: ... # pragma: no cover

@overload
def clear(
self,
*,
dry_run: Literal[True],
task_ids: Collection[str | tuple[str, int]] | None = None,
run_id: str,
only_failed: bool = False,
only_running: bool = False,
only_new: bool,
dag_run_state: DagRunState = DagRunState.QUEUED,
session: Session = NEW_SESSION,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
exclude_run_ids: frozenset[str] | None = frozenset(),
run_on_latest_version: bool = False,
) -> set[str] | list[TaskInstance]: ... # pragma: no cover

@overload
def clear(
self,
Expand Down
50 changes: 50 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,33 @@ export const $BulkUpdateAction_VariableBody_ = {
title: 'BulkUpdateAction[VariableBody]'
} as const;

export const $ClearTaskInstanceCollectionResponse = {
properties: {
task_instances: {
items: {
oneOf: [
{
'$ref': '#/components/schemas/TaskInstanceResponse'
},
{
'$ref': '#/components/schemas/NewTaskResponse'
}
]
},
type: 'array',
title: 'Task Instances'
},
total_entries: {
type: 'integer',
title: 'Total Entries'
}
},
type: 'object',
required: ['task_instances', 'total_entries'],
title: 'ClearTaskInstanceCollectionResponse',
description: 'Response for clear dag run dry run, which may contain new tasks without full TaskInstance data.'
} as const;

export const $ClearTaskInstancesBody = {
properties: {
dry_run: {
Expand Down Expand Up @@ -2473,6 +2500,12 @@ export const $DAGRunClearBody = {
title: 'Only Failed',
default: false
},
only_new: {
type: 'boolean',
title: 'Only New',
description: 'Only queue newly added tasks in the latest DAG version without clearing existing tasks.',
default: false
},
run_on_latest_version: {
type: 'boolean',
title: 'Run On Latest Version',
Expand Down Expand Up @@ -4575,6 +4608,23 @@ export const $MaterializeAssetBody = {
description: 'Materialize asset request.'
} as const;

export const $NewTaskResponse = {
properties: {
task_id: {
type: 'string',
title: 'Task Id'
},
task_display_name: {
type: 'string',
title: 'Task Display Name'
}
},
type: 'object',
required: ['task_id', 'task_display_name'],
title: 'NewTaskResponse',
description: "Lightweight response for new tasks that don't have TaskInstances yet."
} as const;

export const $PatchTaskInstanceBody = {
properties: {
new_state: {
Expand Down
24 changes: 22 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,14 @@ export type BulkUpdateAction_VariableBody_ = {
action_on_non_existence?: BulkActionNotOnExistence;
};

/**
* Response for clear dag run dry run, which may contain new tasks without full TaskInstance data.
*/
export type ClearTaskInstanceCollectionResponse = {
task_instances: Array<(TaskInstanceResponse | NewTaskResponse)>;
total_entries: number;
};

/**
* Request body for Clear Task Instances endpoint.
*/
Expand Down Expand Up @@ -645,6 +653,10 @@ export type DAGResponse = {
export type DAGRunClearBody = {
dry_run?: boolean;
only_failed?: boolean;
/**
* Only queue newly added tasks in the latest DAG version without clearing existing tasks.
*/
only_new?: boolean;
/**
* (Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run.
*/
Expand Down Expand Up @@ -1150,6 +1162,14 @@ export type MaterializeAssetBody = {
partition_key?: string | null;
};

/**
* Lightweight response for new tasks that don't have TaskInstances yet.
*/
export type NewTaskResponse = {
task_id: string;
task_display_name: string;
};

/**
* Request body for Clear Task Instances endpoint.
*/
Expand Down Expand Up @@ -2538,7 +2558,7 @@ export type ClearDagRunData = {
requestBody: DAGRunClearBody;
};

export type ClearDagRunResponse = TaskInstanceCollectionResponse | DAGRunResponse;
export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse | DAGRunResponse;

export type GetDagRunsData = {
bundleVersion?: string | null;
Expand Down Expand Up @@ -4628,7 +4648,7 @@ export type $OpenApiTs = {
/**
* Successful Response
*/
200: TaskInstanceCollectionResponse | DAGRunResponse;
200: ClearTaskInstanceCollectionResponse | DAGRunResponse;
/**
* Unauthorized
*/
Expand Down
Loading
Loading