Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_stats_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models.dag import DagRun
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
from airflow.www.extensions.init_auth_manager import get_auth_manager
Expand All @@ -37,6 +38,7 @@
from airflow.api_connexion.types import APIResponse


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.RUN)
@provide_session
def get_dag_stats(
Expand Down
5 changes: 4 additions & 1 deletion airflow/api_fastapi/common/db/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ def paginated_select(
offset: BaseParam | None = None,
limit: BaseParam | None = None,
session: Session = NEW_SESSION,
return_total_entries: bool = True,
) -> Select:
base_select = apply_filters_to_select(
base_select,
filters,
)

total_entries = get_query_count(base_select, session=session)
total_entries = None
if return_total_entries:
total_entries = get_query_count(base_select, session=session)

# TODO: Re-enable when permissions are handled. Readable / writable entities,
# for instance:
Expand Down
32 changes: 32 additions & 0 deletions airflow/api_fastapi/common/db/dag_runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from sqlalchemy import func, select

from airflow.models.dagrun import DagRun

dagruns_select_with_state_count = (
select(
DagRun.dag_id,
DagRun.state,
func.count(DagRun.state),
)
.group_by(DagRun.dag_id, DagRun.state)
.order_by(DagRun.dag_id)
)
17 changes: 17 additions & 0 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ def depends(self, only_active: bool = True) -> _OnlyActiveFilter:
return self.set_value(only_active)


class _DagIdsFilter(BaseParam[list[str]]):
"""Filter on multi-valued dag_ids param for DagRun."""

def to_orm(self, select: Select) -> Select:
if self.value and self.skip_none:
return select.where(DagRun.dag_id.in_(self.value))
return select

def depends(self, dag_ids: list[str] = Query(None)) -> _DagIdsFilter:
return self.set_value(dag_ids)


class _SearchParam(BaseParam[str]):
"""Search on attribute."""

Expand Down Expand Up @@ -323,6 +335,7 @@ def depends(self, dag_id: str | None = None) -> _DagIdFilter:

# Common Safe DateTime
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]

# DAG
QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter().depends)]
QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter().depends)]
Expand All @@ -337,10 +350,14 @@ def depends(self, dag_id: str | None = None) -> _DagIdFilter:
]
QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter().depends)]
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)]

# DagRun
QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, Depends(_LastDagRunStateFilter().depends)]
QueryDagIdsFilter = Annotated[_DagIdsFilter, Depends(_DagIdsFilter().depends)]

# DAGWarning
QueryDagIdInDagWarningFilter = Annotated[_DagIdFilter, Depends(_DagIdFilter(DagWarning.dag_id).depends)]
QueryWarningTypeFilter = Annotated[_WarningTypeFilter, Depends(_WarningTypeFilter().depends)]

# DAGTags
QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, Depends(_DagTagNamePatternSearch().depends)]
98 changes: 98 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2320,6 +2320,59 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/VersionInfo'
/public/dagStats/:
get:
tags:
- DagStats
summary: Get Dag Stats
description: Get Dag statistics.
operationId: get_dag_stats
parameters:
- name: dag_ids
in: query
required: false
schema:
type: array
items:
type: string
title: Dag Ids
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DagStatsCollectionResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
AppBuilderMenuItemResponse:
Expand Down Expand Up @@ -3270,6 +3323,51 @@ components:
- asset_triggered
title: DagRunType
description: Class with DagRun types.
DagStatsCollectionResponse:
properties:
dags:
items:
$ref: '#/components/schemas/DagStatsResponse'
type: array
title: Dags
total_entries:
type: integer
title: Total Entries
type: object
required:
- dags
- total_entries
title: DagStatsCollectionResponse
description: DAG Stats Collection serializer for responses.
DagStatsResponse:
properties:
dag_id:
type: string
title: Dag Id
stats:
items:
$ref: '#/components/schemas/DagStatsStateResponse'
type: array
title: Stats
type: object
required:
- dag_id
- stats
title: DagStatsResponse
description: DAG Stats serializer for responses.
DagStatsStateResponse:
properties:
state:
$ref: '#/components/schemas/DagRunState'
count:
type: integer
title: Count
type: object
required:
- state
- count
title: DagStatsStateResponse
description: DagStatsState serializer for responses.
DagTagPydantic:
properties:
name:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from airflow.api_fastapi.core_api.routes.public.connections import connections_router
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router
from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router
from airflow.api_fastapi.core_api.routes.public.dag_warning import dag_warning_router
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
Expand Down Expand Up @@ -52,3 +53,4 @@
public_router.include_router(variables_router)
public_router.include_router(variables_router)
public_router.include_router(version_router)
public_router.include_router(dag_stats_router)
79 changes: 79 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from fastapi import Depends
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import (
get_session,
paginated_select,
)
from airflow.api_fastapi.common.db.dag_runs import dagruns_select_with_state_count
from airflow.api_fastapi.common.parameters import QueryDagIdsFilter
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_stats import (
DagStatsCollectionResponse,
DagStatsResponse,
DagStatsStateResponse,
)
from airflow.utils.state import DagRunState

