From 64b5fb9ecef1f90fc4b3127b9c289fa542ced3c0 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Thu, 23 Jun 2022 18:19:48 +0530 Subject: [PATCH 1/3] Add test_connection method to Databricks hook --- .../providers/databricks/hooks/databricks.py | 18 ++++++++ .../databricks/hooks/test_databricks.py | 44 ++++++++++++++++++- 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index 400bbe895588a..b875d5bc60d84 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -52,6 +52,8 @@ RUN_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'TERMINATING', 'TERMINATED', 'SKIPPED', 'INTERNAL_ERROR'] +LIST_ZONES_ENDPOINT = ('GET', 'api/2.0/clusters/list-zones') + class RunState: """Utility class for the run state concept of Databricks runs.""" @@ -408,3 +410,19 @@ def get_repo_by_path(self, path: str) -> Optional[str]: raise e return None + + def test_connection(self): + """Test the Databricks connectivity from UI""" + status, message = False, '' + hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id) + try: + result = hook._do_api_call(endpoint_info=LIST_ZONES_ENDPOINT).get('zones', []) + if result: + self.log.info(result) + status = True + message = 'Connection successfully tested' + except Exception as e: + status = False + message = str(e) + + return status, message diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py index 2997984f7b90b..71098367b0b7e 100644 --- a/tests/providers/databricks/hooks/test_databricks.py +++ b/tests/providers/databricks/hooks/test_databricks.py @@ -104,6 +104,7 @@ ], 'has_more': False, } +LIST_ZONES_RESPONSE = {'zones': ['us-east-2b', 'us-east-2c', 'us-east-2a'], 'default_zone': 'us-east-2b'} def run_now_endpoint(host): @@ -178,11 +179,18 @@ def uninstall_endpoint(host): def list_jobs_endpoint(host): """ - Utility function to generate the list jobs endpoint giver the host + Utility function to generate the list jobs endpoint given the host """ return f'https://{host}/api/2.1/jobs/list' +def list_zones_endpoint(host): + """ + Utility function to generate the list zones endpoint given the host + """ + return f'https://{host}/api/2.0/clusters/list-zones' + + def create_valid_response_mock(content): response = mock.MagicMock() response.json.return_value = content @@ -706,6 +714,40 @@ def test_get_job_id_by_name_raise_exception_with_duplicates(self, mock_requests) timeout=self.hook.timeout_seconds, ) + @mock.patch('airflow.providers.databricks.hooks.databricks_base.requests') + def test_connection_success(self, mock_requests): + mock_requests.codes.ok = 200 + mock_requests.get.return_value.json.return_value = LIST_ZONES_RESPONSE + status_code_mock = mock.PropertyMock(return_value=200) + type(mock_requests.get.return_value).status_code = status_code_mock + response = self.hook.test_connection() + assert response == (True, 'Connection successfully tested') + mock_requests.get.assert_called_once_with( + list_zones_endpoint(HOST), + json=None, + params=None, + auth=HTTPBasicAuth(LOGIN, PASSWORD), + headers=USER_AGENT_HEADER, + timeout=self.hook.timeout_seconds, + ) + + @mock.patch('airflow.providers.databricks.hooks.databricks_base.requests') + def test_connection_failure(self, mock_requests): + mock_requests.codes.ok = 404 + mock_requests.get.side_effect = Exception('Connection Failure') + status_code_mock = mock.PropertyMock(return_value=404) + type(mock_requests.get.return_value).status_code = status_code_mock + response = self.hook.test_connection() + assert response == (False, 'Connection Failure') + mock_requests.get.assert_called_once_with( + list_zones_endpoint(HOST), + json=None, + params=None, + auth=HTTPBasicAuth(LOGIN, PASSWORD), + headers=USER_AGENT_HEADER, + timeout=self.hook.timeout_seconds, + ) + class TestDatabricksHookToken(unittest.TestCase): """ From 7def13e7098f0b443be5c59354aa258df92de5e7 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Fri, 24 Jun 2022 13:20:05 +0530 Subject: [PATCH 2/3] Apply review suggestions --- airflow/providers/databricks/hooks/databricks.py | 1 - tests/providers/databricks/hooks/test_databricks.py | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index b875d5bc60d84..f2389eb501aa9 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -418,7 +418,6 @@ def test_connection(self): try: result = hook._do_api_call(endpoint_info=LIST_ZONES_ENDPOINT).get('zones', []) if result: - self.log.info(result) status = True message = 'Connection successfully tested' except Exception as e: diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py index 71098367b0b7e..8599ac9918622 100644 --- a/tests/providers/databricks/hooks/test_databricks.py +++ b/tests/providers/databricks/hooks/test_databricks.py @@ -185,9 +185,7 @@ def list_jobs_endpoint(host): def list_zones_endpoint(host): - """ - Utility function to generate the list zones endpoint given the host - """ + """Utility function to generate the list zones endpoint given the host""" return f'https://{host}/api/2.0/clusters/list-zones' From 280ac3144900bfc35865744d6c7ab960e3756eca Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Fri, 24 Jun 2022 21:17:25 +0530 Subject: [PATCH 3/3] Apply review suggestions --- airflow/providers/databricks/hooks/databricks.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index f2389eb501aa9..33b4aeb8f78f5 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -413,13 +413,11 @@ def get_repo_by_path(self, path: str) -> Optional[str]: def test_connection(self): """Test the Databricks connectivity from UI""" - status, message = False, '' hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id) try: - result = hook._do_api_call(endpoint_info=LIST_ZONES_ENDPOINT).get('zones', []) - if result: - status = True - message = 'Connection successfully tested' + hook._do_api_call(endpoint_info=LIST_ZONES_ENDPOINT).get('zones') + status = True + message = 'Connection successfully tested' except Exception as e: status = False message = str(e)