diff --git a/airflow/providers/apache/pig/CHANGELOG.rst b/airflow/providers/apache/pig/CHANGELOG.rst index fffe58e951e24..1c6a05de20d3c 100644 --- a/airflow/providers/apache/pig/CHANGELOG.rst +++ b/airflow/providers/apache/pig/CHANGELOG.rst @@ -24,6 +24,20 @@ Changelog --------- +4.0.0 +..... + +Breaking changes +~~~~~~~~~~~~~~~~ + +You cannot use ``pig_properties`` any more as connection extras. If you want to add extra parameters +to ``pig`` command, you need to do it via ``pig_properties`` (string list) of the PigCliHook (new parameter) +or via ``pig_opts`` (string with options separated by spaces) or ``pig_properties`` (string list) in +the PigOperator . Any use of ``pig_properties`` extras in connection will raise an exception, +informing that you need to remove them and pass them as parameters. + +Both ``pig_properties`` and ``pig_opts`` are now templated fields in the PigOperator. + 3.0.0 ..... diff --git a/airflow/providers/apache/pig/hooks/pig.py b/airflow/providers/apache/pig/hooks/pig.py index 2ff813f43c11d..023b308e13e2c 100644 --- a/airflow/providers/apache/pig/hooks/pig.py +++ b/airflow/providers/apache/pig/hooks/pig.py @@ -26,13 +26,10 @@ class PigCliHook(BaseHook): - """ - Simple wrapper around the pig CLI. - - Note that you can also set default pig CLI properties using the - ``pig_properties`` to be used in your connection as in - ``{"pig_properties": "-Dpig.tmpfilecompression=true"}`` + """Simple wrapper around the pig CLI. + :param pig_cli_conn_id: Connection id used by the hook + :param pig_properties: additional properties added after pig cli command as list of strings. """ conn_name_attr = "pig_cli_conn_id" @@ -40,16 +37,27 @@ class PigCliHook(BaseHook): conn_type = "pig_cli" hook_name = "Pig Client Wrapper" - def __init__(self, pig_cli_conn_id: str = default_conn_name) -> None: + def __init__( + self, pig_cli_conn_id: str = default_conn_name, pig_properties: list[str] | None = None + ) -> None: super().__init__() conn = self.get_connection(pig_cli_conn_id) - self.pig_properties = conn.extra_dejson.get("pig_properties", "") + conn_pig_properties = conn.extra_dejson.get("pig_properties") + if conn_pig_properties: + raise RuntimeError( + "The PigCliHook used to have possibility of passing `pig_properties` to the Hook," + " however with the 4.0.0 version of `apache-pig` provider it has been removed. You should" + " use ``pig_opts`` (space separated string) or ``pig_properties`` (string list) in the" + " PigOperator. You can also pass ``pig-properties`` in the PigCliHook `init`. Currently," + f" the {pig_cli_conn_id} connection has those extras: `{conn_pig_properties}`." + ) + self.pig_properties = pig_properties if pig_properties else [] self.conn = conn self.sub_process = None def run_cli(self, pig: str, pig_opts: str | None = None, verbose: bool = True) -> Any: """ - Run an pig script using the pig cli + Run a pig script using the pig cli >>> ph = PigCliHook() >>> result = ph.run_cli("ls /;", pig_opts="-x mapreduce") @@ -67,8 +75,7 @@ def run_cli(self, pig: str, pig_opts: str | None = None, verbose: bool = True) - pig_cmd = [pig_bin] if self.pig_properties: - pig_properties_list = self.pig_properties.split() - pig_cmd.extend(pig_properties_list) + pig_cmd.extend(self.pig_properties) if pig_opts: pig_opts_list = pig_opts.split() pig_cmd.extend(pig_opts_list) diff --git a/airflow/providers/apache/pig/operators/pig.py b/airflow/providers/apache/pig/operators/pig.py index 47d7c89b8e776..544895cea09dd 100644 --- a/airflow/providers/apache/pig/operators/pig.py +++ b/airflow/providers/apache/pig/operators/pig.py @@ -38,10 +38,11 @@ class PigOperator(BaseOperator): you may want to use this along with the ``DAG(user_defined_macros=myargs)`` parameter. View the DAG object documentation for more details. - :param pig_opts: pig options, such as: -x tez, -useHCatalog, ... + :param pig_opts: pig options, such as: -x tez, -useHCatalog, ... - space separated list + :param pig_properties: pig properties, additional pig properties passed as list """ - template_fields: Sequence[str] = ("pig",) + template_fields: Sequence[str] = ("pig", "pig_opts", "pig_properties") template_ext: Sequence[str] = ( ".pig", ".piglatin", @@ -55,6 +56,7 @@ def __init__( pig_cli_conn_id: str = "pig_cli_default", pigparams_jinja_translate: bool = False, pig_opts: str | None = None, + pig_properties: list[str] | None = None, **kwargs: Any, ) -> None: @@ -63,6 +65,7 @@ def __init__( self.pig = pig self.pig_cli_conn_id = pig_cli_conn_id self.pig_opts = pig_opts + self.pig_properties = pig_properties self.hook: PigCliHook | None = None def prepare_template(self): @@ -71,7 +74,7 @@ def prepare_template(self): def execute(self, context: Context): self.log.info("Executing: %s", self.pig) - self.hook = PigCliHook(pig_cli_conn_id=self.pig_cli_conn_id) + self.hook = PigCliHook(pig_cli_conn_id=self.pig_cli_conn_id, pig_properties=self.pig_properties) self.hook.run_cli(pig=self.pig, pig_opts=self.pig_opts) def on_kill(self): diff --git a/airflow/providers/apache/pig/provider.yaml b/airflow/providers/apache/pig/provider.yaml index 998029c78b742..df1c55a8a7b61 100644 --- a/airflow/providers/apache/pig/provider.yaml +++ b/airflow/providers/apache/pig/provider.yaml @@ -22,6 +22,7 @@ description: | `Apache Pig `__ versions: + - 4.0.0 - 3.0.0 - 2.0.4 - 2.0.3 diff --git a/tests/providers/apache/pig/hooks/test_pig.py b/tests/providers/apache/pig/hooks/test_pig.py index ea28fa6b8c5cd..7599014e12fae 100644 --- a/tests/providers/apache/pig/hooks/test_pig.py +++ b/tests/providers/apache/pig/hooks/test_pig.py @@ -43,7 +43,6 @@ def get_connection(self, unused_id): def test_init(self): self.pig_hook() - self.extra_dejson.get.assert_called_once_with("pig_properties", "") @mock.patch("subprocess.Popen") def test_run_cli_success(self, popen_mock): @@ -80,8 +79,7 @@ def test_run_cli_with_properties(self, popen_mock): proc_mock.stdout.readline.return_value = b"" popen_mock.return_value = proc_mock - hook = self.pig_hook() - hook.pig_properties = test_properties + hook = self.pig_hook(pig_properties=["one", "two"]) stdout = hook.run_cli("") assert stdout == "" @@ -90,6 +88,15 @@ def test_run_cli_with_properties(self, popen_mock): for pig_prop in test_properties.split(): assert pig_prop in popen_first_arg + def test_runtime_exception_not_raised_by_default(self): + PigCliHook() + + @mock.patch("airflow.providers.apache.pig.hooks.pig.PigCliHook.get_connection") + def test_runtime_exception_when_properties_passed_by_connection(self, mock_get_connection): + mock_get_connection.return_value.extra_dejson = {"pig_properties": "one two three"} + with pytest.raises(RuntimeError): + PigCliHook() + @mock.patch("subprocess.Popen") def test_run_cli_verbose(self, popen_mock): test_stdout_lines = [b"one", b"two", b""]