dag_stats_router = AirflowRouter(tags=["DagStats"], prefix="/dagStats")


@dag_stats_router.get(
"/",
responses=create_openapi_http_exception_doc([400, 401, 403, 404]),
Comment thread
pierrejeambrun marked this conversation as resolved.
)
async def get_dag_stats(
session: Annotated[Session, Depends(get_session)],
dag_ids: QueryDagIdsFilter,
) -> DagStatsCollectionResponse:
"""Get Dag statistics."""
dagruns_select, _ = paginated_select(
base_select=dagruns_select_with_state_count,
filters=[dag_ids],
session=session,
return_total_entries=False,
)
query_result = session.execute(dagruns_select)

result_dag_ids = []
dag_state_data = {}
for dag_id, state, count in query_result:
dag_state_data[(dag_id, state)] = count
if dag_id not in result_dag_ids:
result_dag_ids.append(dag_id)

dags = [
DagStatsResponse(
dag_id=dag_id,
stats=[
DagStatsStateResponse(
state=state,
count=dag_state_data.get((dag_id, state), 0),
)
for state in DagRunState
],
)
for dag_id in result_dag_ids
]
return DagStatsCollectionResponse(dags=dags, total_entries=len(dags))
43 changes: 43 additions & 0 deletions airflow/api_fastapi/core_api/serializers/dag_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from pydantic import BaseModel

from airflow.utils.state import DagRunState


class DagStatsStateResponse(BaseModel):
"""DagStatsState serializer for responses."""

state: DagRunState
count: int


class DagStatsResponse(BaseModel):
"""DAG Stats serializer for responses."""

dag_id: str
stats: list[DagStatsStateResponse]


class DagStatsCollectionResponse(BaseModel):
"""DAG Stats Collection serializer for responses."""

dags: list[DagStatsResponse]
total_entries: int
17 changes: 17 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
DagRunService,
DagService,
DagSourceService,
DagStatsService,
DagWarningService,
DagsService,
DashboardService,
Expand Down Expand Up @@ -634,6 +635,22 @@ export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array<unknown>) => [
useVersionServiceGetVersionKey,
...(queryKey ?? []),
];
export type DagStatsServiceGetDagStatsDefaultResponse = Awaited<
ReturnType<typeof DagStatsService.getDagStats>
>;
export type DagStatsServiceGetDagStatsQueryResult<
TData = DagStatsServiceGetDagStatsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagStatsServiceGetDagStatsKey = "DagStatsServiceGetDagStats";
export const UseDagStatsServiceGetDagStatsKeyFn = (
{
dagIds,
}: {
dagIds?: string[];
} = {},
queryKey?: Array<unknown>,
) => [useDagStatsServiceGetDagStatsKey, ...(queryKey ?? [{ dagIds }])];
export type BackfillServiceCreateBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.createBackfill>
>;
Expand Down
Loading