Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions airflow-ctl/src/airflowctl/api/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,32 +608,33 @@ def list(
dag_id: str | None = None,
) -> DAGRunCollectionResponse | ServerResponseError:
"""
List all dag runs.
List dag runs (at most `limit` results).

Args:
state: Filter dag runs by state
start_date: Filter dag runs by start date (optional)
end_date: Filter dag runs by end date (optional)
state: Filter dag runs by state
limit: Limit the number of results
limit: Limit the number of results returned
dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~").
"""
# Use "~" for all DAGs if dag_id is not specified
if not dag_id:
dag_id = "~"

params: dict[str, object] = {
"state": state,
params: dict[str, Any] = {
"state": str(state),
"limit": limit,
}
if start_date is not None:
params["start_date"] = start_date
params["start_date"] = start_date.isoformat()
if end_date is not None:
params["end_date"] = end_date
params["end_date"] = end_date.isoformat()

return super().execute_list(
path=f"/dags/{dag_id}/dagRuns", data_model=DAGRunCollectionResponse, params=params
)
try:
self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params)
return DAGRunCollectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e


class JobsOperations(BaseOperations):
Expand Down
81 changes: 50 additions & 31 deletions airflow-ctl/tests/airflow_ctl/api/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1064,44 +1064,63 @@ def handle_request(request: httpx.Request) -> httpx.Response:
)
assert response == self.dag_run_collection_response

def test_list_all_dags(self):
"""Test listing dag runs for all DAGs using default dag_id='~'."""

def handle_request(request: httpx.Request) -> httpx.Response:
# When dag_id is "~", it should query all DAGs
assert request.url.path == "/api/v2/dags/~/dagRuns"
@pytest.mark.parametrize(
(
"dag_id_input",
"state",
"limit",
"start_date",
"end_date",
"expected_path_suffix",
"expected_params_subset",
),
[
# Test --limit with various values and configurations (covers CLI --limit flag)
("dag1", "queued", 5, None, None, "dag1", {"state": "queued", "limit": "5"}),
(None, "running", 1, None, None, "~", {"state": "running", "limit": "1"}),
(
"example_dag",
"success",
10,
None,
None,
"example_dag",
{"state": "success", "limit": "10"},
),
("dag2", "failed", 0, None, None, "dag2", {"state": "failed", "limit": "0"}),
],
ids=["limit-5", "all-dags-limit-1", "string-state-limit-10", "limit-zero"],
)
def test_list_with_various_limits(
self,
dag_id_input: str | None,
state: str | DagRunState,
limit: int,
start_date: datetime.datetime | None,
end_date: datetime.datetime | None,
expected_path_suffix: str,
expected_params_subset: dict,
) -> None:
"""Test listing dag runs with various limit values (especially --limit flag)."""

def handle_request(request: httpx.Request) -> httpx.Response:
assert request.url.path.endswith(f"/dags/{expected_path_suffix}/dagRuns")
params = dict(request.url.params)
for key, value in expected_params_subset.items():
assert key in params
assert str(params[key]) == str(value)
return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))

client = make_api_client(transport=httpx.MockTransport(handle_request))
# Call without specifying dag_id - should use default "~"
response = client.dag_runs.list(
start_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
end_date=datetime.datetime(2025, 1, 1, 0, 0, 0),
state="running",
limit=1,
state=state,
limit=limit,
start_date=start_date,
end_date=end_date,
dag_id=dag_id_input,
)
assert response == self.dag_run_collection_response

def test_list_with_optional_parameters(self):
"""Test listing dag runs with only some optional parameters."""

def handle_request(request: httpx.Request) -> httpx.Response:
assert request.url.path == "/api/v2/dags/dag1/dagRuns"
# Verify that only state and limit are in query params
params = dict(request.url.params)
assert "state" in params
assert params["state"] == "queued"
assert "limit" in params
assert params["limit"] == "5"
# start_date and end_date should not be present
assert "start_date" not in params
assert "end_date" not in params
return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json()))

client = make_api_client(transport=httpx.MockTransport(handle_request))
response = client.dag_runs.list(state="queued", limit=5, dag_id="dag1")
assert response == self.dag_run_collection_response


class TestJobsOperations:
job_response = JobResponse(
Expand Down
Loading