From 289d38cb01f9b0a6d2a8725ce5cae31e444b42fa Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Fri, 14 Apr 2023 21:19:48 +0530 Subject: [PATCH 1/3] Add delete run endpoint --- .../providers/databricks/hooks/databricks.py | 11 +++++++++- .../databricks/hooks/test_databricks.py | 22 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index bb8a1dc88080e..7bbe140f562d1 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -38,11 +38,11 @@ RESTART_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/restart") START_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/start") TERMINATE_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/delete") - RUN_NOW_ENDPOINT = ("POST", "api/2.1/jobs/run-now") SUBMIT_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/submit") GET_RUN_ENDPOINT = ("GET", "api/2.1/jobs/runs/get") CANCEL_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/cancel") +DELETE_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/delete") OUTPUT_RUNS_JOB_ENDPOINT = ("GET", "api/2.1/jobs/runs/get-output") INSTALL_LIBS_ENDPOINT = ("POST", "api/2.0/libraries/install") @@ -351,6 +351,15 @@ def cancel_run(self, run_id: int) -> None: json = {"run_id": run_id} self._do_api_call(CANCEL_RUN_ENDPOINT, json) + def delete_run(self, run_id: int) -> None: + """ + Deletes a non-active run. + + :param run_id: id of the run + """ + json = {"run_id": run_id} + self._do_api_call(DELETE_RUN_ENDPOINT, json) + def restart_cluster(self, json: dict) -> None: """ Restarts the cluster. diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py index 895be832b4cd0..a4b0461671b7a 100644 --- a/tests/providers/databricks/hooks/test_databricks.py +++ b/tests/providers/databricks/hooks/test_databricks.py @@ -143,6 +143,13 @@ def cancel_run_endpoint(host): return f"https://{host}/api/2.1/jobs/runs/cancel" +def delete_run_endpoint(host): + """ + Utility function to generate delete run endpoint given the host. + """ + return f"https://{host}/api/2.1/jobs/runs/delete" + + def start_cluster_endpoint(host): """ Utility function to generate the get run endpoint given the host. @@ -521,6 +528,21 @@ def test_cancel_run(self, mock_requests): timeout=self.hook.timeout_seconds, ) + @mock.patch("airflow.providers.databricks.hooks.databricks_base.requests") + def test_delete_run(self, mock_requests): + mock_requests.post.return_value.json.return_value = GET_RUN_RESPONSE + + self.hook.delete_run(RUN_ID) + + mock_requests.post.assert_called_once_with( + delete_run_endpoint(HOST), + json={"run_id": RUN_ID}, + params=None, + auth=HTTPBasicAuth(LOGIN, PASSWORD), + headers=self.hook.user_agent_header, + timeout=self.hook.timeout_seconds, + ) + @mock.patch("airflow.providers.databricks.hooks.databricks_base.requests") def test_start_cluster(self, mock_requests): mock_requests.codes.ok = 200 From e0ea10887cd7d106ce243c4cc4f24d408e931c68 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Fri, 14 Apr 2023 21:34:28 +0530 Subject: [PATCH 2/3] Add tests --- tests/providers/databricks/hooks/test_databricks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py index a4b0461671b7a..5cbd2f186def2 100644 --- a/tests/providers/databricks/hooks/test_databricks.py +++ b/tests/providers/databricks/hooks/test_databricks.py @@ -530,7 +530,7 @@ def test_cancel_run(self, mock_requests): @mock.patch("airflow.providers.databricks.hooks.databricks_base.requests") def test_delete_run(self, mock_requests): - mock_requests.post.return_value.json.return_value = GET_RUN_RESPONSE + mock_requests.post.return_value.json.return_value = {} self.hook.delete_run(RUN_ID) From f96c200c212abb380ca9aa744f3686b465c669c8 Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Fri, 14 Apr 2023 21:37:41 +0530 Subject: [PATCH 3/3] fixup --- airflow/providers/databricks/hooks/databricks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index 7bbe140f562d1..9dc1df5afe675 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -38,6 +38,7 @@ RESTART_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/restart") START_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/start") TERMINATE_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/delete") + RUN_NOW_ENDPOINT = ("POST", "api/2.1/jobs/run-now") SUBMIT_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/submit") GET_RUN_ENDPOINT = ("GET", "api/2.1/jobs/runs/get")