Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
task_dependencies_collection_schema,
task_instance_batch_form,
task_instance_collection_schema,
task_instance_history_schema,
task_instance_reference_collection_schema,
task_instance_reference_schema,
task_instance_schema,
Expand Down Expand Up @@ -754,3 +755,62 @@ def get_mapped_task_instance_dependencies(
return get_task_instance_dependencies(
dag_id=dag_id, dag_run_id=dag_run_id, task_id=task_id, map_index=map_index
)


@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_task_instance_try_details(
*,
dag_id: str,
dag_run_id: str,
task_id: str,
task_try_number: int,
map_index: int = -1,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get details of a task instance try."""
from airflow.models.taskinstancehistory import TaskInstanceHistory

def _query(orm_object):
query = select(orm_object).where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.try_number == task_try_number,
orm_object.map_index == map_index,
)
try:
result = session.execute(query).one_or_none()
except MultipleResultsFound:
raise NotFound(
"Task instance not found",
detail="Task instance is mapped, add the map_index value to the URL",
)
return result

result = _query(TI) or _query(TaskInstanceHistory)
if result is None:
error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}, map_index={map_index}, try_number={task_try_number}."
raise NotFound("Task instance not found", detail=error_message)
return task_instance_history_schema.dump(result[0])


@provide_session
def get_mapped_task_instance_try_details(
*,
dag_id: str,
dag_run_id: str,
task_id: str,
task_try_number: int,
map_index: int,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get details of a mapped task instance try."""
return get_task_instance_try_details(
dag_id=dag_id,
dag_run_id=dag_run_id,
task_id=task_id,
task_try_number=task_try_number,
map_index=map_index,
session=session,
)
59 changes: 59 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,65 @@ paths:
"404":
$ref: "#/components/responses/NotFound"

/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}:
get:
summary: get taskinstance try
description: |
Get details of a task instance try.

*New in version 2.10.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint
operationId: get_task_instance_try_details
tags: [TaskInstance]
parameters:
- $ref: "#/components/parameters/DAGID"
- $ref: "#/components/parameters/DAGRunID"
- $ref: "#/components/parameters/TaskID"
- $ref: "#/components/parameters/TaskTryNumber"
responses:
"200":
description: Success.
content:
application/json:
schema:
$ref: "#/components/schemas/TaskInstance"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"

/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}:
get:
summary: get mapped taskinstance try
description: |
Get details of a mapped task instance try.

*New in version 2.10.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint
operationId: get_mapped_task_instance_try_details
tags: [TaskInstance]
parameters:
- $ref: "#/components/parameters/DAGID"
- $ref: "#/components/parameters/DAGRunID"
- $ref: "#/components/parameters/TaskID"
- $ref: "#/components/parameters/MapIndex"
- $ref: "#/components/parameters/TaskTryNumber"
responses:
"200":
description: Success.
content:
application/json:
schema:
$ref: "#/components/schemas/TaskInstance"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"

/variables:
get:
summary: List variables
Expand Down
34 changes: 34 additions & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.api_connexion.schemas.sla_miss_schema import SlaMissSchema
from airflow.api_connexion.schemas.trigger_schema import TriggerSchema
from airflow.models import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.helpers import exactly_one
from airflow.utils.state import TaskInstanceState

Expand Down Expand Up @@ -86,6 +87,38 @@ def get_attribute(self, obj, attr, default):
return get_value(obj[0], attr, default)


class TaskInstanceHistorySchema(SQLAlchemySchema):
"""Task instance schema."""

class Meta:
"""Meta."""

model = TaskInstanceHistory

task_id = auto_field()
dag_id = auto_field()
run_id = auto_field(data_key="dag_run_id")
Comment thread
Lee-W marked this conversation as resolved.
map_index = auto_field()
start_date = auto_field()
end_date = auto_field()
duration = auto_field()
state = TaskInstanceStateField()
try_number = auto_field()
max_tries = auto_field()
task_display_name = fields.String(attribute="task_display_name", dump_only=True)
hostname = auto_field()
unixname = auto_field()
pool = auto_field()
pool_slots = auto_field()
queue = auto_field()
priority_weight = auto_field()
operator = auto_field()
queued_dttm = auto_field(data_key="queued_when")
pid = auto_field()
executor = auto_field()
executor_config = auto_field()


class TaskInstanceCollection(NamedTuple):
"""List of task instances with metadata."""

Expand Down Expand Up @@ -245,3 +278,4 @@ class TaskDependencyCollectionSchema(Schema):
task_instance_reference_schema = TaskInstanceReferenceSchema()
task_instance_reference_collection_schema = TaskInstanceReferenceCollectionSchema()
set_task_instance_note_form_schema = SetTaskInstanceNoteFormSchema()
task_instance_history_schema = TaskInstanceHistorySchema()
84 changes: 84 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,22 @@ export interface paths {
*/
post: operations["get_task_instances_batch"];
};
"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}": {
/**
* Get details of a task instance try.
*
* *New in version 2.10.0*
*/
get: operations["get_task_instance_try_details"];
};
"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}": {
/**
* Get details of a mapped task instance try.
*
* *New in version 2.10.0*
*/
get: operations["get_mapped_task_instance_try_details"];
};
"/variables": {
/** The collection does not contain data. To get data, you must get a single entity. */
get: operations["get_variables"];
Expand Down Expand Up @@ -4264,6 +4280,68 @@ export interface operations {
};
};
};
/**
* Get details of a task instance try.
*
* *New in version 2.10.0*
*/
get_task_instance_try_details: {
parameters: {
path: {
/** The DAG ID. */
dag_id: components["parameters"]["DAGID"];
/** The DAG run ID. */
dag_run_id: components["parameters"]["DAGRunID"];
/** The task ID. */
task_id: components["parameters"]["TaskID"];
/** The task try number. */
task_try_number: components["parameters"]["TaskTryNumber"];
};
};
responses: {
/** Success. */
200: {
content: {
"application/json": components["schemas"]["TaskInstance"];
};
};
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
};
/**
* Get details of a mapped task instance try.
*
* *New in version 2.10.0*
*/
get_mapped_task_instance_try_details: {
parameters: {
path: {
/** The DAG ID. */
dag_id: components["parameters"]["DAGID"];
/** The DAG run ID. */
dag_run_id: components["parameters"]["DAGRunID"];
/** The task ID. */
task_id: components["parameters"]["TaskID"];
/** The map index. */
map_index: components["parameters"]["MapIndex"];
/** The task try number. */
task_try_number: components["parameters"]["TaskTryNumber"];
};
};
responses: {
/** Success. */
200: {
content: {
"application/json": components["schemas"]["TaskInstance"];
};
};
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
};
/** The collection does not contain data. To get data, you must get a single entity. */
get_variables: {
parameters: {
Expand Down Expand Up @@ -5640,6 +5718,12 @@ export type GetMappedTaskInstancesVariables = CamelCasedPropertiesDeep<
export type GetTaskInstancesBatchVariables = CamelCasedPropertiesDeep<
operations["get_task_instances_batch"]["requestBody"]["content"]["application/json"]
>;
export type GetTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep<
operations["get_task_instance_try_details"]["parameters"]["path"]
>;
export type GetMappedTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep<
operations["get_mapped_task_instance_try_details"]["parameters"]["path"]
>;
export type GetVariablesVariables = CamelCasedPropertiesDeep<
operations["get_variables"]["parameters"]["query"]
>;
Expand Down
Loading