diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index 216de77e311ed..96415ed9771c9 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -53,14 +53,15 @@ class HttpOperator(BaseOperator): :param data: The data to pass. POST-data in POST/PUT and params in the URL for a GET request. (templated) :param headers: The HTTP headers to be added to the GET request - :param pagination_function: A callable that generates the parameters used to call the API again. - Typically used when the API is paginated and returns for e.g a cursor, a 'next page id', or - a 'next page URL'. When provided, the Operator will call the API repeatedly until this callable - returns None. Also, the result of the Operator will become by default a list of Response.text - objects (instead of a single response object). Same with the other injected functions (like - response_check, response_filter, ...) which will also receive a list of Response object. This - function should return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`), - which will be merged and override the one used in the initial API call. + :param pagination_function: A callable that generates the parameters used to call the API again, + based on the previous response. Typically used when the API is paginated and returns for e.g a + cursor, a 'next page id', or a 'next page URL'. When provided, the Operator will call the API + repeatedly until this callable returns None. Also, the result of the Operator will become by + default a list of Response.text objects (instead of a single response object). Same with the + other injected functions (like response_check, response_filter, ...) which will also receive a + list of Response object. 
This function receives a Response object from the previous call, and should + return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`), which will be merged + and will override the one used in the initial API call. :param response_check: A check against the 'requests' response object. The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. @@ -162,16 +163,16 @@ def execute(self, context: Context) -> Any: def execute_sync(self, context: Context) -> Any: self.log.info("Calling HTTP method") response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options) - response = self.paginate_sync(first_response=response) + response = self.paginate_sync(response=response) return self.process_response(context=context, response=response) - def paginate_sync(self, first_response: Response) -> Response | list[Response]: + def paginate_sync(self, response: Response) -> Response | list[Response]: if not self.pagination_function: - return first_response + return response - all_responses = [first_response] + all_responses = [response] while True: - next_page_params = self.pagination_function(first_response) + next_page_params = self.pagination_function(response) if not next_page_params: break response = self.hook.run(**self._merge_next_page_parameters(next_page_params)) diff --git a/tests/providers/http/operators/test_http.py b/tests/providers/http/operators/test_http.py index a57af7d764293..451cd93d44144 100644 --- a/tests/providers/http/operators/test_http.py +++ b/tests/providers/http/operators/test_http.py @@ -118,31 +118,33 @@ def test_paginated_responses(self, requests_mock): pagination_function is provided, and as long as this function returns a dictionary that override previous' call parameters. 
""" - has_returned: bool = False + iterations: int = 0 def pagination_function(response: Response) -> dict | None: """Paginated function which returns None at the second call.""" - nonlocal has_returned - if not has_returned: - has_returned = True + nonlocal iterations + if iterations < 2: + iterations += 1 return dict( - endpoint="/", - data={"cursor": "example"}, + endpoint=response.json()["endpoint"], + data={}, headers={}, extra_options={}, ) return None - requests_mock.get("http://www.example.com", json={"value": 5}) + requests_mock.get("http://www.example.com/foo", json={"value": 5, "endpoint": "bar"}) + requests_mock.get("http://www.example.com/bar", json={"value": 10, "endpoint": "foo"}) operator = HttpOperator( task_id="test_HTTP_op", method="GET", - endpoint="/", + endpoint="/foo", http_conn_id="HTTP_EXAMPLE", pagination_function=pagination_function, + response_filter=lambda resp: [entry.json()["value"] for entry in resp], ) result = operator.execute({}) - assert result == ['{"value": 5}', '{"value": 5}'] + assert result == [5, 10, 5] def test_async_paginated_responses(self, requests_mock): """