From 577fe8ef049763358ade768a9a5588595ba8c56a Mon Sep 17 00:00:00 2001 From: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> Date: Sun, 15 Mar 2026 00:10:23 +0100 Subject: [PATCH 01/13] Add dag runs filters (Consuming Asset) --- .../airflow/api_fastapi/common/parameters.py | 28 ++++++++++++++ .../openapi/v2-rest-api-generated.yaml | 10 +++++ .../core_api/routes/public/dag_run.py | 3 ++ .../airflow/ui/openapi-gen/queries/common.ts | 5 ++- .../ui/openapi-gen/queries/ensureQueryData.ts | 6 ++- .../ui/openapi-gen/queries/prefetch.ts | 6 ++- .../airflow/ui/openapi-gen/queries/queries.ts | 6 ++- .../ui/openapi-gen/queries/suspense.ts | 6 ++- .../ui/openapi-gen/requests/services.gen.ts | 4 +- .../ui/openapi-gen/requests/types.gen.ts | 4 ++ .../ui/src/constants/filterConfigs.tsx | 6 +++ .../airflow/ui/src/constants/searchParams.ts | 1 + .../src/airflow/ui/src/pages/DagRuns.tsx | 3 ++ .../airflow/ui/src/pages/DagRunsFilters.tsx | 1 + .../airflow/ui/src/utils/useFiltersHandler.ts | 1 + .../core_api/routes/public/test_dag_run.py | 38 +++++++++++++++++++ 16 files changed, 117 insertions(+), 11 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index e963cdbff557f..3a8ee987e22c0 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -46,6 +46,7 @@ from airflow.models import Base from airflow.models.asset import ( AssetAliasModel, + AssetEvent, AssetModel, AssetPartitionDagRun, DagScheduleAssetReference, @@ -785,6 +786,33 @@ def depends( QueryAssetDependencyFilter = Annotated[_AssetDependencyFilter, Depends(_AssetDependencyFilter.depends)] +class _ConsumingAssetFilter(BaseParam[str | None]): + """Filter DAG runs by consuming asset (name or URI).""" + + def to_orm(self, select: Select) -> Select: + if self.value is None and self.skip_none: + return select + + from airflow.models.asset import AssetModel + + return ( + select.distinct() + .join(DagRun.consumed_asset_events) # DagRun → AssetEvent + .join(AssetModel, AssetEvent.asset_id == AssetModel.id) # AssetEvent → AssetModel (explicit join) + .where(or_(AssetModel.name.ilike(f"%{self.value}%"), AssetModel.uri.ilike(f"%{self.value}%"))) + ) + + @classmethod + def depends( + cls, + consuming_asset: str | None = Query(None, description="Filter by consuming asset name or URI"), + ) -> _ConsumingAssetFilter: + return cls().set_value(consuming_asset) + + +QueryConsumingAssetFilter = Annotated[_ConsumingAssetFilter, Depends(_ConsumingAssetFilter.depends)] + + class _PendingActionsFilter(BaseParam[bool]): """Filter Dags by having pending HITL actions (more than 1).""" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 17c4f87a6e44f..a48688bca55e6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2367,6 +2367,16 @@ paths: description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ \ or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions\ \ are **not** supported." + - name: consuming_asset + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: Filter by consuming asset name or URI + title: Consuming Asset + description: Filter by consuming asset name or URI responses: '200': description: Successful Response diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 67f7630712488..09a77e16c5f3b 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -45,6 +45,7 @@ FilterParam, LimitFilter, OffsetFilter, + QueryConsumingAssetFilter, QueryDagRunPartitionKeySearch, QueryDagRunRunTypesFilter, QueryDagRunStateFilter, @@ -384,6 +385,7 @@ def get_dag_runs( ], dag_id_pattern: Annotated[_SearchParam, Depends(search_param_factory(DagRun.dag_id, "dag_id_pattern"))], partition_key_pattern: QueryDagRunPartitionKeySearch, + consuming_asset: QueryConsumingAssetFilter, ) -> DAGRunCollectionResponse: """ Get all DAG Runs. @@ -419,6 +421,7 @@ def get_dag_runs( triggering_user_name_pattern, dag_id_pattern, partition_key_pattern, + consuming_asset, ], order_by=order_by, offset=offset, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 612a3d56747e3..87f25a14c9d15 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -143,9 +143,10 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId }: export type DagRunServiceGetDagRunsDefaultResponse = Awaited>; export type DagRunServiceGetDagRunsQueryResult = UseQueryResult; export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns"; -export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; + consumingAsset?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -181,7 +182,7 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, d updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])]; +}, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])]; export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; export type DagRunServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 8bc9e9df8e6df..bb17bc5f99dac 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -299,12 +299,14 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu * @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.partitionKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. +* @param data.consumingAsset Filter by consuming asset name or URI * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; + consumingAsset?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -340,7 +342,7 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); +}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index f4cb6f482bdf6..38111e383e220 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -299,12 +299,14 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer * @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.partitionKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. +* @param data.consumingAsset Filter by consuming asset name or URI * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; + consumingAsset?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -340,7 +342,7 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { b updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); +}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 8e9ef5aa29d0c..59ecd21144e40 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -299,12 +299,14 @@ export const useDagRunServiceGetUpstreamAssetEvents = = unknown[]>({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const useDagRunServiceGetDagRuns = = unknown[]>({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; + consumingAsset?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -340,7 +342,7 @@ export const useDagRunServiceGetDagRuns = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index c4a41691b1a2e..e3bbde579adda 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -299,12 +299,14 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense = = unknown[]>({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const useDagRunServiceGetDagRunsSuspense = = unknown[]>({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; + consumingAsset?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -340,7 +342,7 @@ export const useDagRunServiceGetDagRunsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 6e701bf68dc6e..335632e0870cb 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1010,6 +1010,7 @@ export class DagRunService { * @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.partitionKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. + * @param data.consumingAsset Filter by consuming asset name or URI * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ @@ -1056,7 +1057,8 @@ export class DagRunService { run_id_pattern: data.runIdPattern, triggering_user_name_pattern: data.triggeringUserNamePattern, dag_id_pattern: data.dagIdPattern, - partition_key_pattern: data.partitionKeyPattern + partition_key_pattern: data.partitionKeyPattern, + consuming_asset: data.consumingAsset }, errors: { 401: 'Unauthorized', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 7423c08d42c05..2dcecbd2c8e5f 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2527,6 +2527,10 @@ export type ClearDagRunResponse = TaskInstanceCollectionResponse | DAGRunRespons export type GetDagRunsData = { bundleVersion?: string | null; confContains?: string; + /** + * Filter by consuming asset name or URI + */ + consumingAsset?: string | null; dagId: string; /** * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 691ed35c04272..27fb38524e363 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -89,6 +89,12 @@ export const useFilterConfigs = () => { label: translate("common:dagRun.conf"), type: FilterTypes.TEXT, }, + [SearchParamsKeys.CONSUMING_ASSET]: { + icon: , + label: "Consuming Asset", + placeholder: "Search by asset name", + type: FilterTypes.TEXT, + }, [SearchParamsKeys.CREATED_AT_RANGE]: { endKey: SearchParamsKeys.CREATED_AT_LTE, icon: , diff --git a/airflow-core/src/airflow/ui/src/constants/searchParams.ts b/airflow-core/src/airflow/ui/src/constants/searchParams.ts index d37001e768802..8f65259e64826 100644 --- a/airflow-core/src/airflow/ui/src/constants/searchParams.ts +++ b/airflow-core/src/airflow/ui/src/constants/searchParams.ts @@ -23,6 +23,7 @@ export enum SearchParamsKeys { BODY_SEARCH = "body_search", BUNDLE_VERSION = "bundle_version", CONF_CONTAINS = "conf_contains", + CONSUMING_ASSET = "consuming_asset", CREATED_AT_GTE = "created_at_gte", CREATED_AT_LTE = "created_at_lte", CREATED_AT_RANGE = "created_at_range", diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index 94b33640312e1..f998b01df3c25 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -47,6 +47,7 @@ type DagRunRow = { row: { original: DAGRunResponse } }; const { BUNDLE_VERSION: BUNDLE_VERSION_PARAM, CONF_CONTAINS: CONF_CONTAINS_PARAM, + CONSUMING_ASSET: CONSUMING_ASSET_PARAM, DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM, DAG_VERSION: DAG_VERSION_PARAM, DURATION_GTE: DURATION_GTE_PARAM, @@ -214,6 +215,7 @@ export const DagRuns = () => { const filteredTriggeringUserNamePattern = searchParams.get(TRIGGERING_USER_NAME_PATTERN_PARAM); const filteredDagIdPattern = searchParams.get(DAG_ID_PATTERN_PARAM); const filteredDagVersion = searchParams.get(DAG_VERSION_PARAM); + const filteredConsumingAsset = searchParams.get(CONSUMING_ASSET_PARAM); const bundleVersion = searchParams.get(BUNDLE_VERSION_PARAM); const startDateGte = searchParams.get(START_DATE_GTE_PARAM); const startDateLte = searchParams.get(START_DATE_LTE_PARAM); @@ -234,6 +236,7 @@ export const DagRuns = () => { { bundleVersion: bundleVersion ?? undefined, confContains: confContains !== null && confContains !== "" ? confContains : undefined, + consumingAsset: filteredConsumingAsset ?? undefined, dagId: dagId ?? "~", dagIdPattern: filteredDagIdPattern ?? undefined, dagVersion: diff --git a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx b/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx index b0f90d66f9876..a26334a9a4f07 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx @@ -42,6 +42,7 @@ export const DagRunsFilters = ({ dagId }: DagRunsFiltersProps) => { SearchParamsKeys.DAG_VERSION, SearchParamsKeys.PARTITION_KEY_PATTERN, SearchParamsKeys.BUNDLE_VERSION, + SearchParamsKeys.CONSUMING_ASSET, ]; if (dagId === undefined) { diff --git a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts index 17ab2fc35061f..a98653607ae14 100644 --- a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts +++ b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts @@ -60,6 +60,7 @@ export type FilterableSearchParamsKeys = | SearchParamsKeys.BODY_SEARCH | SearchParamsKeys.BUNDLE_VERSION | SearchParamsKeys.CONF_CONTAINS + | SearchParamsKeys.CONSUMING_ASSET | SearchParamsKeys.CREATED_AT_RANGE | SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN | SearchParamsKeys.DAG_ID diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 3a55532e3d73b..d19ded0c5ab24 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -40,6 +40,7 @@ from tests_common.test_utils.api_fastapi import _check_dag_run_note, _check_last_log from tests_common.test_utils.asserts import assert_queries_count from tests_common.test_utils.db import ( + clear_db_assets, clear_db_connections, clear_db_dag_bundles, clear_db_dags, @@ -130,6 +131,7 @@ def setup(request, dag_maker, session=None): clear_db_dag_bundles() clear_db_serialized_dags() clear_db_logs() + clear_db_assets() if "no_setup" in request.keywords: return @@ -213,6 +215,35 @@ def setup(request, dag_maker, session=None): # Set conf for testing conf_contains filter dag_run4.conf = {"env": "testing", "mode": "ci"} + asset1 = AssetModel(name="sales", uri="s3://bucket/sales") + asset2 = AssetModel(name="customer", uri="s3://bucket/customer") + session.add(asset1) + session.add(asset2) + session.flush() + + event1 = AssetEvent( + asset_id=asset1.id, + source_dag_id="source_dag", + source_run_id="source_run", + source_task_id="source_task", + ) + event2 = AssetEvent( + asset_id=asset2.id, + source_dag_id="source_dag", + source_run_id="source_run", + source_task_id="source_task", + ) + session.add(event1) + session.add(event2) + session.flush() + + dag_run1.consumed_asset_events.append(event1) + dag_run2.consumed_asset_events.append(event2) + + session.merge(dag_run1) + session.merge(dag_run2) + session.flush() + dag_maker.sync_dagbag_to_db() dag_maker.dag_model.has_task_concurrency_limits = True session.merge(ti1) @@ -710,6 +741,13 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): ), # Test for debug key ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID]), # Test for the key "version" ("~", {"conf_contains": "nonexistent_key"}, []), # Test for a key that doesn't exist + # Test consuming_asset filter + ("~", {"consuming_asset": "sales"}, [DAG1_RUN1_ID]), # Filter by asset name + ("~", {"consuming_asset": "s3://bucket/sales"}, [DAG1_RUN1_ID]), # Filter by asset URI + ("~", {"consuming_asset": "customer"}, [DAG1_RUN2_ID]), # Filter by another asset + ("~", {"consuming_asset": "s3://bucket/customer"}, [DAG1_RUN2_ID]), # Filter by customer URI + ("~", {"consuming_asset": "s3://bucket"}, [DAG1_RUN1_ID, DAG1_RUN2_ID]), # Partial URI match + ("~", {"consuming_asset": "nonexistent_asset"}, []), # Non-existent asset returns empty ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") From aa0cc23ee883a7d500bf5dd4d729c51eb8336927 Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Tue, 17 Mar 2026 01:56:09 +0800 Subject: [PATCH 02/13] Fix: correct consuming asset filter setup using association_table --- .../airflow/api_fastapi/common/parameters.py | 24 ++++++++++++++---- .../core_api/routes/public/test_dag_run.py | 25 +++++++++---------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 3a8ee987e22c0..efe0d229197e1 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -52,6 +52,7 @@ DagScheduleAssetReference, TaskInletAssetReference, TaskOutletAssetReference, + association_table, ) from airflow.models.connection import Connection from airflow.models.dag import DagModel, DagTag @@ -793,13 +794,26 @@ def to_orm(self, select: Select) -> Select: if self.value is None and self.skip_none: return select - from airflow.models.asset import AssetModel - return ( select.distinct() - .join(DagRun.consumed_asset_events) # DagRun → AssetEvent - .join(AssetModel, AssetEvent.asset_id == AssetModel.id) # AssetEvent → AssetModel (explicit join) - .where(or_(AssetModel.name.ilike(f"%{self.value}%"), AssetModel.uri.ilike(f"%{self.value}%"))) + .join( + association_table, + DagRun.id == association_table.c.dag_run_id, + ) + .join( + AssetEvent, + association_table.c.event_id == AssetEvent.id, + ) + .join( + AssetModel, + AssetEvent.asset_id == AssetModel.id, + ) + .where( + or_( + AssetModel.name.ilike(f"%{self.value}%"), + AssetModel.uri.ilike(f"%{self.value}%"), + ) + ) ) @classmethod diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index d19ded0c5ab24..5d198d34d00c3 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -215,10 +215,16 @@ def setup(request, dag_maker, session=None): # Set conf for testing conf_contains filter dag_run4.conf = {"env": "testing", "mode": "ci"} + dag_maker.sync_dagbag_to_db() + dag_maker.dag_model.has_task_concurrency_limits = True + session.merge(ti1) + session.merge(ti2) + session.merge(dag_maker.dag_model) + session.commit() + asset1 = AssetModel(name="sales", uri="s3://bucket/sales") asset2 = AssetModel(name="customer", uri="s3://bucket/customer") - session.add(asset1) - session.add(asset2) + session.add_all([asset1, asset2]) session.flush() event1 = AssetEvent( @@ -233,22 +239,15 @@ def setup(request, dag_maker, session=None): source_run_id="source_run", source_task_id="source_task", ) - session.add(event1) - session.add(event2) + session.add_all([event1, event2]) session.flush() + dag_run1 = session.scalar(select(DagRun).filter(DagRun.id == dag_run1.id)) + dag_run2 = session.scalar(select(DagRun).filter(DagRun.id == dag_run2.id)) + dag_run1.consumed_asset_events.append(event1) dag_run2.consumed_asset_events.append(event2) - session.merge(dag_run1) - session.merge(dag_run2) - session.flush() - - dag_maker.sync_dagbag_to_db() - dag_maker.dag_model.has_task_concurrency_limits = True - session.merge(ti1) - session.merge(ti2) - session.merge(dag_maker.dag_model) session.commit() From 19446163026a54f3a56bdf5e27bc456ae344d13d Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Tue, 17 Mar 2026 02:23:29 +0800 Subject: [PATCH 03/13] Trigger CI rebuild From 55ccac02100b721deb7054c994f98095034833de Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Sat, 21 Mar 2026 02:09:23 +0800 Subject: [PATCH 04/13] Rename consuming_asset filter to consuming_asset_pattern with database icon --- airflow-core/src/airflow/api_fastapi/common/parameters.py | 8 +++++--- .../core_api/openapi/v2-rest-api-generated.yaml | 8 ++++---- .../airflow/api_fastapi/core_api/routes/public/dag_run.py | 4 ++-- airflow-core/src/airflow/ui/openapi-gen/queries/common.ts | 6 +++--- .../src/airflow/ui/openapi-gen/queries/ensureQueryData.ts | 8 ++++---- .../src/airflow/ui/openapi-gen/queries/prefetch.ts | 8 ++++---- .../src/airflow/ui/openapi-gen/queries/queries.ts | 8 ++++---- .../src/airflow/ui/openapi-gen/queries/suspense.ts | 8 ++++---- .../src/airflow/ui/openapi-gen/requests/services.gen.ts | 4 ++-- .../src/airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++-- .../src/airflow/ui/src/constants/filterConfigs.tsx | 8 +++++--- 11 files changed, 39 insertions(+), 35 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index efe0d229197e1..981aaba90f2f5 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -819,12 +819,14 @@ def to_orm(self, select: Select) -> Select: @classmethod def depends( cls, - consuming_asset: str | None = Query(None, description="Filter by consuming asset name or URI"), + consuming_asset_pattern: str | None = Query( + None, description="Filter by consuming asset name or URI using pattern matching" + ), ) -> _ConsumingAssetFilter: - return cls().set_value(consuming_asset) + return cls().set_value(consuming_asset_pattern) -QueryConsumingAssetFilter = Annotated[_ConsumingAssetFilter, Depends(_ConsumingAssetFilter.depends)] +QueryConsumingAssetPatternSearch = Annotated[_ConsumingAssetFilter, Depends(_ConsumingAssetFilter.depends)] class _PendingActionsFilter(BaseParam[bool]): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index a48688bca55e6..43900c3f8b202 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2367,16 +2367,16 @@ paths: description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ \ or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions\ \ are **not** supported." - - name: consuming_asset + - name: consuming_asset_pattern in: query required: false schema: anyOf: - type: string - type: 'null' - description: Filter by consuming asset name or URI - title: Consuming Asset - description: Filter by consuming asset name or URI + description: Filter by consuming asset name or URI using pattern matching + title: Consuming Asset Pattern + description: Filter by consuming asset name or URI using pattern matching responses: '200': description: Successful Response diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 09a77e16c5f3b..1e148a2e27961 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -45,7 +45,7 @@ FilterParam, LimitFilter, OffsetFilter, - QueryConsumingAssetFilter, + QueryConsumingAssetPatternSearch, QueryDagRunPartitionKeySearch, QueryDagRunRunTypesFilter, QueryDagRunStateFilter, @@ -385,7 +385,7 @@ def get_dag_runs( ], dag_id_pattern: Annotated[_SearchParam, Depends(search_param_factory(DagRun.dag_id, "dag_id_pattern"))], partition_key_pattern: QueryDagRunPartitionKeySearch, - consuming_asset: QueryConsumingAssetFilter, + consuming_asset: QueryConsumingAssetPatternSearch, ) -> DAGRunCollectionResponse: """ Get all DAG Runs. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 87f25a14c9d15..c02f01452b39e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -143,10 +143,10 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId }: export type DagRunServiceGetDagRunsDefaultResponse = Awaited>; export type DagRunServiceGetDagRunsQueryResult = UseQueryResult; export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns"; -export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; - consumingAsset?: string; + consumingAssetPattern?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -182,7 +182,7 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, c updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])]; +}, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])]; export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; export type DagRunServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index bb17bc5f99dac..c0ca5feb6cbd4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -299,14 +299,14 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu * @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.partitionKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. -* @param data.consumingAsset Filter by consuming asset name or URI +* @param data.consumingAssetPattern Filter by consuming asset name or URI using pattern matching * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; - consumingAsset?: string; + consumingAssetPattern?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -342,7 +342,7 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); +}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 38111e383e220..d7c8731e6a686 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -299,14 +299,14 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer * @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.partitionKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. -* @param data.consumingAsset Filter by consuming asset name or URI +* @param data.consumingAssetPattern Filter by consuming asset name or URI using pattern matching * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; - consumingAsset?: string; + consumingAssetPattern?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -342,7 +342,7 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { b updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); +}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 59ecd21144e40..7ab58bf81ad26 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -299,14 +299,14 @@ export const useDagRunServiceGetUpstreamAssetEvents = = unknown[]>({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const useDagRunServiceGetDagRuns = = unknown[]>({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; - consumingAsset?: string; + consumingAssetPattern?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -342,7 +342,7 @@ export const useDagRunServiceGetDagRuns = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index e3bbde579adda..cfc50f2d88308 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -299,14 +299,14 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense = = unknown[]>({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const useDagRunServiceGetDagRunsSuspense = = unknown[]>({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { bundleVersion?: string; confContains?: string; - consumingAsset?: string; + consumingAssetPattern?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; @@ -342,7 +342,7 @@ export const useDagRunServiceGetDagRunsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAsset, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAssetPattern, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 335632e0870cb..0e43f82154ac0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1010,7 +1010,7 @@ export class DagRunService { * @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.dagIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. * @param data.partitionKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. - * @param data.consumingAsset Filter by consuming asset name or URI + * @param data.consumingAssetPattern Filter by consuming asset name or URI using pattern matching * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ @@ -1058,7 +1058,7 @@ export class DagRunService { triggering_user_name_pattern: data.triggeringUserNamePattern, dag_id_pattern: data.dagIdPattern, partition_key_pattern: data.partitionKeyPattern, - consuming_asset: data.consumingAsset + consuming_asset_pattern: data.consumingAssetPattern }, errors: { 401: 'Unauthorized', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 2dcecbd2c8e5f..c988b3abda575 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2528,9 +2528,9 @@ export type GetDagRunsData = { bundleVersion?: string | null; confContains?: string; /** - * Filter by consuming asset name or URI + * Filter by consuming asset name or URI using pattern matching */ - consumingAsset?: string | null; + consumingAssetPattern?: string | null; dagId: string; /** * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`). Regular expressions are **not** supported. diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 27fb38524e363..6729a24140147 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -22,6 +22,7 @@ import { Flex } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { BiTargetLock } from "react-icons/bi"; import { FiBarChart, FiUser } from "react-icons/fi"; +import { ImDatabase } from "react-icons/im"; import { LuBrackets } from "react-icons/lu"; import { MdDateRange, @@ -90,9 +91,10 @@ export const useFilterConfigs = () => { type: FilterTypes.TEXT, }, [SearchParamsKeys.CONSUMING_ASSET]: { - icon: , - label: "Consuming Asset", - placeholder: "Search by asset name", + hotkeyDisabled: true, + icon: , + label: translate("common:consumingAsset"), + placeholder: translate("common:filters.searchAsset"), type: FilterTypes.TEXT, }, [SearchParamsKeys.CREATED_AT_RANGE]: { From a41ef6571f4267634a8558a86ae548b2ddd28ee4 Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Sat, 21 Mar 2026 02:18:17 +0800 Subject: [PATCH 05/13] Rename consuming_asset filter to consuming_asset_pattern with database icon --- airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 6729a24140147..1d1c9406903bb 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -21,8 +21,7 @@ import { Flex } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { BiTargetLock } from "react-icons/bi"; -import { FiBarChart, FiUser } from "react-icons/fi"; -import { ImDatabase } from "react-icons/im"; +import { FiBarChart, FiUser, FiDatabase } from "react-icons/fi"; import { LuBrackets } from "react-icons/lu"; import { MdDateRange, @@ -92,7 +91,7 @@ export const useFilterConfigs = () => { }, [SearchParamsKeys.CONSUMING_ASSET]: { hotkeyDisabled: true, - icon: , + icon: , label: translate("common:consumingAsset"), placeholder: translate("common:filters.searchAsset"), type: FilterTypes.TEXT, From 64df21b217387bfe72e7134f2a1c8e1f7d2341bb Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Tue, 17 Mar 2026 02:23:29 +0800 Subject: [PATCH 06/13] Trigger CI rebuild From a35f59ae1bc16b674fc3dd72fcd05e50bb765506 Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Sat, 21 Mar 2026 11:26:13 +0800 Subject: [PATCH 07/13] Fix consuming_asset_pattern naming --- airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx | 2 +- airflow-core/src/airflow/ui/src/constants/searchParams.ts | 2 +- airflow-core/src/airflow/ui/src/pages/DagRuns.tsx | 6 +++--- airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx | 2 +- airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 1d1c9406903bb..4b06175db885e 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -89,7 +89,7 @@ export const useFilterConfigs = () => { label: translate("common:dagRun.conf"), type: FilterTypes.TEXT, }, - [SearchParamsKeys.CONSUMING_ASSET]: { + [SearchParamsKeys.CONSUMING_ASSET_PATTERN]: { hotkeyDisabled: true, icon: , label: translate("common:consumingAsset"), diff --git a/airflow-core/src/airflow/ui/src/constants/searchParams.ts b/airflow-core/src/airflow/ui/src/constants/searchParams.ts index 8f65259e64826..7194fe69ceeb2 100644 --- a/airflow-core/src/airflow/ui/src/constants/searchParams.ts +++ b/airflow-core/src/airflow/ui/src/constants/searchParams.ts @@ -23,7 +23,7 @@ export enum SearchParamsKeys { BODY_SEARCH = "body_search", BUNDLE_VERSION = "bundle_version", CONF_CONTAINS = "conf_contains", - CONSUMING_ASSET = "consuming_asset", + CONSUMING_ASSET_PATTERN = "consuming_asset_pattern", CREATED_AT_GTE = "created_at_gte", CREATED_AT_LTE = "created_at_lte", CREATED_AT_RANGE = "created_at_range", diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index f998b01df3c25..08eb817db5b70 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -47,7 +47,7 @@ type DagRunRow = { row: { original: DAGRunResponse } }; const { BUNDLE_VERSION: BUNDLE_VERSION_PARAM, CONF_CONTAINS: CONF_CONTAINS_PARAM, - CONSUMING_ASSET: CONSUMING_ASSET_PARAM, + CONSUMING_ASSET_PATTERN: CONSUMING_ASSET_PATTERN_PARAM, DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM, DAG_VERSION: DAG_VERSION_PARAM, DURATION_GTE: DURATION_GTE_PARAM, @@ -215,7 +215,7 @@ export const DagRuns = () => { const filteredTriggeringUserNamePattern = searchParams.get(TRIGGERING_USER_NAME_PATTERN_PARAM); const filteredDagIdPattern = searchParams.get(DAG_ID_PATTERN_PARAM); const filteredDagVersion = searchParams.get(DAG_VERSION_PARAM); - const filteredConsumingAsset = searchParams.get(CONSUMING_ASSET_PARAM); + const filteredConsumingAsset = searchParams.get(CONSUMING_ASSET_PATTERN_PARAM); const bundleVersion = searchParams.get(BUNDLE_VERSION_PARAM); const startDateGte = searchParams.get(START_DATE_GTE_PARAM); const startDateLte = searchParams.get(START_DATE_LTE_PARAM); @@ -236,7 +236,7 @@ export const DagRuns = () => { { bundleVersion: bundleVersion ?? undefined, confContains: confContains !== null && confContains !== "" ? confContains : undefined, - consumingAsset: filteredConsumingAsset ?? undefined, + consumingAssetPattern: filteredConsumingAsset ?? undefined, dagId: dagId ?? "~", dagIdPattern: filteredDagIdPattern ?? undefined, dagVersion: diff --git a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx b/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx index a26334a9a4f07..f39e66cab41ba 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx @@ -42,7 +42,7 @@ export const DagRunsFilters = ({ dagId }: DagRunsFiltersProps) => { SearchParamsKeys.DAG_VERSION, SearchParamsKeys.PARTITION_KEY_PATTERN, SearchParamsKeys.BUNDLE_VERSION, - SearchParamsKeys.CONSUMING_ASSET, + SearchParamsKeys.CONSUMING_ASSET_PATTERN, ]; if (dagId === undefined) { diff --git a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts index a98653607ae14..6df86f6f6ab18 100644 --- a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts +++ b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts @@ -60,7 +60,7 @@ export type FilterableSearchParamsKeys = | SearchParamsKeys.BODY_SEARCH | SearchParamsKeys.BUNDLE_VERSION | SearchParamsKeys.CONF_CONTAINS - | SearchParamsKeys.CONSUMING_ASSET + | SearchParamsKeys.CONSUMING_ASSET_PATTERN | SearchParamsKeys.CREATED_AT_RANGE | SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN | SearchParamsKeys.DAG_ID From e5fb1006423fa649aa24c2c661bad2eebf66fc3f Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Mon, 23 Mar 2026 00:15:33 +0800 Subject: [PATCH 08/13] Fix: rename consuming_asset to consuming_asset_pattern --- .../core_api/routes/public/dag_run.py | 4 ++-- .../core_api/routes/public/test_dag_run.py | 22 +++++++++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 1e148a2e27961..bd50be17b9dc4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -385,7 +385,7 @@ def get_dag_runs( ], dag_id_pattern: Annotated[_SearchParam, Depends(search_param_factory(DagRun.dag_id, "dag_id_pattern"))], partition_key_pattern: QueryDagRunPartitionKeySearch, - consuming_asset: QueryConsumingAssetPatternSearch, + consuming_asset_pattern: QueryConsumingAssetPatternSearch, ) -> DAGRunCollectionResponse: """ Get all DAG Runs. @@ -421,7 +421,7 @@ def get_dag_runs( triggering_user_name_pattern, dag_id_pattern, partition_key_pattern, - consuming_asset, + consuming_asset_pattern, ], order_by=order_by, offset=offset, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 5d198d34d00c3..716d096a80392 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -740,13 +740,21 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): ), # Test for debug key ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID]), # Test for the key "version" ("~", {"conf_contains": "nonexistent_key"}, []), # Test for a key that doesn't exist - # Test consuming_asset filter - ("~", {"consuming_asset": "sales"}, [DAG1_RUN1_ID]), # Filter by asset name - ("~", {"consuming_asset": "s3://bucket/sales"}, [DAG1_RUN1_ID]), # Filter by asset URI - ("~", {"consuming_asset": "customer"}, [DAG1_RUN2_ID]), # Filter by another asset - ("~", {"consuming_asset": "s3://bucket/customer"}, [DAG1_RUN2_ID]), # Filter by customer URI - ("~", {"consuming_asset": "s3://bucket"}, [DAG1_RUN1_ID, DAG1_RUN2_ID]), # Partial URI match - ("~", {"consuming_asset": "nonexistent_asset"}, []), # Non-existent asset returns empty + # Test consuming_asset_pattern filter + ("~", {"consuming_asset_pattern": "sales"}, [DAG1_RUN1_ID]), # Filter by asset name + ("~", {"consuming_asset_pattern": "s3://bucket/sales"}, [DAG1_RUN1_ID]), # Filter by asset URI + ("~", {"consuming_asset_pattern": "customer"}, [DAG1_RUN2_ID]), # Filter by another asset + ( + "~", + {"consuming_asset_pattern": "s3://bucket/customer"}, + [DAG1_RUN2_ID], + ), # Filter by customer URI + ( + "~", + {"consuming_asset_pattern": "s3://bucket"}, + [DAG1_RUN1_ID, DAG1_RUN2_ID], + ), # Partial URI match + ("~", {"consuming_asset_pattern": "nonexistent_asset"}, []), # Non-existent asset returns empty ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") From b08a4e851fde8b55652d0b15819bb14f32b08301 Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Mon, 23 Mar 2026 01:33:02 +0800 Subject: [PATCH 09/13] Fix: rename consuming_asset to consuming_asset_pattern --- .../airflow/api_fastapi/common/parameters.py | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 981aaba90f2f5..ce879a061feb8 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -794,26 +794,27 @@ def to_orm(self, select: Select) -> Select: if self.value is None and self.skip_none: return select - return ( - select.distinct() - .join( - association_table, - DagRun.id == association_table.c.dag_run_id, - ) - .join( - AssetEvent, - association_table.c.event_id == AssetEvent.id, - ) - .join( - AssetModel, - AssetEvent.asset_id == AssetModel.id, - ) + event_subquery = ( + sql_select(association_table.c.event_id) + .join(AssetEvent, association_table.c.event_id == AssetEvent.id) + .join(AssetModel, AssetEvent.asset_id == AssetModel.id) .where( or_( AssetModel.name.ilike(f"%{self.value}%"), AssetModel.uri.ilike(f"%{self.value}%"), ) ) + .distinct() + .subquery() + ) + + return ( + select.distinct() + .join( + association_table, + DagRun.id == association_table.c.dag_run_id, + ) + .where(association_table.c.event_id.in_(sql_select(event_subquery.c.event_id))) ) @classmethod From 3cabadeaa74ebb3138d1e1131a4f4f0df5582289 Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Mon, 23 Mar 2026 10:03:24 +0800 Subject: [PATCH 10/13] Fix: Resolve PostgreSQL JSON comparison error in _ConsumingAssetFilter --- .../airflow/api_fastapi/common/parameters.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index ce879a061feb8..dd96d721bd37b 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -795,8 +795,7 @@ def to_orm(self, select: Select) -> Select: return select event_subquery = ( - sql_select(association_table.c.event_id) - .join(AssetEvent, association_table.c.event_id == AssetEvent.id) + sql_select(AssetEvent.id) .join(AssetModel, AssetEvent.asset_id == AssetModel.id) .where( or_( @@ -805,18 +804,16 @@ def to_orm(self, select: Select) -> Select: ) ) .distinct() - .subquery() ) - return ( - select.distinct() - .join( - association_table, - DagRun.id == association_table.c.dag_run_id, - ) - .where(association_table.c.event_id.in_(sql_select(event_subquery.c.event_id))) + dagrun_subquery = ( + sql_select(association_table.c.dag_run_id) + .where(association_table.c.event_id.in_(event_subquery)) + .distinct() ) + return select.where(DagRun.id.in_(dagrun_subquery)) + @classmethod def depends( cls, From f32dff3c883e34df0b1e18072a3a311f561e8763 Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Thu, 26 Mar 2026 02:16:00 +0800 Subject: [PATCH 11/13] Rebase and fix _ConsumingAssetFilter --- airflow-core/src/airflow/api_fastapi/common/parameters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index dd96d721bd37b..065dff8cda8bb 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -791,7 +791,7 @@ class _ConsumingAssetFilter(BaseParam[str | None]): """Filter DAG runs by consuming asset (name or URI).""" def to_orm(self, select: Select) -> Select: - if self.value is None and self.skip_none: + if not self.value and self.skip_none: return select event_subquery = ( From 84a0285b4b4a1cb419a68a18e76758c06bc80833 Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Thu, 26 Mar 2026 14:08:03 +0800 Subject: [PATCH 12/13] Trigger CI From 2c342ed4fa84241d92bdfe689fc6d9d751af7564 Mon Sep 17 00:00:00 2001 From: VincentHsiao Date: Thu, 2 Apr 2026 01:28:48 +0800 Subject: [PATCH 13/13] add consumingAsset and filters.searchAsset to en/common.json --- airflow-core/src/airflow/ui/public/i18n/locales/en/common.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index f0ccd787a2633..e44f9cc3856e1 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -28,6 +28,7 @@ }, "collapseAllExtra": "Collapse all extra JSON", "collapseDetailsPanel": "Collapse Details Panel", + "consumingAsset": "Consuming Asset", "createdAssetEvent_one": "Created Asset Event", "createdAssetEvent_other": "Created Asset Events", "dag_one": "Dag", @@ -131,6 +132,7 @@ "logicalDateTo": "Logical Date To", "runAfterFrom": "Run After From", "runAfterTo": "Run After To", + "searchAsset": "Search Asset", "selectDateRange": "Select Date Range", "startTime": "Start Time" },