diff --git a/airflow/providers/amazon/aws/operators/step_function.py b/airflow/providers/amazon/aws/operators/step_function.py new file mode 100644 index 0000000000000..2853cea91faaf --- /dev/null +++ b/airflow/providers/amazon/aws/operators/step_function.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import json +from typing import Optional, Union + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook + + +class StepFunctionStartExecutionOperator(BaseOperator): + """ + An Operator that begins execution of an Step Function State Machine + + Additional arguments may be specified and are passed down to the underlying BaseOperator. + + .. seealso:: + :class:`~airflow.models.BaseOperator` + + :param state_machine_arn: ARN of the Step Function State Machine + :type state_machine_arn: str + :param name: The name of the execution. + :type name: Optional[str] + :param state_machine_input: JSON data input to pass to the State Machine + :type state_machine_input: Union[Dict[str, any], str, None] + :param aws_conn_id: aws connection to uses + :type aws_conn_id: str + :param do_xcom_push: if True, execution_arn is pushed to XCom with key execution_arn. + :type do_xcom_push: bool + """ + + template_fields = ['state_machine_arn', 'name', 'input'] + template_ext = () + ui_color = '#f9c915' + + def __init__( + self, + *, + state_machine_arn: str, + name: Optional[str] = None, + state_machine_input: Union[dict, str, None] = None, + aws_conn_id: str = 'aws_default', + region_name: Optional[str] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.state_machine_arn = state_machine_arn + self.name = name + self.input = state_machine_input + self.aws_conn_id = aws_conn_id + self.region_name = region_name + + def execute(self, context): + hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + + execution_arn = hook.start_execution(self.state_machine_arn, self.name, self.input) + + if execution_arn is None: + raise AirflowException(f'Failed to start State Machine execution for: {self.state_machine_arn}') + + self.log.info('Started State Machine execution for %s: %s', self.state_machine_arn, execution_arn) + + return execution_arn + + +class StepFunctionGetExecutionOutputOperator(BaseOperator): + """ + An Operator that begins execution of an Step Function State Machine + + Additional arguments may be specified and are passed down to the underlying BaseOperator. + + .. seealso:: + :class:`~airflow.models.BaseOperator` + + :param execution_arn: ARN of the Step Function State Machine Execution + :type execution_arn: str + :param aws_conn_id: aws connection to use, defaults to 'aws_default' + :type aws_conn_id: str + """ + + template_fields = ['execution_arn'] + template_ext = () + ui_color = '#f9c915' + + def __init__( + self, + *, + execution_arn: str, + aws_conn_id: str = 'aws_default', + region_name: Optional[str] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.execution_arn = execution_arn + self.aws_conn_id = aws_conn_id + self.region_name = region_name + + def execute(self, context): + hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + + execution_status = hook.describe_execution(self.execution_arn) + execution_output = json.loads(execution_status['output']) if 'output' in execution_status else None + + self.log.info('Got State Machine Execution output for %s', self.execution_arn) + + return execution_output diff --git a/airflow/providers/amazon/aws/operators/step_function_get_execution_output.py b/airflow/providers/amazon/aws/operators/step_function_get_execution_output.py index 770ca5266346a..2b047241ae861 100644 --- a/airflow/providers/amazon/aws/operators/step_function_get_execution_output.py +++ b/airflow/providers/amazon/aws/operators/step_function_get_execution_output.py @@ -15,51 +15,16 @@ # specific language governing permissions and limitations # under the License. -import json -from typing import Optional +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.sensors.step_function`.""" -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook +import warnings +from airflow.providers.amazon.aws.operators.step_function import ( # noqa + StepFunctionGetExecutionOutputOperator, +) -class StepFunctionGetExecutionOutputOperator(BaseOperator): - """ - An Operator that begins execution of an Step Function State Machine - - Additional arguments may be specified and are passed down to the underlying BaseOperator. - - .. seealso:: - :class:`~airflow.models.BaseOperator` - - :param execution_arn: ARN of the Step Function State Machine Execution - :type execution_arn: str - :param aws_conn_id: aws connection to use, defaults to 'aws_default' - :type aws_conn_id: str - """ - - template_fields = ['execution_arn'] - template_ext = () - ui_color = '#f9c915' - - def __init__( - self, - *, - execution_arn: str, - aws_conn_id: str = 'aws_default', - region_name: Optional[str] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.execution_arn = execution_arn - self.aws_conn_id = aws_conn_id - self.region_name = region_name - - def execute(self, context): - hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) - - execution_status = hook.describe_execution(self.execution_arn) - execution_output = json.loads(execution_status['output']) if 'output' in execution_status else None - - self.log.info('Got State Machine Execution output for %s', self.execution_arn) - - return execution_output +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.step_function`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/step_function_start_execution.py b/airflow/providers/amazon/aws/operators/step_function_start_execution.py index 8507e3b3c061b..10a847ffc90bd 100644 --- a/airflow/providers/amazon/aws/operators/step_function_start_execution.py +++ b/airflow/providers/amazon/aws/operators/step_function_start_execution.py @@ -15,63 +15,14 @@ # specific language governing permissions and limitations # under the License. -from typing import Optional, Union +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.step_function`.""" -from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook +import warnings +from airflow.providers.amazon.aws.operators.step_function import StepFunctionStartExecutionOperator # noqa -class StepFunctionStartExecutionOperator(BaseOperator): - """ - An Operator that begins execution of an Step Function State Machine - - Additional arguments may be specified and are passed down to the underlying BaseOperator. - - .. seealso:: - :class:`~airflow.models.BaseOperator` - - :param state_machine_arn: ARN of the Step Function State Machine - :type state_machine_arn: str - :param name: The name of the execution. - :type name: Optional[str] - :param state_machine_input: JSON data input to pass to the State Machine - :type state_machine_input: Union[Dict[str, any], str, None] - :param aws_conn_id: aws connection to uses - :type aws_conn_id: str - :param do_xcom_push: if True, execution_arn is pushed to XCom with key execution_arn. - :type do_xcom_push: bool - """ - - template_fields = ['state_machine_arn', 'name', 'input'] - template_ext = () - ui_color = '#f9c915' - - def __init__( - self, - *, - state_machine_arn: str, - name: Optional[str] = None, - state_machine_input: Union[dict, str, None] = None, - aws_conn_id: str = 'aws_default', - region_name: Optional[str] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.state_machine_arn = state_machine_arn - self.name = name - self.input = state_machine_input - self.aws_conn_id = aws_conn_id - self.region_name = region_name - - def execute(self, context): - hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) - - execution_arn = hook.start_execution(self.state_machine_arn, self.name, self.input) - - if execution_arn is None: - raise AirflowException(f'Failed to start State Machine execution for: {self.state_machine_arn}') - - self.log.info('Started State Machine execution for %s: %s', self.state_machine_arn, execution_arn) - - return execution_arn +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.step_function`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/sensors/step_function.py b/airflow/providers/amazon/aws/sensors/step_function.py new file mode 100644 index 0000000000000..39cd74f69397d --- /dev/null +++ b/airflow/providers/amazon/aws/sensors/step_function.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from typing import Optional + +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook +from airflow.sensors.base import BaseSensorOperator + + +class StepFunctionExecutionSensor(BaseSensorOperator): + """ + Asks for the state of the Step Function State Machine Execution until it + reaches a failure state or success state. + If it fails, failing the task. + + On successful completion of the Execution the Sensor will do an XCom Push + of the State Machine's output to `output` + + :param execution_arn: execution_arn to check the state of + :type execution_arn: str + :param aws_conn_id: aws connection to use, defaults to 'aws_default' + :type aws_conn_id: str + """ + + INTERMEDIATE_STATES = ('RUNNING',) + FAILURE_STATES = ( + 'FAILED', + 'TIMED_OUT', + 'ABORTED', + ) + SUCCESS_STATES = ('SUCCEEDED',) + + template_fields = ['execution_arn'] + template_ext = () + ui_color = '#66c3ff' + + def __init__( + self, + *, + execution_arn: str, + aws_conn_id: str = 'aws_default', + region_name: Optional[str] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.execution_arn = execution_arn + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.hook: Optional[StepFunctionHook] = None + + def poke(self, context): + execution_status = self.get_hook().describe_execution(self.execution_arn) + state = execution_status['status'] + output = json.loads(execution_status['output']) if 'output' in execution_status else None + + if state in self.FAILURE_STATES: + raise AirflowException(f'Step Function sensor failed. State Machine Output: {output}') + + if state in self.INTERMEDIATE_STATES: + return False + + self.log.info('Doing xcom_push of output') + self.xcom_push(context, 'output', output) + return True + + def get_hook(self) -> StepFunctionHook: + """Create and return a StepFunctionHook""" + if self.hook: + return self.hook + + self.hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + return self.hook diff --git a/airflow/providers/amazon/aws/sensors/step_function_execution.py b/airflow/providers/amazon/aws/sensors/step_function_execution.py index 39cd74f69397d..267343cd1df9e 100644 --- a/airflow/providers/amazon/aws/sensors/step_function_execution.py +++ b/airflow/providers/amazon/aws/sensors/step_function_execution.py @@ -15,74 +15,14 @@ # specific language governing permissions and limitations # under the License. -import json -from typing import Optional +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.sensors.step_function`.""" -from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook -from airflow.sensors.base import BaseSensorOperator +import warnings +from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor # noqa -class StepFunctionExecutionSensor(BaseSensorOperator): - """ - Asks for the state of the Step Function State Machine Execution until it - reaches a failure state or success state. - If it fails, failing the task. - - On successful completion of the Execution the Sensor will do an XCom Push - of the State Machine's output to `output` - - :param execution_arn: execution_arn to check the state of - :type execution_arn: str - :param aws_conn_id: aws connection to use, defaults to 'aws_default' - :type aws_conn_id: str - """ - - INTERMEDIATE_STATES = ('RUNNING',) - FAILURE_STATES = ( - 'FAILED', - 'TIMED_OUT', - 'ABORTED', - ) - SUCCESS_STATES = ('SUCCEEDED',) - - template_fields = ['execution_arn'] - template_ext = () - ui_color = '#66c3ff' - - def __init__( - self, - *, - execution_arn: str, - aws_conn_id: str = 'aws_default', - region_name: Optional[str] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.execution_arn = execution_arn - self.aws_conn_id = aws_conn_id - self.region_name = region_name - self.hook: Optional[StepFunctionHook] = None - - def poke(self, context): - execution_status = self.get_hook().describe_execution(self.execution_arn) - state = execution_status['status'] - output = json.loads(execution_status['output']) if 'output' in execution_status else None - - if state in self.FAILURE_STATES: - raise AirflowException(f'Step Function sensor failed. State Machine Output: {output}') - - if state in self.INTERMEDIATE_STATES: - return False - - self.log.info('Doing xcom_push of output') - self.xcom_push(context, 'output', output) - return True - - def get_hook(self) -> StepFunctionHook: - """Create and return a StepFunctionHook""" - if self.hook: - return self.hook - - self.hook = StepFunctionHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) - return self.hook +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.step_function`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 31f05a26563a3..12bdf831adcd6 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -244,6 +244,7 @@ operators: python-modules: - airflow.providers.amazon.aws.operators.step_function_get_execution_output - airflow.providers.amazon.aws.operators.step_function_start_execution + - airflow.providers.amazon.aws.operators.step_function - integration-name: Amazon Redshift python-modules: - airflow.providers.amazon.aws.operators.redshift @@ -305,6 +306,7 @@ sensors: - integration-name: AWS Step Functions python-modules: - airflow.providers.amazon.aws.sensors.step_function_execution + - airflow.providers.amazon.aws.sensors.step_function hooks: - integration-name: Amazon Athena diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index 6565410d7bb3e..bdea1876f5511 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -2137,6 +2137,8 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin "This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.", 'numpy.ufunc size changed, may indicate binary incompatibility. Expected 192 from C header,' ' got 216 from PyObject', + "This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.step_function`.", + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.step_function`.", 'This module is deprecated. Please use `airflow.providers.amazon.aws.operators.ec2`.', 'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.ec2`.', } diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py index 5fcd2f4c47ec5..5efb57367f154 100644 --- a/tests/deprecated_classes.py +++ b/tests/deprecated_classes.py @@ -1518,6 +1518,16 @@ 'airflow.providers.amazon.aws.operators.s3_file_transform.S3FileTransformOperator', 'airflow.operators.s3_file_transform_operator.S3FileTransformOperator', ), + ( + 'airflow.providers.amazon.aws.operators.step_function.StepFunctionStartExecutionOperator', + 'airflow.providers.amazon.aws.operators.step_function_start_execution' + '.StepFunctionStartExecutionOperator', + ), + ( + 'airflow.providers.amazon.aws.operators.step_function.StepFunctionGetExecutionOutputOperator', + 'airflow.providers.amazon.aws.operators.step_function_get_execution_output' + '.StepFunctionGetExecutionOutputOperator', + ), ( 'airflow.providers.amazon.aws.sensors.s3_key.S3KeySensor', 'airflow.sensors.s3_key_sensor.S3KeySensor', @@ -1526,6 +1536,10 @@ 'airflow.providers.amazon.aws.sensors.s3_prefix.S3PrefixSensor', 'airflow.sensors.s3_prefix_sensor.S3PrefixSensor', ), + ( + 'airflow.providers.amazon.aws.sensors.step_function.StepFunctionExecutionSensor', + 'airflow.providers.amazon.aws.sensors.step_function_execution.StepFunctionExecutionSensor', + ), ( 'airflow.sensors.bash.BashSensor', 'airflow.contrib.sensors.bash_sensor.BashSensor', diff --git a/tests/providers/amazon/aws/operators/test_step_function_start_execution.py b/tests/providers/amazon/aws/operators/test_step_function.py similarity index 59% rename from tests/providers/amazon/aws/operators/test_step_function_start_execution.py rename to tests/providers/amazon/aws/operators/test_step_function.py index f71b8f02bb638..bc044d40e9b2c 100644 --- a/tests/providers/amazon/aws/operators/test_step_function_start_execution.py +++ b/tests/providers/amazon/aws/operators/test_step_function.py @@ -15,32 +15,80 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# import unittest from unittest import mock from unittest.mock import MagicMock -from airflow.providers.amazon.aws.operators.step_function_start_execution import ( +from airflow.providers.amazon.aws.operators.step_function import ( + StepFunctionGetExecutionOutputOperator, StepFunctionStartExecutionOperator, ) -TASK_ID = 'step_function_start_execution_task' +EXECUTION_ARN = ( + 'arn:aws:states:us-east-1:123456789012:execution:' + 'pseudo-state-machine:020f5b16-b1a1-4149-946f-92dd32d97934' +) +AWS_CONN_ID = 'aws_non_default' +REGION_NAME = 'us-west-2' STATE_MACHINE_ARN = 'arn:aws:states:us-east-1:000000000000:stateMachine:pseudo-state-machine' NAME = 'NAME' INPUT = '{}' -AWS_CONN_ID = 'aws_non_default' -REGION_NAME = 'us-west-2' + + +class TestStepFunctionGetExecutionOutputOperator(unittest.TestCase): + TASK_ID = 'step_function_get_execution_output' + + def setUp(self): + self.mock_context = MagicMock() + + def test_init(self): + # Given / When + operator = StepFunctionGetExecutionOutputOperator( + task_id=self.TASK_ID, + execution_arn=EXECUTION_ARN, + aws_conn_id=AWS_CONN_ID, + region_name=REGION_NAME, + ) + + # Then + assert self.TASK_ID == operator.task_id + assert EXECUTION_ARN == operator.execution_arn + assert AWS_CONN_ID == operator.aws_conn_id + assert REGION_NAME == operator.region_name + + @mock.patch('airflow.providers.amazon.aws.operators.step_function.StepFunctionHook') + def test_execute(self, mock_hook): + # Given + hook_response = {'output': '{}'} + + hook_instance = mock_hook.return_value + hook_instance.describe_execution.return_value = hook_response + + operator = StepFunctionGetExecutionOutputOperator( + task_id=self.TASK_ID, + execution_arn=EXECUTION_ARN, + aws_conn_id=AWS_CONN_ID, + region_name=REGION_NAME, + ) + + # When + result = operator.execute(self.mock_context) + + # Then + assert {} == result class TestStepFunctionStartExecutionOperator(unittest.TestCase): + TASK_ID = 'step_function_start_execution_task' + def setUp(self): self.mock_context = MagicMock() def test_init(self): # Given / When operator = StepFunctionStartExecutionOperator( - task_id=TASK_ID, + task_id=self.TASK_ID, state_machine_arn=STATE_MACHINE_ARN, name=NAME, state_machine_input=INPUT, @@ -49,14 +97,14 @@ def test_init(self): ) # Then - assert TASK_ID == operator.task_id + assert self.TASK_ID == operator.task_id assert STATE_MACHINE_ARN == operator.state_machine_arn assert NAME == operator.name assert INPUT == operator.input assert AWS_CONN_ID == operator.aws_conn_id assert REGION_NAME == operator.region_name - @mock.patch('airflow.providers.amazon.aws.operators.step_function_start_execution.StepFunctionHook') + @mock.patch('airflow.providers.amazon.aws.operators.step_function.StepFunctionHook') def test_execute(self, mock_hook): # Given hook_response = ( @@ -68,7 +116,7 @@ def test_execute(self, mock_hook): hook_instance.start_execution.return_value = hook_response operator = StepFunctionStartExecutionOperator( - task_id=TASK_ID, + task_id=self.TASK_ID, state_machine_arn=STATE_MACHINE_ARN, name=NAME, state_machine_input=INPUT, diff --git a/tests/providers/amazon/aws/operators/test_step_function_get_execution_output.py b/tests/providers/amazon/aws/operators/test_step_function_get_execution_output.py deleted file mode 100644 index 8bad0321b359c..0000000000000 --- a/tests/providers/amazon/aws/operators/test_step_function_get_execution_output.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest -from unittest import mock -from unittest.mock import MagicMock - -from airflow.providers.amazon.aws.operators.step_function_get_execution_output import ( - StepFunctionGetExecutionOutputOperator, -) - -TASK_ID = 'step_function_get_execution_output' -EXECUTION_ARN = ( - 'arn:aws:states:us-east-1:123456789012:execution:' - 'pseudo-state-machine:020f5b16-b1a1-4149-946f-92dd32d97934' -) -AWS_CONN_ID = 'aws_non_default' -REGION_NAME = 'us-west-2' - - -class TestStepFunctionGetExecutionOutputOperator(unittest.TestCase): - def setUp(self): - self.mock_context = MagicMock() - - def test_init(self): - # Given / When - operator = StepFunctionGetExecutionOutputOperator( - task_id=TASK_ID, execution_arn=EXECUTION_ARN, aws_conn_id=AWS_CONN_ID, region_name=REGION_NAME - ) - - # Then - assert TASK_ID == operator.task_id - assert EXECUTION_ARN == operator.execution_arn - assert AWS_CONN_ID == operator.aws_conn_id - assert REGION_NAME == operator.region_name - - @mock.patch('airflow.providers.amazon.aws.operators.step_function_get_execution_output.StepFunctionHook') - def test_execute(self, mock_hook): - # Given - hook_response = {'output': '{}'} - - hook_instance = mock_hook.return_value - hook_instance.describe_execution.return_value = hook_response - - operator = StepFunctionGetExecutionOutputOperator( - task_id=TASK_ID, execution_arn=EXECUTION_ARN, aws_conn_id=AWS_CONN_ID, region_name=REGION_NAME - ) - - # When - result = operator.execute(self.mock_context) - - # Then - assert {} == result diff --git a/tests/providers/amazon/aws/sensors/test_step_function_execution.py b/tests/providers/amazon/aws/sensors/test_step_function.py similarity index 94% rename from tests/providers/amazon/aws/sensors/test_step_function_execution.py rename to tests/providers/amazon/aws/sensors/test_step_function.py index 03235392f0335..2d06583cb13d7 100644 --- a/tests/providers/amazon/aws/sensors/test_step_function_execution.py +++ b/tests/providers/amazon/aws/sensors/test_step_function.py @@ -24,7 +24,7 @@ from parameterized import parameterized from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.sensors.step_function_execution import StepFunctionExecutionSensor +from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor TASK_ID = 'step_function_execution_sensor' EXECUTION_ARN = ( @@ -50,7 +50,7 @@ def test_init(self): assert REGION_NAME == sensor.region_name @parameterized.expand([('FAILED',), ('TIMED_OUT',), ('ABORTED',)]) - @mock.patch('airflow.providers.amazon.aws.sensors.step_function_execution.StepFunctionHook') + @mock.patch('airflow.providers.amazon.aws.sensors.step_function.StepFunctionHook') def test_exceptions(self, mock_status, mock_hook): hook_response = {'status': mock_status} @@ -64,7 +64,7 @@ def test_exceptions(self, mock_status, mock_hook): with pytest.raises(AirflowException): sensor.poke(self.mock_context) - @mock.patch('airflow.providers.amazon.aws.sensors.step_function_execution.StepFunctionHook') + @mock.patch('airflow.providers.amazon.aws.sensors.step_function.StepFunctionHook') def test_running(self, mock_hook): hook_response = {'status': 'RUNNING'} @@ -77,7 +77,7 @@ def test_running(self, mock_hook): assert not sensor.poke(self.mock_context) - @mock.patch('airflow.providers.amazon.aws.sensors.step_function_execution.StepFunctionHook') + @mock.patch('airflow.providers.amazon.aws.sensors.step_function.StepFunctionHook') def test_succeeded(self, mock_hook): hook_response = {'status': 'SUCCEEDED'}