From 6f530d7aca379647b64aae26878790392536980f Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Sat, 12 Feb 2022 03:04:43 +0530 Subject: [PATCH 1/6] Added template_ext = ('.json') to databricks operators #18925 reference: #18925 --- airflow/providers/databricks/operators/databricks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py index 3e3d64adc36ce..715c5041c6a41 100644 --- a/airflow/providers/databricks/operators/databricks.py +++ b/airflow/providers/databricks/operators/databricks.py @@ -246,6 +246,7 @@ class DatabricksSubmitRunOperator(BaseOperator): # Used in airflow.models.BaseOperator template_fields: Sequence[str] = ('json',) + template_ext: Sequence[str] = ('json',) # Databricks brand color (blue) under white text ui_color = '#1CB1C2' ui_fgcolor = '#fff' @@ -479,6 +480,7 @@ class DatabricksRunNowOperator(BaseOperator): # Used in airflow.models.BaseOperator template_fields: Sequence[str] = ('json',) + template_ext: Sequence[str] = ('json',) # Databricks brand color (blue) under white text ui_color = '#1CB1C2' ui_fgcolor = '#fff' From b5a2211d0fb2e1475dc96c30473100f0c6e969a0 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Sat, 12 Feb 2022 10:06:34 +0530 Subject: [PATCH 2/6] Corrected the template_ext value from 'json' to '.json' --- airflow/providers/databricks/operators/databricks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py index 715c5041c6a41..26c330815daa5 100644 --- a/airflow/providers/databricks/operators/databricks.py +++ b/airflow/providers/databricks/operators/databricks.py @@ -246,7 +246,7 @@ class DatabricksSubmitRunOperator(BaseOperator): # Used in airflow.models.BaseOperator template_fields: Sequence[str] = ('json',) - template_ext: Sequence[str] = ('json',) + template_ext: Sequence[str] = ('.json',) # Databricks brand color (blue) under white text ui_color = '#1CB1C2' ui_fgcolor = '#fff' @@ -480,7 +480,7 @@ class DatabricksRunNowOperator(BaseOperator): # Used in airflow.models.BaseOperator template_fields: Sequence[str] = ('json',) - template_ext: Sequence[str] = ('json',) + template_ext: Sequence[str] = ('.json',) # Databricks brand color (blue) under white text ui_color = '#1CB1C2' ui_fgcolor = '#fff' From c8dd3826f7f0fa90e8993b804e02b60d5eaeab36 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Sun, 13 Feb 2022 23:40:12 +0530 Subject: [PATCH 3/6] Added retries to LivyHook #19384 --- airflow/providers/apache/livy/hooks/livy.py | 20 ++++++++++++++++--- .../providers/apache/livy/operators/livy.py | 8 ++++++-- .../apache/livy/operators/test_livy.py | 12 +++++------ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index 865218c51e7b8..7f519ca0000c7 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -99,6 +99,7 @@ def run_method( method: str = 'GET', data: Optional[Any] = None, headers: Optional[Dict[str, Any]] = None, + _retry_args: Optional[Dict[str, Any]] = None ) -> Any: """ Wrapper for HttpHook, allows to change method on the same HttpHook @@ -107,6 +108,8 @@ def run_method( :param endpoint: endpoint :param data: request payload :param headers: headers + :param _retry_args: Arguments which define the retry behaviour. + See Tenacity documentation at https://github.com/jd/tenacity :return: http response :rtype: requests.Response """ @@ -118,7 +121,16 @@ def run_method( back_method = self.method self.method = method try: - result = self.run(endpoint, data, headers, self.extra_options) + if _retry_args: + result = self.run_with_advanced_retry( + endpoint=endpoint, + data=data, + headers=headers, + extra_options=self.extra_options, + _retry_args=_retry_args) + else: + result = self.run(endpoint, data, headers, self.extra_options) + finally: self.method = back_method return result @@ -180,18 +192,20 @@ def get_batch(self, session_id: Union[int, str]) -> Any: return response.json() - def get_batch_state(self, session_id: Union[int, str]) -> BatchState: + def get_batch_state(self, session_id: Union[int, str], _retry_args: Optional[Dict[str, Any]] = None) -> BatchState: """ Fetch the state of the specified batch :param session_id: identifier of the batch sessions + :param _retry_args: Arguments which define the retry behaviour. + See Tenacity documentation at https://github.com/jd/tenacity :return: batch state :rtype: BatchState """ self._validate_session_id(session_id) self.log.debug("Fetching info for batch session %d", session_id) - response = self.run_method(endpoint=f'/batches/{session_id}/state') + response = self.run_method(endpoint=f'/batches/{session_id}/state', _retry_args=_retry_args) try: response.raise_for_status() diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py index 3b0a2bb93277c..7db60c26d1ddd 100644 --- a/airflow/providers/apache/livy/operators/livy.py +++ b/airflow/providers/apache/livy/operators/livy.py @@ -53,6 +53,8 @@ class LivyOperator(BaseOperator): :param extra_options: A dictionary of options, where key is string and value depends on the option that's being modified. :param extra_headers: A dictionary of headers passed to the HTTP request to livy. + :param _retry_args: Arguments which define the retry behaviour. + See Tenacity documentation at https://github.com/jd/tenacity """ template_fields: Sequence[str] = ('spark_params',) @@ -80,6 +82,7 @@ def __init__( polling_interval: int = 0, extra_options: Optional[Dict[str, Any]] = None, extra_headers: Optional[Dict[str, Any]] = None, + _retry_args: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: @@ -111,6 +114,7 @@ def __init__( self._livy_hook: Optional[LivyHook] = None self._batch_id: Union[int, str] + self._retry_args = _retry_args def get_hook(self) -> LivyHook: """ @@ -142,11 +146,11 @@ def poll_for_termination(self, batch_id: Union[int, str]) -> None: :param batch_id: id of the batch session to monitor. """ hook = self.get_hook() - state = hook.get_batch_state(batch_id) + state = hook.get_batch_state(batch_id, _retry_args=self._retry_args) while state not in hook.TERMINAL_STATES: self.log.debug('Batch with id %s is in state: %s', batch_id, state.value) sleep(self._polling_interval) - state = hook.get_batch_state(batch_id) + state = hook.get_batch_state(batch_id, _retry_args=self._retry_args) self.log.info("Batch with id %s terminated with state: %s", batch_id, state.value) hook.dump_batch_logs(batch_id) if state != BatchState.SUCCESS: diff --git a/tests/providers/apache/livy/operators/test_livy.py b/tests/providers/apache/livy/operators/test_livy.py index 8038f32125ec7..153f5a9644240 100644 --- a/tests/providers/apache/livy/operators/test_livy.py +++ b/tests/providers/apache/livy/operators/test_livy.py @@ -55,7 +55,7 @@ def test_poll_for_termination(self, mock_livy, mock_dump_logs): state_list = 2 * [BatchState.RUNNING] + [BatchState.SUCCESS] - def side_effect(_): + def side_effect(_, _retry_args): if state_list: return state_list.pop(0) # fail if does not stop right before @@ -67,7 +67,7 @@ def side_effect(_): task._livy_hook = task.get_hook() task.poll_for_termination(BATCH_ID) - mock_livy.assert_called_with(BATCH_ID) + mock_livy.assert_called_with(BATCH_ID, _retry_args=None) mock_dump_logs.assert_called_with(BATCH_ID) assert mock_livy.call_count == 3 @@ -80,7 +80,7 @@ def test_poll_for_termination_fail(self, mock_livy, mock_dump_logs): state_list = 2 * [BatchState.RUNNING] + [BatchState.ERROR] - def side_effect(_): + def side_effect(_, _retry_args): if state_list: return state_list.pop(0) # fail if does not stop right before @@ -94,7 +94,7 @@ def side_effect(_): with pytest.raises(AirflowException): task.poll_for_termination(BATCH_ID) - mock_livy.assert_called_with(BATCH_ID) + mock_livy.assert_called_with(BATCH_ID, _retry_args=None) mock_dump_logs.assert_called_with(BATCH_ID) assert mock_livy.call_count == 3 @@ -119,7 +119,7 @@ def test_execution(self, mock_post, mock_get, mock_dump_logs): call_args = {k: v for k, v in mock_post.call_args[1].items() if v} assert call_args == {'file': 'sparkapp'} - mock_get.assert_called_once_with(BATCH_ID) + mock_get.assert_called_once_with(BATCH_ID, _retry_args=None) mock_dump_logs.assert_called_once_with(BATCH_ID) @patch('airflow.providers.apache.livy.operators.livy.LivyHook.post_batch') @@ -171,5 +171,5 @@ def test_log_dump(self, mock_post, mock_get_logs, mock_get): assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:first_line' in cm.output assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:second_line' in cm.output assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:third_line' in cm.output - mock_get.assert_called_once_with(BATCH_ID) + mock_get.assert_called_once_with(BATCH_ID, _retry_args=None) mock_get_logs.assert_called_once_with(BATCH_ID, 0, 100) From a7b58b8ead47b507df44be466adb451680bdde2b Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 14 Feb 2022 01:19:37 +0530 Subject: [PATCH 4/6] Static code fixes. --- airflow/providers/apache/livy/hooks/livy.py | 9 ++++++--- airflow/providers/apache/livy/operators/livy.py | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index 7f519ca0000c7..9898073d59e94 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -99,7 +99,7 @@ def run_method( method: str = 'GET', data: Optional[Any] = None, headers: Optional[Dict[str, Any]] = None, - _retry_args: Optional[Dict[str, Any]] = None + _retry_args: Optional[Dict[str, Any]] = None, ) -> Any: """ Wrapper for HttpHook, allows to change method on the same HttpHook @@ -127,7 +127,8 @@ def run_method( data=data, headers=headers, extra_options=self.extra_options, - _retry_args=_retry_args) + _retry_args=_retry_args, + ) else: result = self.run(endpoint, data, headers, self.extra_options) @@ -192,7 +193,9 @@ def get_batch(self, session_id: Union[int, str]) -> Any: return response.json() - def get_batch_state(self, session_id: Union[int, str], _retry_args: Optional[Dict[str, Any]] = None) -> BatchState: + def get_batch_state( + self, session_id: Union[int, str], _retry_args: Optional[Dict[str, Any]] = None + ) -> BatchState: """ Fetch the state of the specified batch diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py index 7db60c26d1ddd..2d713c28d8db2 100644 --- a/airflow/providers/apache/livy/operators/livy.py +++ b/airflow/providers/apache/livy/operators/livy.py @@ -82,7 +82,7 @@ def __init__( polling_interval: int = 0, extra_options: Optional[Dict[str, Any]] = None, extra_headers: Optional[Dict[str, Any]] = None, - _retry_args: Optional[Dict[str, Any]] = None, + _retry_args: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: From eb6543f63f894389a9c44163861ee77e444140cd Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 14 Feb 2022 21:47:18 +0530 Subject: [PATCH 5/6] Renamed _retry_args to retry_args. --- airflow/providers/apache/livy/hooks/livy.py | 14 +++++++------- airflow/providers/apache/livy/operators/livy.py | 10 +++++----- tests/providers/apache/livy/operators/test_livy.py | 12 ++++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index 9898073d59e94..296a493ef363e 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -99,7 +99,7 @@ def run_method( method: str = 'GET', data: Optional[Any] = None, headers: Optional[Dict[str, Any]] = None, - _retry_args: Optional[Dict[str, Any]] = None, + retry_args: Optional[Dict[str, Any]] = None, ) -> Any: """ Wrapper for HttpHook, allows to change method on the same HttpHook @@ -108,7 +108,7 @@ def run_method( :param endpoint: endpoint :param data: request payload :param headers: headers - :param _retry_args: Arguments which define the retry behaviour. + :param retry_args: Arguments which define the retry behaviour. See Tenacity documentation at https://github.com/jd/tenacity :return: http response :rtype: requests.Response @@ -121,13 +121,13 @@ def run_method( back_method = self.method self.method = method try: - if _retry_args: + if retry_args: result = self.run_with_advanced_retry( endpoint=endpoint, data=data, headers=headers, extra_options=self.extra_options, - _retry_args=_retry_args, + retry_args=retry_args, ) else: result = self.run(endpoint, data, headers, self.extra_options) @@ -194,13 +194,13 @@ def get_batch(self, session_id: Union[int, str]) -> Any: return response.json() def get_batch_state( - self, session_id: Union[int, str], _retry_args: Optional[Dict[str, Any]] = None + self, session_id: Union[int, str], retry_args: Optional[Dict[str, Any]] = None ) -> BatchState: """ Fetch the state of the specified batch :param session_id: identifier of the batch sessions - :param _retry_args: Arguments which define the retry behaviour. + :param retry_args: Arguments which define the retry behaviour. See Tenacity documentation at https://github.com/jd/tenacity :return: batch state :rtype: BatchState @@ -208,7 +208,7 @@ def get_batch_state( self._validate_session_id(session_id) self.log.debug("Fetching info for batch session %d", session_id) - response = self.run_method(endpoint=f'/batches/{session_id}/state', _retry_args=_retry_args) + response = self.run_method(endpoint=f'/batches/{session_id}/state', retry_args=retry_args) try: response.raise_for_status() diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py index 2d713c28d8db2..f0dbc9e3165fe 100644 --- a/airflow/providers/apache/livy/operators/livy.py +++ b/airflow/providers/apache/livy/operators/livy.py @@ -53,7 +53,7 @@ class LivyOperator(BaseOperator): :param extra_options: A dictionary of options, where key is string and value depends on the option that's being modified. :param extra_headers: A dictionary of headers passed to the HTTP request to livy. - :param _retry_args: Arguments which define the retry behaviour. + :param retry_args: Arguments which define the retry behaviour. See Tenacity documentation at https://github.com/jd/tenacity """ @@ -82,7 +82,7 @@ def __init__( polling_interval: int = 0, extra_options: Optional[Dict[str, Any]] = None, extra_headers: Optional[Dict[str, Any]] = None, - _retry_args: Optional[Dict[str, Any]] = None, + retry_args: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: @@ -114,7 +114,7 @@ def __init__( self._livy_hook: Optional[LivyHook] = None self._batch_id: Union[int, str] - self._retry_args = _retry_args + self.retry_args = retry_args def get_hook(self) -> LivyHook: """ @@ -146,11 +146,11 @@ def poll_for_termination(self, batch_id: Union[int, str]) -> None: :param batch_id: id of the batch session to monitor. """ hook = self.get_hook() - state = hook.get_batch_state(batch_id, _retry_args=self._retry_args) + state = hook.get_batch_state(batch_id, retry_args=self.retry_args) while state not in hook.TERMINAL_STATES: self.log.debug('Batch with id %s is in state: %s', batch_id, state.value) sleep(self._polling_interval) - state = hook.get_batch_state(batch_id, _retry_args=self._retry_args) + state = hook.get_batch_state(batch_id, retry_args=self.retry_args) self.log.info("Batch with id %s terminated with state: %s", batch_id, state.value) hook.dump_batch_logs(batch_id) if state != BatchState.SUCCESS: diff --git a/tests/providers/apache/livy/operators/test_livy.py b/tests/providers/apache/livy/operators/test_livy.py index 153f5a9644240..f1c43bf5f1759 100644 --- a/tests/providers/apache/livy/operators/test_livy.py +++ b/tests/providers/apache/livy/operators/test_livy.py @@ -55,7 +55,7 @@ def test_poll_for_termination(self, mock_livy, mock_dump_logs): state_list = 2 * [BatchState.RUNNING] + [BatchState.SUCCESS] - def side_effect(_, _retry_args): + def side_effect(_, retry_args): if state_list: return state_list.pop(0) # fail if does not stop right before @@ -67,7 +67,7 @@ def side_effect(_, _retry_args): task._livy_hook = task.get_hook() task.poll_for_termination(BATCH_ID) - mock_livy.assert_called_with(BATCH_ID, _retry_args=None) + mock_livy.assert_called_with(BATCH_ID, retry_args=None) mock_dump_logs.assert_called_with(BATCH_ID) assert mock_livy.call_count == 3 @@ -80,7 +80,7 @@ def test_poll_for_termination_fail(self, mock_livy, mock_dump_logs): state_list = 2 * [BatchState.RUNNING] + [BatchState.ERROR] - def side_effect(_, _retry_args): + def side_effect(_, retry_args): if state_list: return state_list.pop(0) # fail if does not stop right before @@ -94,7 +94,7 @@ def side_effect(_, _retry_args): with pytest.raises(AirflowException): task.poll_for_termination(BATCH_ID) - mock_livy.assert_called_with(BATCH_ID, _retry_args=None) + mock_livy.assert_called_with(BATCH_ID, retry_args=None) mock_dump_logs.assert_called_with(BATCH_ID) assert mock_livy.call_count == 3 @@ -119,7 +119,7 @@ def test_execution(self, mock_post, mock_get, mock_dump_logs): call_args = {k: v for k, v in mock_post.call_args[1].items() if v} assert call_args == {'file': 'sparkapp'} - mock_get.assert_called_once_with(BATCH_ID, _retry_args=None) + mock_get.assert_called_once_with(BATCH_ID, retry_args=None) mock_dump_logs.assert_called_once_with(BATCH_ID) @patch('airflow.providers.apache.livy.operators.livy.LivyHook.post_batch') @@ -171,5 +171,5 @@ def test_log_dump(self, mock_post, mock_get_logs, mock_get): assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:first_line' in cm.output assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:second_line' in cm.output assert 'INFO:airflow.providers.apache.livy.hooks.livy.LivyHook:third_line' in cm.output - mock_get.assert_called_once_with(BATCH_ID, _retry_args=None) + mock_get.assert_called_once_with(BATCH_ID, retry_args=None) mock_get_logs.assert_called_once_with(BATCH_ID, 0, 100) From 3e86860147154c472c6c44ee92736d888e26b483 Mon Sep 17 00:00:00 2001 From: utkarsh sharma Date: Mon, 14 Feb 2022 23:12:01 +0530 Subject: [PATCH 6/6] Passed _retry_args in run_with_advanced_retry(). --- airflow/providers/apache/livy/hooks/livy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index 296a493ef363e..e2d084ec61f78 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -127,7 +127,7 @@ def run_method( data=data, headers=headers, extra_options=self.extra_options, - retry_args=retry_args, + _retry_args=retry_args, ) else: result = self.run(endpoint, data, headers, self.extra_options)