From c5109c85ceade0fe72cfbaf183a8f8e4b60a3cc1 Mon Sep 17 00:00:00 2001 From: Henry Chen Date: Mon, 23 Mar 2026 01:18:30 +0800 Subject: [PATCH 1/2] fix dagrun list limit --- airflow-ctl/src/airflowctl/api/operations.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index 64424eff9c557..7793f67cdff6c 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -608,32 +608,33 @@ def list( dag_id: str | None = None, ) -> DAGRunCollectionResponse | ServerResponseError: """ - List all dag runs. + List dag runs (at most `limit` results). Args: state: Filter dag runs by state start_date: Filter dag runs by start date (optional) end_date: Filter dag runs by end date (optional) - state: Filter dag runs by state - limit: Limit the number of results + limit: Limit the number of results returned dag_id: The DAG ID to filter by. If None, retrieves dag runs for all DAGs (using "~"). """ # Use "~" for all DAGs if dag_id is not specified if not dag_id: dag_id = "~" - params: dict[str, object] = { + params: dict[str, Any] = { "state": state, "limit": limit, } if start_date is not None: - params["start_date"] = start_date + params["start_date"] = start_date.isoformat() if end_date is not None: - params["end_date"] = end_date + params["end_date"] = end_date.isoformat() - return super().execute_list( - path=f"/dags/{dag_id}/dagRuns", data_model=DAGRunCollectionResponse, params=params - ) + try: + self.response = self.client.get(f"/dags/{dag_id}/dagRuns", params=params) + return DAGRunCollectionResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e class JobsOperations(BaseOperations): From 459e24ae0d24776e8d6544f7a8524b0e55d1cf5b Mon Sep 17 00:00:00 2001 From: Henry Chen Date: Mon, 23 Mar 2026 23:39:24 +0800 Subject: [PATCH 2/2] add unit test for limit parameter --- airflow-ctl/src/airflowctl/api/operations.py | 2 +- .../tests/airflow_ctl/api/test_operations.py | 81 ++++++++++++------- 2 files changed, 51 insertions(+), 32 deletions(-) diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index 7793f67cdff6c..9c492022221bb 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -622,7 +622,7 @@ def list( dag_id = "~" params: dict[str, Any] = { - "state": state, + "state": str(state), "limit": limit, } if start_date is not None: diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index f75a3c9678fd2..8254409863261 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -1064,44 +1064,63 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) assert response == self.dag_run_collection_response - def test_list_all_dags(self): - """Test listing dag runs for all DAGs using default dag_id='~'.""" - - def handle_request(request: httpx.Request) -> httpx.Response: - # When dag_id is "~", it should query all DAGs - assert request.url.path == "/api/v2/dags/~/dagRuns" + @pytest.mark.parametrize( + ( + "dag_id_input", + "state", + "limit", + "start_date", + "end_date", + "expected_path_suffix", + "expected_params_subset", + ), + [ + # Test --limit with various values and configurations (covers CLI --limit flag) + ("dag1", "queued", 5, None, None, "dag1", {"state": "queued", "limit": "5"}), + (None, "running", 1, None, None, "~", {"state": "running", "limit": "1"}), + ( + "example_dag", + "success", + 10, + None, + None, + "example_dag", + {"state": "success", "limit": "10"}, + ), + ("dag2", "failed", 0, None, None, "dag2", {"state": "failed", "limit": "0"}), + ], + ids=["limit-5", "all-dags-limit-1", "string-state-limit-10", "limit-zero"], + ) + def test_list_with_various_limits( + self, + dag_id_input: str | None, + state: str | DagRunState, + limit: int, + start_date: datetime.datetime | None, + end_date: datetime.datetime | None, + expected_path_suffix: str, + expected_params_subset: dict, + ) -> None: + """Test listing dag runs with various limit values (especially --limit flag).""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path.endswith(f"/dags/{expected_path_suffix}/dagRuns") + params = dict(request.url.params) + for key, value in expected_params_subset.items(): + assert key in params + assert str(params[key]) == str(value) return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) - # Call without specifying dag_id - should use default "~" response = client.dag_runs.list( - start_date=datetime.datetime(2025, 1, 1, 0, 0, 0), - end_date=datetime.datetime(2025, 1, 1, 0, 0, 0), - state="running", - limit=1, + state=state, + limit=limit, + start_date=start_date, + end_date=end_date, + dag_id=dag_id_input, ) assert response == self.dag_run_collection_response - def test_list_with_optional_parameters(self): - """Test listing dag runs with only some optional parameters.""" - - def handle_request(request: httpx.Request) -> httpx.Response: - assert request.url.path == "/api/v2/dags/dag1/dagRuns" - # Verify that only state and limit are in query params - params = dict(request.url.params) - assert "state" in params - assert params["state"] == "queued" - assert "limit" in params - assert params["limit"] == "5" - # start_date and end_date should not be present - assert "start_date" not in params - assert "end_date" not in params - return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json())) - - client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.dag_runs.list(state="queued", limit=5, dag_id="dag1") - assert response == self.dag_run_collection_response - class TestJobsOperations: job_response = JobResponse(