From c0d164385879429f028100dc41cd58aca7a4f24e Mon Sep 17 00:00:00 2001 From: Dov Benyomin Sohacheski Date: Wed, 20 Jul 2022 15:18:38 +0300 Subject: [PATCH 1/7] Add auth_type to LivyHook --- airflow/providers/apache/livy/hooks/livy.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index e809dc02c7fb4..058b0183f8b23 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -75,10 +75,12 @@ def __init__( livy_conn_id: str = default_conn_name, extra_options: Optional[Dict[str, Any]] = None, extra_headers: Optional[Dict[str, Any]] = None, + auth_type: Optional[Any] = None ) -> None: super().__init__(http_conn_id=livy_conn_id) self.extra_headers = extra_headers or {} self.extra_options = extra_options or {} + self.auth_type = auth_type or self.auth_type def get_conn(self, headers: Optional[Dict[str, Any]] = None) -> Any: """ From 916d36b9cb757e79f629248771b1bae8d4aad2ba Mon Sep 17 00:00:00 2001 From: Dov Benyomin Sohacheski Date: Thu, 21 Jul 2022 09:31:44 +0300 Subject: [PATCH 2/7] Added docblock --- airflow/providers/apache/livy/hooks/livy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index 058b0183f8b23..8e546bece5bba 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -50,6 +50,7 @@ class LivyHook(HttpHook, LoggingMixin): :param livy_conn_id: reference to a pre-defined Livy Connection. :param extra_options: A dictionary of options passed to Livy. :param extra_headers: A dictionary of headers passed to the HTTP request to livy. + :param auth_type: The auth type for the service. .. seealso:: For more details refer to the Apache Livy API reference: From 1207916fd75148b6b74879786eedc0af7b6976da Mon Sep 17 00:00:00 2001 From: Dov Benyomin Sohacheski Date: Thu, 21 Jul 2022 09:35:02 +0300 Subject: [PATCH 3/7] Delegated auth_type to LivyOperator --- airflow/providers/apache/livy/operators/livy.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py index f0dbc9e3165fe..d89f2a6550a40 100644 --- a/airflow/providers/apache/livy/operators/livy.py +++ b/airflow/providers/apache/livy/operators/livy.py @@ -49,6 +49,7 @@ class LivyOperator(BaseOperator): :param conf: Spark configuration properties. :param proxy_user: user to impersonate when running the job. :param livy_conn_id: reference to a pre-defined Livy Connection. + :param livy_conn_auth_type: The auth type for the Livy Connection. :param polling_interval: time in seconds between polling for job completion. Don't poll for values >=0 :param extra_options: A dictionary of options, where key is string and value depends on the option that's being modified. @@ -79,6 +80,7 @@ def __init__( name: Optional[str] = None, proxy_user: Optional[str] = None, livy_conn_id: str = 'livy_default', + livy_conn_auth_type: Optional[Any] = None, polling_interval: int = 0, extra_options: Optional[Dict[str, Any]] = None, extra_headers: Optional[Dict[str, Any]] = None, @@ -108,6 +110,7 @@ def __init__( } self._livy_conn_id = livy_conn_id + self._livy_conn_auth_type = livy_conn_auth_type self._polling_interval = polling_interval self._extra_options = extra_options or {} self._extra_headers = extra_headers or {} @@ -128,6 +131,7 @@ def get_hook(self) -> LivyHook: livy_conn_id=self._livy_conn_id, extra_headers=self._extra_headers, extra_options=self._extra_options, + livy_conn_auth_type=self._livy_conn_auth_type ) return self._livy_hook From 720b998ca3a71f28862d6ee669ecc81e5451fc61 Mon Sep 17 00:00:00 2001 From: Dov Benyomin Sohacheski Date: Thu, 21 Jul 2022 09:37:23 +0300 Subject: [PATCH 4/7] Delegated auth_type to LivySensor --- airflow/providers/apache/livy/sensors/livy.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airflow/providers/apache/livy/sensors/livy.py b/airflow/providers/apache/livy/sensors/livy.py index 4c3419f2af4b2..56252ecc8de7f 100644 --- a/airflow/providers/apache/livy/sensors/livy.py +++ b/airflow/providers/apache/livy/sensors/livy.py @@ -41,12 +41,14 @@ def __init__( *, batch_id: Union[int, str], livy_conn_id: str = 'livy_default', + livy_conn_auth_type: Optional[Any] = None, extra_options: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: super().__init__(**kwargs) self.batch_id = batch_id self._livy_conn_id = livy_conn_id + self._livy_conn_auth_type = livy_conn_auth_type self._livy_hook: Optional[LivyHook] = None self._extra_options = extra_options or {} @@ -58,7 +60,11 @@ def get_hook(self) -> LivyHook: :rtype: LivyHook """ if self._livy_hook is None or not isinstance(self._livy_hook, LivyHook): - self._livy_hook = LivyHook(livy_conn_id=self._livy_conn_id, extra_options=self._extra_options) + self._livy_hook = LivyHook( + livy_conn_id=self._livy_conn_id, + extra_options=self._extra_options, + livy_conn_auth_type=self._livy_conn_auth_type + ) return self._livy_hook def poke(self, context: "Context") -> bool: From fbb071163f0260b592e03fd2976948c911047998 Mon Sep 17 00:00:00 2001 From: Dov Benyomin Sohacheski Date: Sun, 7 Aug 2022 13:56:30 +0300 Subject: [PATCH 5/7] added unit tests to hook --- airflow/providers/apache/livy/operators/livy.py | 2 +- airflow/providers/apache/livy/sensors/livy.py | 2 +- tests/providers/apache/livy/hooks/test_livy.py | 14 +++++++++++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py index d89f2a6550a40..3d1fa302db49f 100644 --- a/airflow/providers/apache/livy/operators/livy.py +++ b/airflow/providers/apache/livy/operators/livy.py @@ -131,7 +131,7 @@ def get_hook(self) -> LivyHook: livy_conn_id=self._livy_conn_id, extra_headers=self._extra_headers, extra_options=self._extra_options, - livy_conn_auth_type=self._livy_conn_auth_type + auth_type=self._livy_conn_auth_type ) return self._livy_hook diff --git a/airflow/providers/apache/livy/sensors/livy.py b/airflow/providers/apache/livy/sensors/livy.py index 56252ecc8de7f..11ddff538291e 100644 --- a/airflow/providers/apache/livy/sensors/livy.py +++ b/airflow/providers/apache/livy/sensors/livy.py @@ -63,7 +63,7 @@ def get_hook(self) -> LivyHook: self._livy_hook = LivyHook( livy_conn_id=self._livy_conn_id, extra_options=self._extra_options, - livy_conn_auth_type=self._livy_conn_auth_type + auth_type=self._livy_conn_auth_type ) return self._livy_hook diff --git a/tests/providers/apache/livy/hooks/test_livy.py b/tests/providers/apache/livy/hooks/test_livy.py index 92f0e9937d745..20d76933c98ae 100644 --- a/tests/providers/apache/livy/hooks/test_livy.py +++ b/tests/providers/apache/livy/hooks/test_livy.py @@ -17,7 +17,7 @@ import json import unittest -from unittest.mock import patch +from unittest.mock import patch, MagicMock import pytest import requests_mock @@ -47,6 +47,7 @@ def setUpClass(cls): ) db.merge_conn(Connection(conn_id='missing_host', conn_type='http', port=1234)) db.merge_conn(Connection(conn_id='invalid_uri', uri='http://invalid_uri:4321')) + db.merge_conn(Connection(conn_id='with_credentials', login='login', password='secret', host='host')) def test_build_get_hook(self): @@ -459,3 +460,14 @@ def test_extra_headers(self, mock): hook = LivyHook(extra_headers={'X-Requested-By': 'user'}) hook.post_batch(file='sparkapp') + + def test_alternate_auth_type(self): + auth_type = MagicMock() + + hook = LivyHook(livy_conn_id='with_credentials', auth_type=auth_type) + + auth_type.assert_not_called() + + hook.get_conn() + + auth_type.assert_called_once_with('login', 'secret') From 48fbd896c3baf9991666319058b312c7fc6d1421 Mon Sep 17 00:00:00 2001 From: Dov Benyomin Sohacheski Date: Sun, 7 Aug 2022 14:37:25 +0300 Subject: [PATCH 6/7] fixed static styling and connection --- airflow/providers/apache/livy/hooks/livy.py | 2 +- airflow/providers/apache/livy/operators/livy.py | 2 +- airflow/providers/apache/livy/sensors/livy.py | 2 +- tests/providers/apache/livy/hooks/test_livy.py | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/providers/apache/livy/hooks/livy.py b/airflow/providers/apache/livy/hooks/livy.py index 8e546bece5bba..84b2fd12ade6d 100644 --- a/airflow/providers/apache/livy/hooks/livy.py +++ b/airflow/providers/apache/livy/hooks/livy.py @@ -76,7 +76,7 @@ def __init__( livy_conn_id: str = default_conn_name, extra_options: Optional[Dict[str, Any]] = None, extra_headers: Optional[Dict[str, Any]] = None, - auth_type: Optional[Any] = None + auth_type: Optional[Any] = None, ) -> None: super().__init__(http_conn_id=livy_conn_id) self.extra_headers = extra_headers or {} diff --git a/airflow/providers/apache/livy/operators/livy.py b/airflow/providers/apache/livy/operators/livy.py index 3d1fa302db49f..ce2910449d609 100644 --- a/airflow/providers/apache/livy/operators/livy.py +++ b/airflow/providers/apache/livy/operators/livy.py @@ -131,7 +131,7 @@ def get_hook(self) -> LivyHook: livy_conn_id=self._livy_conn_id, extra_headers=self._extra_headers, extra_options=self._extra_options, - auth_type=self._livy_conn_auth_type + auth_type=self._livy_conn_auth_type, ) return self._livy_hook diff --git a/airflow/providers/apache/livy/sensors/livy.py b/airflow/providers/apache/livy/sensors/livy.py index 11ddff538291e..fb3e60c82865b 100644 --- a/airflow/providers/apache/livy/sensors/livy.py +++ b/airflow/providers/apache/livy/sensors/livy.py @@ -63,7 +63,7 @@ def get_hook(self) -> LivyHook: self._livy_hook = LivyHook( livy_conn_id=self._livy_conn_id, extra_options=self._extra_options, - auth_type=self._livy_conn_auth_type + auth_type=self._livy_conn_auth_type, ) return self._livy_hook diff --git a/tests/providers/apache/livy/hooks/test_livy.py b/tests/providers/apache/livy/hooks/test_livy.py index 20d76933c98ae..83828078cd065 100644 --- a/tests/providers/apache/livy/hooks/test_livy.py +++ b/tests/providers/apache/livy/hooks/test_livy.py @@ -17,7 +17,7 @@ import json import unittest -from unittest.mock import patch, MagicMock +from unittest.mock import MagicMock, patch import pytest import requests_mock @@ -47,7 +47,7 @@ def setUpClass(cls): ) db.merge_conn(Connection(conn_id='missing_host', conn_type='http', port=1234)) db.merge_conn(Connection(conn_id='invalid_uri', uri='http://invalid_uri:4321')) - db.merge_conn(Connection(conn_id='with_credentials', login='login', password='secret', host='host')) + db.merge_conn(Connection(conn_id='with_credentials', login='login', password='secret', conn_type='http', host='host')) def test_build_get_hook(self): From 47a1677e71f67ca93062a867b83f688695c2bb3a Mon Sep 17 00:00:00 2001 From: Dov Benyomin Sohacheski Date: Sun, 7 Aug 2022 15:23:40 +0300 Subject: [PATCH 7/7] final style commit --- tests/providers/apache/livy/hooks/test_livy.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/providers/apache/livy/hooks/test_livy.py b/tests/providers/apache/livy/hooks/test_livy.py index 83828078cd065..7d663e0fd0b6d 100644 --- a/tests/providers/apache/livy/hooks/test_livy.py +++ b/tests/providers/apache/livy/hooks/test_livy.py @@ -47,7 +47,11 @@ def setUpClass(cls): ) db.merge_conn(Connection(conn_id='missing_host', conn_type='http', port=1234)) db.merge_conn(Connection(conn_id='invalid_uri', uri='http://invalid_uri:4321')) - db.merge_conn(Connection(conn_id='with_credentials', login='login', password='secret', conn_type='http', host='host')) + db.merge_conn( + Connection( + conn_id='with_credentials', login='login', password='secret', conn_type='http', host='host' + ) + ) def test_build_get_hook(self):