From 10a527c42979798fa5c9b6b51f3c38d8ec8c7ae5 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 17 Sep 2023 01:08:55 +0200 Subject: [PATCH 1/2] Consolidate hook management in HiveOperator --- .../providers/apache/hive/operators/hive.py | 19 ++++++++++--------- .../apache/hive/operators/test_hive.py | 10 ++++------ 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/airflow/providers/apache/hive/operators/hive.py b/airflow/providers/apache/hive/operators/hive.py index 71bd8ac49b193..73d4072d3ffa9 100644 --- a/airflow/providers/apache/hive/operators/hive.py +++ b/airflow/providers/apache/hive/operators/hive.py @@ -19,8 +19,11 @@ import os import re +from functools import cached_property from typing import TYPE_CHECKING, Any, Sequence +from deprecated.classic import deprecated + from airflow.configuration import conf from airflow.models import BaseOperator from airflow.providers.apache.hive.hooks.hive import HiveCliHook @@ -116,13 +119,8 @@ def __init__( ) self.mapred_job_name_template: str = job_name_template - # assigned lazily - just for consistency we can create the attribute with a - # `None` initial value, later it will be populated by the execute method. - # This also makes `on_kill` implementation consistent since it assumes `self.hook` - # is defined. - self.hook: HiveCliHook | None = None - - def get_hook(self) -> HiveCliHook: + @cached_property + def hook(self) -> HiveCliHook: """Get Hive cli hook.""" return HiveCliHook( hive_cli_conn_id=self.hive_cli_conn_id, @@ -134,6 +132,11 @@ def get_hook(self) -> HiveCliHook: auth=self.auth, ) + @deprecated(reason="use `hook` property instead.") + def get_hook(self) -> HiveCliHook: + """Get Hive cli hook.""" + return self.hook + def prepare_template(self) -> None: if self.hiveconf_jinja_translate: self.hql = re.sub(r"(\$\{(hiveconf:)?([ a-zA-Z0-9_]*)\})", r"{{ \g<3> }}", self.hql) @@ -142,7 +145,6 @@ def prepare_template(self) -> None: def execute(self, context: Context) -> None: self.log.info("Executing: %s", self.hql) - self.hook = self.get_hook() # set the mapred_job_name if it's not set with dag, task, execution time info if not self.mapred_job_name: @@ -169,7 +171,6 @@ def dry_run(self) -> None: # existing env vars from impacting behavior. self.clear_airflow_vars() - self.hook = self.get_hook() self.hook.test_hql(hql=self.hql) def on_kill(self) -> None: diff --git a/tests/providers/apache/hive/operators/test_hive.py b/tests/providers/apache/hive/operators/test_hive.py index d64a2f7bc3422..f02f69c2a482d 100644 --- a/tests/providers/apache/hive/operators/test_hive.py +++ b/tests/providers/apache/hive/operators/test_hive.py @@ -41,7 +41,7 @@ def test_hive_airflow_default_config_queue(self): # just check that the correct default value in test_default.cfg is used test_config_hive_mapred_queue = conf.get("hive", "default_hive_mapred_queue") - assert op.get_hook().mapred_queue == test_config_hive_mapred_queue + assert op.hook.mapred_queue == test_config_hive_mapred_queue def test_hive_airflow_default_config_queue_override(self): specific_mapred_queue = "default" @@ -54,7 +54,7 @@ def test_hive_airflow_default_config_queue_override(self): dag=self.dag, ) - assert op.get_hook().mapred_queue == specific_mapred_queue + assert op.hook.mapred_queue == specific_mapred_queue class HiveOperatorTest(TestHiveEnvironment): @@ -75,10 +75,8 @@ def test_hiveconf(self): op.prepare_template() assert op.hql == "SELECT * FROM ${hiveconf:table} PARTITION (${hiveconf:day});" - @mock.patch("airflow.providers.apache.hive.operators.hive.HiveOperator.get_hook") - def test_mapred_job_name(self, mock_get_hook): - mock_hook = mock.MagicMock() - mock_get_hook.return_value = mock_hook + @mock.patch("airflow.providers.apache.hive.operators.hive.HiveOperator.hook", mock.MagicMock()) + def test_mapred_job_name(self, mock_hook): op = HiveOperator(task_id="test_mapred_job_name", hql=self.hql, dag=self.dag) fake_run_id = "test_mapred_job_name" From f13552dbb06f70dc0c666719079f673551d056ef Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 17 Sep 2023 12:49:22 +0200 Subject: [PATCH 2/2] use AirflowProviderDeprecationWarning --- airflow/providers/apache/hive/operators/hive.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/apache/hive/operators/hive.py b/airflow/providers/apache/hive/operators/hive.py index 73d4072d3ffa9..640943467fed1 100644 --- a/airflow/providers/apache/hive/operators/hive.py +++ b/airflow/providers/apache/hive/operators/hive.py @@ -25,6 +25,7 @@ from deprecated.classic import deprecated from airflow.configuration import conf +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import BaseOperator from airflow.providers.apache.hive.hooks.hive import HiveCliHook from airflow.utils import operator_helpers @@ -132,7 +133,7 @@ def hook(self) -> HiveCliHook: auth=self.auth, ) - @deprecated(reason="use `hook` property instead.") + @deprecated(reason="use `hook` property instead.", category=AirflowProviderDeprecationWarning) def get_hook(self) -> HiveCliHook: """Get Hive cli hook.""" return self.hook