diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index bb8a1dc88080e..9dc1df5afe675 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -43,6 +43,7 @@ SUBMIT_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/submit") GET_RUN_ENDPOINT = ("GET", "api/2.1/jobs/runs/get") CANCEL_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/cancel") +DELETE_RUN_ENDPOINT = ("POST", "api/2.1/jobs/runs/delete") OUTPUT_RUNS_JOB_ENDPOINT = ("GET", "api/2.1/jobs/runs/get-output") INSTALL_LIBS_ENDPOINT = ("POST", "api/2.0/libraries/install") @@ -351,6 +352,15 @@ def cancel_run(self, run_id: int) -> None: json = {"run_id": run_id} self._do_api_call(CANCEL_RUN_ENDPOINT, json) + def delete_run(self, run_id: int) -> None: + """ + Deletes a non-active run. + + :param run_id: id of the run + """ + json = {"run_id": run_id} + self._do_api_call(DELETE_RUN_ENDPOINT, json) + def restart_cluster(self, json: dict) -> None: """ Restarts the cluster. diff --git a/tests/providers/databricks/hooks/test_databricks.py b/tests/providers/databricks/hooks/test_databricks.py index 895be832b4cd0..5cbd2f186def2 100644 --- a/tests/providers/databricks/hooks/test_databricks.py +++ b/tests/providers/databricks/hooks/test_databricks.py @@ -143,6 +143,13 @@ def cancel_run_endpoint(host): return f"https://{host}/api/2.1/jobs/runs/cancel" +def delete_run_endpoint(host): + """ + Utility function to generate delete run endpoint given the host. + """ + return f"https://{host}/api/2.1/jobs/runs/delete" + + def start_cluster_endpoint(host): """ Utility function to generate the get run endpoint given the host. @@ -521,6 +528,21 @@ def test_cancel_run(self, mock_requests): timeout=self.hook.timeout_seconds, ) + @mock.patch("airflow.providers.databricks.hooks.databricks_base.requests") + def test_delete_run(self, mock_requests): + mock_requests.post.return_value.json.return_value = {} + + self.hook.delete_run(RUN_ID) + + mock_requests.post.assert_called_once_with( + delete_run_endpoint(HOST), + json={"run_id": RUN_ID}, + params=None, + auth=HTTPBasicAuth(LOGIN, PASSWORD), + headers=self.hook.user_agent_header, + timeout=self.hook.timeout_seconds, + ) + @mock.patch("airflow.providers.databricks.hooks.databricks_base.requests") def test_start_cluster(self, mock_requests): mock_requests.codes.ok = 200