From 7446c0187cf25a5435271b196a89897bfb1ccce2 Mon Sep 17 00:00:00 2001 From: bhavaniravi Date: Thu, 9 Dec 2021 10:00:36 +0000 Subject: [PATCH 1/3] 20139 - organize aws step_function --- .../amazon/aws/operators/step_function.py | 122 ++++++++++++++++++ .../step_function_get_execution_output.py | 55 ++------ .../step_function_start_execution.py | 65 ++-------- .../amazon/aws/sensors/step_function.py | 88 +++++++++++++ .../aws/sensors/step_function_execution.py | 76 ++--------- airflow/providers/amazon/provider.yaml | 2 + .../prepare_provider_packages.py | 2 + tests/deprecated_classes.py | 14 ++ 8 files changed, 254 insertions(+), 170 deletions(-) create mode 100644 airflow/providers/amazon/aws/operators/step_function.py create mode 100644 airflow/providers/amazon/aws/sensors/step_function.py diff --git a/airflow/providers/amazon/aws/operators/step_function.py b/airflow/providers/amazon/aws/operators/step_function.py new file mode 100644 index 0000000000000..2853cea91faaf --- /dev/null +++ b/airflow/providers/amazon/aws/operators/step_function.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import json +from typing import Optional, Union + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook + + +class StepFunctionStartExecutionOperator(BaseOperator): + """ + An Operator that begins execution of an Step Function State Machine + + Additional arguments may be specified and are passed down to the underlying BaseOperator. + + .. seealso:: + :class:`~airflow.models.BaseOperator` + + :param state_machine_arn: ARN of the Step Function State Machine + :type state_machine_arn: str + :param name: The name of the execution. + :type name: Optional[str] + :param state_machine_input: JSON data input to pass to the State Machine + :type state_machine_input: Union[Dict[str, any], str, None] + :param aws_conn_id: aws connection to uses + :type aws_conn_id: str + :param do_xcom_push: if True, execution_arn is pushed to XCom with key execution_arn. + :type do_xcom_push: bool + """ + + template_fields = ['state_machine_arn', 'name', 'input'] + template_ext = () + ui_color = '#f9c915' + + def __init__( + self, + *, + state_machine_arn: str, + name: Optional[str] = None, + state_machine_input: Union[dict, str, None] = None, + aws_conn_id: str = 'aws_default', + region_name: Optional[str] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.state_machine_arn = state_machine_arn + self.name = name + self.input = state_machine_input + self.aws_conn_id = aws_conn_id + self.region_name = region_name + + def execute(self, context): + hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + + execution_arn = hook.start_execution(self.state_machine_arn, self.name, self.input) + + if execution_arn is None: + raise AirflowException(f'Failed to start State Machine execution for: {self.state_machine_arn}') + + self.log.info('Started State Machine execution for %s: %s', self.state_machine_arn, execution_arn) + + return execution_arn + + +class StepFunctionGetExecutionOutputOperator(BaseOperator): + """ + An Operator that begins execution of an Step Function State Machine + + Additional arguments may be specified and are passed down to the underlying BaseOperator. + + .. seealso:: + :class:`~airflow.models.BaseOperator` + + :param execution_arn: ARN of the Step Function State Machine Execution + :type execution_arn: str + :param aws_conn_id: aws connection to use, defaults to 'aws_default' + :type aws_conn_id: str + """ + + template_fields = ['execution_arn'] + template_ext = () + ui_color = '#f9c915' + + def __init__( + self, + *, + execution_arn: str, + aws_conn_id: str = 'aws_default', + region_name: Optional[str] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.execution_arn = execution_arn + self.aws_conn_id = aws_conn_id + self.region_name = region_name + + def execute(self, context): + hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + + execution_status = hook.describe_execution(self.execution_arn) + execution_output = json.loads(execution_status['output']) if 'output' in execution_status else None + + self.log.info('Got State Machine Execution output for %s', self.execution_arn) + + return execution_output diff --git a/airflow/providers/amazon/aws/operators/step_function_get_execution_output.py b/airflow/providers/amazon/aws/operators/step_function_get_execution_output.py index 770ca5266346a..2b047241ae861 100644 --- a/airflow/providers/amazon/aws/operators/step_function_get_execution_output.py +++ b/airflow/providers/amazon/aws/operators/step_function_get_execution_output.py @@ -15,51 +15,16 @@ # specific language governing permissions and limitations # under the License. -import json -from typing import Optional +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.sensors.step_function`.""" -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook +import warnings +from airflow.providers.amazon.aws.operators.step_function import ( # noqa + StepFunctionGetExecutionOutputOperator, +) -class StepFunctionGetExecutionOutputOperator(BaseOperator): - """ - An Operator that begins execution of an Step Function State Machine - - Additional arguments may be specified and are passed down to the underlying BaseOperator. - - .. seealso:: - :class:`~airflow.models.BaseOperator` - - :param execution_arn: ARN of the Step Function State Machine Execution - :type execution_arn: str - :param aws_conn_id: aws connection to use, defaults to 'aws_default' - :type aws_conn_id: str - """ - - template_fields = ['execution_arn'] - template_ext = () - ui_color = '#f9c915' - - def __init__( - self, - *, - execution_arn: str, - aws_conn_id: str = 'aws_default', - region_name: Optional[str] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.execution_arn = execution_arn - self.aws_conn_id = aws_conn_id - self.region_name = region_name - - def execute(self, context): - hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) - - execution_status = hook.describe_execution(self.execution_arn) - execution_output = json.loads(execution_status['output']) if 'output' in execution_status else None - - self.log.info('Got State Machine Execution output for %s', self.execution_arn) - - return execution_output +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.step_function`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/step_function_start_execution.py b/airflow/providers/amazon/aws/operators/step_function_start_execution.py index 8507e3b3c061b..10a847ffc90bd 100644 --- a/airflow/providers/amazon/aws/operators/step_function_start_execution.py +++ b/airflow/providers/amazon/aws/operators/step_function_start_execution.py @@ -15,63 +15,14 @@ # specific language governing permissions and limitations # under the License. -from typing import Optional, Union +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.step_function`.""" -from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook +import warnings +from airflow.providers.amazon.aws.operators.step_function import StepFunctionStartExecutionOperator # noqa -class StepFunctionStartExecutionOperator(BaseOperator): - """ - An Operator that begins execution of an Step Function State Machine - - Additional arguments may be specified and are passed down to the underlying BaseOperator. - - .. seealso:: - :class:`~airflow.models.BaseOperator` - - :param state_machine_arn: ARN of the Step Function State Machine - :type state_machine_arn: str - :param name: The name of the execution. - :type name: Optional[str] - :param state_machine_input: JSON data input to pass to the State Machine - :type state_machine_input: Union[Dict[str, any], str, None] - :param aws_conn_id: aws connection to uses - :type aws_conn_id: str - :param do_xcom_push: if True, execution_arn is pushed to XCom with key execution_arn. - :type do_xcom_push: bool - """ - - template_fields = ['state_machine_arn', 'name', 'input'] - template_ext = () - ui_color = '#f9c915' - - def __init__( - self, - *, - state_machine_arn: str, - name: Optional[str] = None, - state_machine_input: Union[dict, str, None] = None, - aws_conn_id: str = 'aws_default', - region_name: Optional[str] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.state_machine_arn = state_machine_arn - self.name = name - self.input = state_machine_input - self.aws_conn_id = aws_conn_id - self.region_name = region_name - - def execute(self, context): - hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) - - execution_arn = hook.start_execution(self.state_machine_arn, self.name, self.input) - - if execution_arn is None: - raise AirflowException(f'Failed to start State Machine execution for: {self.state_machine_arn}') - - self.log.info('Started State Machine execution for %s: %s', self.state_machine_arn, execution_arn) - - return execution_arn +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.step_function`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py new file mode 100644 index 0000000000000..39cd74f69397d --- /dev/null +++ b/airflow/providers/amazon/aws/sensors/step_function.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from typing import Optional + +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook +from airflow.sensors.base import BaseSensorOperator + + +class StepFunctionExecutionSensor(BaseSensorOperator): + """ + Asks for the state of the Step Function State Machine Execution until it + reaches a failure state or success state. + If it fails, failing the task. + + On successful completion of the Execution the Sensor will do an XCom Push + of the State Machine's output to `output` + + :param execution_arn: execution_arn to check the state of + :type execution_arn: str + :param aws_conn_id: aws connection to use, defaults to 'aws_default' + :type aws_conn_id: str + """ + + INTERMEDIATE_STATES = ('RUNNING',) + FAILURE_STATES = ( + 'FAILED', + 'TIMED_OUT', + 'ABORTED', + ) + SUCCESS_STATES = ('SUCCEEDED',) + + template_fields = ['execution_arn'] + template_ext = () + ui_color = '#66c3ff' + + def __init__( + self, + *, + execution_arn: str, + aws_conn_id: str = 'aws_default', + region_name: Optional[str] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.execution_arn = execution_arn + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.hook: Optional[StepFunctionHook] = None + + def poke(self, context): + execution_status = self.get_hook().describe_execution(self.execution_arn) + state = execution_status['status'] + output = json.loads(execution_status['output']) if 'output' in execution_status else None + + if state in self.FAILURE_STATES: + raise AirflowException(f'Step Function sensor failed. State Machine Output: {output}') + + if state in self.INTERMEDIATE_STATES: + return False + + self.log.info('Doing xcom_push of output') + self.xcom_push(context, 'output', output) + return True + + def get_hook(self) -> StepFunctionHook: + """Create and return a StepFunctionHook""" + if self.hook: + return self.hook + + self.hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + return self.hook diff --git a/airflow/providers/amazon/aws/sensors/step_function_execution.py b/airflow/providers/amazon/aws/sensors/step_function_execution.py index 39cd74f69397d..267343cd1df9e 100644 --- a/airflow/providers/amazon/aws/sensors/step_function_execution.py +++ b/airflow/providers/amazon/aws/sensors/step_function_execution.py @@ -15,74 +15,14 @@ # specific language governing permissions and limitations # under the License. -import json -from typing import Optional +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.sensors.step_function`.""" -from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook -from airflow.sensors.base import BaseSensorOperator +import warnings +from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor # noqa -class StepFunctionExecutionSensor(BaseSensorOperator): - """ - Asks for the state of the Step Function State Machine Execution until it - reaches a failure state or success state. - If it fails, failing the task. - - On successful completion of the Execution the Sensor will do an XCom Push - of the State Machine's output to `output` - - :param execution_arn: execution_arn to check the state of - :type execution_arn: str - :param aws_conn_id: aws connection to use, defaults to 'aws_default' - :type aws_conn_id: str - """ - - INTERMEDIATE_STATES = ('RUNNING',) - FAILURE_STATES = ( - 'FAILED', - 'TIMED_OUT', - 'ABORTED', - ) - SUCCESS_STATES = ('SUCCEEDED',) - - template_fields = ['execution_arn'] - template_ext = () - ui_color = '#66c3ff' - - def __init__( - self, - *, - execution_arn: str, - aws_conn_id: str = 'aws_default', - region_name: Optional[str] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.execution_arn = execution_arn - self.aws_conn_id = aws_conn_id - self.region_name = region_name - self.hook: Optional[StepFunctionHook] = None - - def poke(self, context): - execution_status = self.get_hook().describe_execution(self.execution_arn) - state = execution_status['status'] - output = json.loads(execution_status['output']) if 'output' in execution_status else None - - if state in self.FAILURE_STATES: - raise AirflowException(f'Step Function sensor failed. State Machine Output: {output}') - - if state in self.INTERMEDIATE_STATES: - return False - - self.log.info('Doing xcom_push of output') - self.xcom_push(context, 'output', output) - return True - - def get_hook(self) -> StepFunctionHook: - """Create and return a StepFunctionHook""" - if self.hook: - return self.hook - - self.hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) - return self.hook +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.step_function`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index f7c392f5e228d..5ce2a3c794081 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -243,6 +243,7 @@ operators: python-modules: - airflow.providers.amazon.aws.operators.step_function_get_execution_output - airflow.providers.amazon.aws.operators.step_function_start_execution + - airflow.providers.amazon.aws.operators.step_function - integration-name: Amazon Redshift python-modules: - airflow.providers.amazon.aws.operators.redshift @@ -303,6 +304,7 @@ sensors: - integration-name: AWS Step Functions python-modules: - airflow.providers.amazon.aws.sensors.step_function_execution + - airflow.providers.amazon.aws.sensors.step_function hooks: - integration-name: Amazon Athena diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index 007f677bedb22..f5e49e43c8bf9 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -2137,6 +2137,8 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin "This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.", 'numpy.ufunc size changed, may indicate binary incompatibility. Expected 192 from C header,' ' got 216 from PyObject', + "This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.step_function`.", + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.step_function`.", } diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py index 2cb927947f716..7f5c689db7511 100644 --- a/tests/deprecated_classes.py +++ b/tests/deprecated_classes.py @@ -1510,6 +1510,16 @@ 'airflow.providers.amazon.aws.operators.s3_file_transform.S3FileTransformOperator', 'airflow.operators.s3_file_transform_operator.S3FileTransformOperator', ), + ( + 'airflow.providers.amazon.aws.operators.step_function.StepFunctionStartExecutionOperator', + 'airflow.providers.amazon.aws.operators.step_function_start_execution' + '.StepFunctionStartExecutionOperator', + ), + ( + 'airflow.providers.amazon.aws.operators.step_function.StepFunctionGetExecutionOutputOperator', + 'airflow.providers.amazon.aws.operators.step_function_get_execution_output' + '.StepFunctionGetExecutionOutputOperator', + ), ( 'airflow.providers.amazon.aws.sensors.s3_key.S3KeySensor', 'airflow.sensors.s3_key_sensor.S3KeySensor', @@ -1518,6 +1528,10 @@ 'airflow.providers.amazon.aws.sensors.s3_prefix.S3PrefixSensor', 'airflow.sensors.s3_prefix_sensor.S3PrefixSensor', ), + ( + 'airflow.providers.amazon.aws.sensors.step_function.StepFunctionExecutionSensor', + 'airflow.providers.amazon.aws.sensors.step_function_execution.StepFunctionExecutionSensor', + ), ( 'airflow.sensors.bash.BashSensor', 'airflow.contrib.sensors.bash_sensor.BashSensor', From 525583463b3f2bbf35bb7d97afa321290c005b3f Mon Sep 17 00:00:00 2001 From: bhavaniravi Date: Mon, 13 Dec 2021 05:39:14 +0000 Subject: [PATCH 2/3] 20139 - move step_function testcases --- ...art_execution.py => test_step_function.py} | 69 ++++++++++++++++--- ...test_step_function_get_execution_output.py | 69 ------------------- .../sensors/test_step_function_execution.py | 8 +-- 3 files changed, 64 insertions(+), 82 deletions(-) rename tests/providers/amazon/aws/operators/{test_step_function_start_execution.py => test_step_function.py} (55%) delete mode 100644 tests/providers/amazon/aws/operators/test_step_function_get_execution_output.py diff --git a/tests/providers/amazon/aws/operators/test_step_function_start_execution.py b/tests/providers/amazon/aws/operators/test_step_function.py similarity index 55% rename from tests/providers/amazon/aws/operators/test_step_function_start_execution.py rename to tests/providers/amazon/aws/operators/test_step_function.py index f71b8f02bb638..0aa41ff745dfa 100644 --- a/tests/providers/amazon/aws/operators/test_step_function_start_execution.py +++ b/tests/providers/amazon/aws/operators/test_step_function.py @@ -1,5 +1,7 @@ # # Licensed to the Apache Software Foundation (ASF) under one +# TODO: This license is not consistent with license used in the project. +# Delete the inconsistent license and above line and rerun pre-commit to insert a good license. # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file @@ -13,7 +15,7 @@ # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the -# specific language governing permissions and limitations +# specific language governing permissions and limitations ho # under the License. # @@ -21,26 +23,75 @@ from unittest import mock from unittest.mock import MagicMock -from airflow.providers.amazon.aws.operators.step_function_start_execution import ( +from airflow.providers.amazon.aws.operators.step_function import ( + StepFunctionGetExecutionOutputOperator, StepFunctionStartExecutionOperator, ) -TASK_ID = 'step_function_start_execution_task' +EXECUTION_ARN = ( + 'arn:aws:states:us-east-1:123456789012:execution:' + 'pseudo-state-machine:020f5b16-b1a1-4149-946f-92dd32d97934' +) +AWS_CONN_ID = 'aws_non_default' +REGION_NAME = 'us-west-2' STATE_MACHINE_ARN = 'arn:aws:states:us-east-1:000000000000:stateMachine:pseudo-state-machine' NAME = 'NAME' INPUT = '{}' -AWS_CONN_ID = 'aws_non_default' -REGION_NAME = 'us-west-2' + + +class TestStepFunctionGetExecutionOutputOperator(unittest.TestCase): + TASK_ID = 'step_function_get_execution_output' + + def setUp(self): + self.mock_context = MagicMock() + + def test_init(self): + # Given / When + operator = StepFunctionGetExecutionOutputOperator( + task_id=self.TASK_ID, + execution_arn=EXECUTION_ARN, + aws_conn_id=AWS_CONN_ID, + region_name=REGION_NAME, + ) + + # Then + assert self.TASK_ID == operator.task_id + assert EXECUTION_ARN == operator.execution_arn + assert AWS_CONN_ID == operator.aws_conn_id + assert REGION_NAME == operator.region_name + + @mock.patch('airflow.providers.amazon.aws.operators.step_function.StepFunctionHook') + def test_execute(self, mock_hook): + # Given + hook_response = {'output': '{}'} + + hook_instance = mock_hook.return_value + hook_instance.describe_execution.return_value = hook_response + + operator = StepFunctionGetExecutionOutputOperator( + task_id=self.TASK_ID, + execution_arn=EXECUTION_ARN, + aws_conn_id=AWS_CONN_ID, + region_name=REGION_NAME, + ) + + # When + result = operator.execute(self.mock_context) + + # Then + assert {} == result class TestStepFunctionStartExecutionOperator(unittest.TestCase): + TASK_ID = 'step_function_start_execution_task' + def setUp(self): self.mock_context = MagicMock() def test_init(self): # Given / When operator = StepFunctionStartExecutionOperator( - task_id=TASK_ID, + task_id=self.TASK_ID, state_machine_arn=STATE_MACHINE_ARN, name=NAME, state_machine_input=INPUT, @@ -49,14 +100,14 @@ def test_init(self): ) # Then - assert TASK_ID == operator.task_id + assert self.TASK_ID == operator.task_id assert STATE_MACHINE_ARN == operator.state_machine_arn assert NAME == operator.name assert INPUT == operator.input assert AWS_CONN_ID == operator.aws_conn_id assert REGION_NAME == operator.region_name - @mock.patch('airflow.providers.amazon.aws.operators.step_function_start_execution.StepFunctionHook') + @mock.patch('airflow.providers.amazon.aws.operators.step_function.StepFunctionHook') def test_execute(self, mock_hook): # Given hook_response = ( @@ -68,7 +119,7 @@ def test_execute(self, mock_hook): hook_instance.start_execution.return_value = hook_response operator = StepFunctionStartExecutionOperator( - task_id=TASK_ID, + task_id=self.TASK_ID, state_machine_arn=STATE_MACHINE_ARN, name=NAME, state_machine_input=INPUT, diff --git a/tests/providers/amazon/aws/operators/test_step_function_get_execution_output.py b/tests/providers/amazon/aws/operators/test_step_function_get_execution_output.py deleted file mode 100644 index 8bad0321b359c..0000000000000 --- a/tests/providers/amazon/aws/operators/test_step_function_get_execution_output.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest -from unittest import mock -from unittest.mock import MagicMock - -from airflow.providers.amazon.aws.operators.step_function_get_execution_output import ( - StepFunctionGetExecutionOutputOperator, -) - -TASK_ID = 'step_function_get_execution_output' -EXECUTION_ARN = ( - 'arn:aws:states:us-east-1:123456789012:execution:' - 'pseudo-state-machine:020f5b16-b1a1-4149-946f-92dd32d97934' -) -AWS_CONN_ID = 'aws_non_default' -REGION_NAME = 'us-west-2' - - -class TestStepFunctionGetExecutionOutputOperator(unittest.TestCase): - def setUp(self): - self.mock_context = MagicMock() - - def test_init(self): - # Given / When - operator = StepFunctionGetExecutionOutputOperator( - task_id=TASK_ID, execution_arn=EXECUTION_ARN, aws_conn_id=AWS_CONN_ID, region_name=REGION_NAME - ) - - # Then - assert TASK_ID == operator.task_id - assert EXECUTION_ARN == operator.execution_arn - assert AWS_CONN_ID == operator.aws_conn_id - assert REGION_NAME == operator.region_name - - @mock.patch('airflow.providers.amazon.aws.operators.step_function_get_execution_output.StepFunctionHook') - def test_execute(self, mock_hook): - # Given - hook_response = {'output': '{}'} - - hook_instance = mock_hook.return_value - hook_instance.describe_execution.return_value = hook_response - - operator = StepFunctionGetExecutionOutputOperator( - task_id=TASK_ID, execution_arn=EXECUTION_ARN, aws_conn_id=AWS_CONN_ID, region_name=REGION_NAME - ) - - # When - result = operator.execute(self.mock_context) - - # Then - assert {} == result diff --git a/tests/providers/amazon/aws/sensors/test_step_function_execution.py b/tests/providers/amazon/aws/sensors/test_step_function_execution.py index 03235392f0335..2d06583cb13d7 100644 --- a/tests/providers/amazon/aws/sensors/test_step_function_execution.py +++ b/tests/providers/amazon/aws/sensors/test_step_function_execution.py @@ -24,7 +24,7 @@ from parameterized import parameterized from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.sensors.step_function_execution import StepFunctionExecutionSensor +from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor TASK_ID = 'step_function_execution_sensor' EXECUTION_ARN = ( @@ -50,7 +50,7 @@ def test_init(self): assert REGION_NAME == sensor.region_name @parameterized.expand([('FAILED',), ('TIMED_OUT',), ('ABORTED',)]) - @mock.patch('airflow.providers.amazon.aws.sensors.step_function_execution.StepFunctionHook') + @mock.patch('airflow.providers.amazon.aws.sensors.step_function.StepFunctionHook') def test_exceptions(self, mock_status, mock_hook): hook_response = {'status': mock_status} @@ -64,7 +64,7 @@ def test_exceptions(self, mock_status, mock_hook): with pytest.raises(AirflowException): sensor.poke(self.mock_context) - @mock.patch('airflow.providers.amazon.aws.sensors.step_function_execution.StepFunctionHook') + @mock.patch('airflow.providers.amazon.aws.sensors.step_function.StepFunctionHook') def test_running(self, mock_hook): hook_response = {'status': 'RUNNING'} @@ -77,7 +77,7 @@ def test_running(self, mock_hook): assert not sensor.poke(self.mock_context) - @mock.patch('airflow.providers.amazon.aws.sensors.step_function_execution.StepFunctionHook') + @mock.patch('airflow.providers.amazon.aws.sensors.step_function.StepFunctionHook') def test_succeeded(self, mock_hook): hook_response = {'status': 'SUCCEEDED'} From 029b5ccae21f60fab1d71117827eb7ff1391c210 Mon Sep 17 00:00:00 2001 From: bhavaniravi Date: Mon, 13 Dec 2021 08:02:51 +0000 Subject: [PATCH 3/3] fix test path and license --- tests/providers/amazon/aws/operators/test_step_function.py | 5 +---- ...test_step_function_execution.py => test_step_function.py} | 0 2 files changed, 1 insertion(+), 4 deletions(-) rename tests/providers/amazon/aws/sensors/{test_step_function_execution.py => test_step_function.py} (100%) diff --git a/tests/providers/amazon/aws/operators/test_step_function.py b/tests/providers/amazon/aws/operators/test_step_function.py index 0aa41ff745dfa..bc044d40e9b2c 100644 --- a/tests/providers/amazon/aws/operators/test_step_function.py +++ b/tests/providers/amazon/aws/operators/test_step_function.py @@ -1,7 +1,5 @@ # # Licensed to the Apache Software Foundation (ASF) under one -# TODO: This license is not consistent with license used in the project. -# Delete the inconsistent license and above line and rerun pre-commit to insert a good license. # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file @@ -15,9 +13,8 @@ # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the -# specific language governing permissions and limitations ho +# specific language governing permissions and limitations # under the License. -# import unittest from unittest import mock diff --git a/tests/providers/amazon/aws/sensors/test_step_function_execution.py b/tests/providers/amazon/aws/sensors/test_step_function.py similarity index 100% rename from tests/providers/amazon/aws/sensors/test_step_function_execution.py rename to tests/providers/amazon/aws/sensors/test_step_function.py