diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index a4ffd9ef4d18d..4d85c08c8ddc1 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -21,6 +21,7 @@ from typing import Any from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +from airflow.providers.amazon.aws.utils import trim_none_values class LambdaHook(AwsBaseHook): @@ -70,7 +71,7 @@ def invoke_lambda( "Payload": payload, "Qualifier": qualifier, } - return self.conn.invoke(**{k: v for k, v in invoke_args.items() if v is not None}) + return self.conn.invoke(**trim_none_values(invoke_args)) def create_lambda( self, @@ -127,6 +128,4 @@ def create_lambda( "CodeSigningConfigArn": code_signing_config_arn, "Architectures": architectures, } - return self.conn.create_function( - **{k: v for k, v in create_function_args.items() if v is not None}, - ) + return self.conn.create_function(**trim_none_values(create_function_args)) diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py index 91ef531505821..c53fd821327df 100644 --- a/airflow/providers/amazon/aws/operators/lambda_function.py +++ b/airflow/providers/amazon/aws/operators/lambda_function.py @@ -20,6 +20,7 @@ import json from typing import TYPE_CHECKING, Sequence +from airflow.compat.functools import cached_property from airflow.models import BaseOperator from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook @@ -27,28 +28,114 @@ from airflow.utils.context import Context -class AwsLambdaInvokeFunctionOperator(BaseOperator): +class LambdaCreateFunctionOperator(BaseOperator): """ - Invokes an AWS Lambda function. - You can invoke a function synchronously (and wait for the response), - or asynchronously. - To invoke a function asynchronously, - set `invocation_type` to `Event`. For more details, - review the boto3 Lambda invoke docs. + Creates an AWS Lambda function. + + More information regarding parameters of this operator can be found here + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html#Lambda.Client.create_function + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:LambdaCreateFunctionOperator` :param function_name: The name of the AWS Lambda function, version, or alias. - :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None". - :param qualifier: Specify a version or alias to invoke a published version of the function. - :param invocation_type: One of RequestResponse / Event / DryRun - :param client_context: Up to 3,583 bytes of base64-encoded data about the invoking client - to pass to the function in the context object. - :param payload: The JSON string that you want to provide to your Lambda function as input. + :param runtime: The identifier of the function's runtime. Runtime is required if the deployment package + is a .zip file archive. + :param role: The Amazon Resource Name (ARN) of the function's execution role. + :param handler: The name of the method within your code that Lambda calls to run your function. + Handler is required if the deployment package is a .zip file archive. + :param code: The code for the function. + :param description: A description of the function. + :param timeout: The amount of time (in seconds) that Lambda allows a function to run before stopping it. + :param config: Optional dictionary for arbitrary parameters to the boto API create_lambda call. + :param wait_for_completion: If True, the operator will wait until the function is active. :param aws_conn_id: The AWS connection ID to use + """ + + template_fields: Sequence[str] = ( + "function_name", + "runtime", + "role", + "handler", + "code", + "config", + ) + ui_color = "#ff7300" + + def __init__( + self, + *, + function_name: str, + runtime: str | None = None, + role: str, + handler: str | None = None, + code: dict, + description: str | None = None, + timeout: int | None = None, + config: dict = {}, + wait_for_completion: bool = False, + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(**kwargs) + self.function_name = function_name + self.runtime = runtime + self.role = role + self.handler = handler + self.code = code + self.description = description + self.timeout = timeout + self.config = config + self.wait_for_completion = wait_for_completion + self.aws_conn_id = aws_conn_id + + @cached_property + def hook(self) -> LambdaHook: + return LambdaHook(aws_conn_id=self.aws_conn_id) + + def execute(self, context: Context): + self.log.info("Creating AWS Lambda function: %s", self.function_name) + response = self.hook.create_lambda( + function_name=self.function_name, + runtime=self.runtime, + role=self.role, + handler=self.handler, + code=self.code, + description=self.description, + timeout=self.timeout, + **self.config, + ) + self.log.info("Lambda response: %r", response) + + if self.wait_for_completion: + self.log.info("Wait for Lambda function to be active") + waiter = self.hook.conn.get_waiter("function_active_v2") + waiter.wait( + FunctionName=self.function_name, + ) + + return response.get("FunctionArn") + + +class AwsLambdaInvokeFunctionOperator(BaseOperator): + """ + Invokes an AWS Lambda function. You can invoke a function synchronously (and wait for the response), + or asynchronously. + To invoke a function asynchronously, set `invocation_type` to `Event`. For more details, + review the boto3 Lambda invoke docs. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:AwsLambdaInvokeFunctionOperator` + :param function_name: The name of the AWS Lambda function, version, or alias. + :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None". + :param qualifier: Specify a version or alias to invoke a published version of the function. + :param invocation_type: AWS Lambda invocation type (RequestResponse, Event, DryRun) + :param client_context: Data about the invoking client to pass to the function in the context object + :param payload: JSON provided as input to the Lambda function + :param aws_conn_id: The AWS connection ID to use """ template_fields: Sequence[str] = ("function_name", "payload", "qualifier", "invocation_type") @@ -75,16 +162,19 @@ def __init__( self.client_context = client_context self.aws_conn_id = aws_conn_id + @cached_property + def hook(self) -> LambdaHook: + return LambdaHook(aws_conn_id=self.aws_conn_id) + def execute(self, context: Context): """ Invokes the target AWS Lambda function from Airflow. :return: The response payload from the function, or an error object. """ - hook = LambdaHook(aws_conn_id=self.aws_conn_id) success_status_codes = [200, 202, 204] self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload) - response = hook.invoke_lambda( + response = self.hook.invoke_lambda( function_name=self.function_name, invocation_type=self.invocation_type, log_type=self.log_type, diff --git a/airflow/providers/amazon/aws/sensors/lambda_function.py b/airflow/providers/amazon/aws/sensors/lambda_function.py new file mode 100644 index 0000000000000..44cb487352e85 --- /dev/null +++ b/airflow/providers/amazon/aws/sensors/lambda_function.py @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook +from airflow.providers.amazon.aws.utils import trim_none_values + +if TYPE_CHECKING: + from airflow.utils.context import Context + +from airflow.compat.functools import cached_property +from airflow.exceptions import AirflowException +from airflow.sensors.base import BaseSensorOperator + + +class LambdaFunctionStateSensor(BaseSensorOperator): + """ + Asks for the state of the Lambda until it reaches a target state. + If the query fails, the task will fail. + + .. seealso:: + For more information on how to use this sensor, take a look at the guide: + :ref:`howto/sensor:LambdaFunctionStateSensor` + + :param function_name: The name of the AWS Lambda function, version, or alias. + :param qualifier: Specify a version or alias to get details about a published version of the function. + :param target_states: The Lambda states desired. + :param aws_conn_id: aws connection to use, defaults to 'aws_default' + """ + + FAILURE_STATES = ("Failed",) + + template_fields: Sequence[str] = ( + "function_name", + "qualifier", + ) + + def __init__( + self, + *, + function_name: str, + qualifier: str | None = None, + target_states: list = ["Active"], + aws_conn_id: str = "aws_default", + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.aws_conn_id = aws_conn_id + self.function_name = function_name + self.qualifier = qualifier + self.target_states = target_states + + def poke(self, context: Context) -> bool: + get_function_args = { + "FunctionName": self.function_name, + "Qualifier": self.qualifier, + } + state = self.hook.conn.get_function(**trim_none_values(get_function_args))["Configuration"]["State"] + + if state in self.FAILURE_STATES: + raise AirflowException( + "Lambda function state sensor failed because the Lambda is in a failed state" + ) + + return state in self.target_states + + @cached_property + def hook(self) -> LambdaHook: + return LambdaHook(aws_conn_id=self.aws_conn_id) diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index c6dc833b787dd..7c5b557274f99 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -354,6 +354,9 @@ sensors: - airflow.providers.amazon.aws.sensors.glue - airflow.providers.amazon.aws.sensors.glue_crawler - airflow.providers.amazon.aws.sensors.glue_catalog_partition + - integration-name: AWS Lambda + python-modules: + - airflow.providers.amazon.aws.sensors.lambda_function - integration-name: Amazon RDS python-modules: - airflow.providers.amazon.aws.sensors.rds diff --git a/docs/apache-airflow-providers-amazon/operators/lambda.rst b/docs/apache-airflow-providers-amazon/operators/lambda.rst index 8f8f29bd1c4c9..eb161d4783be7 100644 --- a/docs/apache-airflow-providers-amazon/operators/lambda.rst +++ b/docs/apache-airflow-providers-amazon/operators/lambda.rst @@ -33,6 +33,20 @@ Prerequisite Tasks Operators --------- +.. _howto/operator:LambdaCreateFunctionOperator: + +Create an AWS Lambda function +============================= + +To create an AWS lambda function you can use +:class:`~airflow.providers.amazon.aws.operators.lambda_function.LambdaCreateFunctionOperator`. + +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_lambda.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_create_lambda_function] + :end-before: [END howto_operator_create_lambda_function] + .. _howto/operator:AwsLambdaInvokeFunctionOperator: Invoke an AWS Lambda function @@ -41,12 +55,29 @@ Invoke an AWS Lambda function To invoke an AWS lambda function you can use :class:`~airflow.providers.amazon.aws.operators.lambda_function.AwsLambdaInvokeFunctionOperator`. +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_lambda.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_invoke_lambda_function] + :end-before: [END howto_operator_invoke_lambda_function] + +Sensors +--------- + +.. _howto/sensor:LambdaFunctionStateSensor: + +Wait on an Amazon Lambda function state +======================================= + +To check the state of an Amazon Lambda function until it reaches the target state or another terminal +state you can use :class:`~airflow.providers.amazon.aws.sensors.lambda_function.LambdaFunctionStateSensor`. .. exampleinclude:: /../../tests/system/providers/amazon/aws/example_lambda.py :language: python :dedent: 4 - :start-after: [START howto_operator_lambda] - :end-before: [END howto_operator_lambda] + :start-after: [START howto_sensor_lambda_function_state] + :end-before: [END howto_sensor_lambda_function_state] + Reference --------- diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py index f2d9ca5ee30d3..f21c000ea6d9f 100644 --- a/tests/providers/amazon/aws/hooks/test_lambda_function.py +++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py @@ -18,6 +18,7 @@ from __future__ import annotations from unittest import mock +from unittest.mock import MagicMock import pytest @@ -31,18 +32,24 @@ CODE = {} +class LambdaHookForTests(LambdaHook): + conn = MagicMock(name="conn") + + +@pytest.fixture +def hook(): + return LambdaHookForTests() + + class TestLambdaHook: - def test_get_conn_returns_a_boto3_connection(self): - hook = LambdaHook(aws_conn_id="aws_default") + def test_get_conn_returns_a_boto3_connection(self, hook): assert hook.conn is not None @mock.patch( "airflow.providers.amazon.aws.hooks.lambda_function.LambdaHook.conn", new_callable=mock.PropertyMock ) def test_invoke_lambda(self, mock_conn): - mock_conn().invoke.return_value = {} - - hook = LambdaHook(aws_conn_id="aws_default") + hook = LambdaHook() hook.invoke_lambda(function_name=FUNCTION_NAME, payload=PAYLOAD) mock_conn().invoke.assert_called_once_with( @@ -50,80 +57,77 @@ def test_invoke_lambda(self, mock_conn): Payload=PAYLOAD, ) - @mock.patch( - "airflow.providers.amazon.aws.hooks.lambda_function.LambdaHook.conn", new_callable=mock.PropertyMock + @pytest.mark.parametrize( + "hook_params, boto3_params", + [ + pytest.param( + { + "function_name": FUNCTION_NAME, + "runtime": RUNTIME, + "role": ROLE, + "handler": HANDLER, + "code": CODE, + "package_type": "Zip", + }, + { + "FunctionName": FUNCTION_NAME, + "Runtime": RUNTIME, + "Role": ROLE, + "Handler": HANDLER, + "Code": CODE, + "PackageType": "Zip", + }, + id="'Zip' as 'package_type'", + ), + pytest.param( + { + "function_name": FUNCTION_NAME, + "role": ROLE, + "code": CODE, + "package_type": "Image", + }, + { + "FunctionName": FUNCTION_NAME, + "Role": ROLE, + "Code": CODE, + "PackageType": "Image", + }, + id="'Image' as 'package_type'", + ), + ], ) - def test_create_lambda_with_zip_package_type(self, mock_conn): - mock_conn().create_function.return_value = {} - - hook = LambdaHook(aws_conn_id="aws_default") - hook.create_lambda( - function_name=FUNCTION_NAME, - runtime=RUNTIME, - role=ROLE, - handler=HANDLER, - code=CODE, - package_type="Zip", - ) - - mock_conn().create_function.assert_called_once_with( - FunctionName=FUNCTION_NAME, - Runtime=RUNTIME, - Role=ROLE, - Handler=HANDLER, - Code=CODE, - PackageType="Zip", - ) - - @mock.patch( - "airflow.providers.amazon.aws.hooks.lambda_function.LambdaHook.conn", new_callable=mock.PropertyMock - ) - def test_create_lambda_with_zip_package_type_and_no_runtime(self, mock_conn): - mock_conn().create_function.return_value = {} - - hook = LambdaHook(aws_conn_id="aws_default") - with pytest.raises(TypeError): - hook.create_lambda( - function_name=FUNCTION_NAME, - role=ROLE, - handler=HANDLER, - code=CODE, - package_type="Zip", - ) - - @mock.patch( - "airflow.providers.amazon.aws.hooks.lambda_function.LambdaHook.conn", new_callable=mock.PropertyMock + def test_create_lambda(self, hook_params, boto3_params, hook): + hook.conn.create_function.reset_mock() + hook.conn.create_function.return_value = {} + hook.create_lambda(**hook_params) + + hook.conn.create_function.assert_called_once_with(**boto3_params) + + @pytest.mark.parametrize( + "params", + [ + pytest.param( + { + "handler": HANDLER, + }, + id="'runtime' not provided", + ), + pytest.param( + { + "runtime": RUNTIME, + }, + id="'handler' not provided", + ), + ], ) - def test_create_lambda_with_zip_package_type_and_no_handler(self, mock_conn): - mock_conn().create_function.return_value = {} + def test_create_lambda_with_zip_package_type_and_missing_args(self, params, hook): + hook.conn.create_function.return_value = {} - hook = LambdaHook(aws_conn_id="aws_default") with pytest.raises(TypeError): hook.create_lambda( function_name=FUNCTION_NAME, - runtime=RUNTIME, role=ROLE, code=CODE, package_type="Zip", + **params, ) - - @mock.patch( - "airflow.providers.amazon.aws.hooks.lambda_function.LambdaHook.conn", new_callable=mock.PropertyMock - ) - def test_create_lambda_with_image_package_type(self, mock_conn): - mock_conn().create_function.return_value = {} - - hook = LambdaHook(aws_conn_id="aws_default") - hook.create_lambda( - function_name=FUNCTION_NAME, - role=ROLE, - code=CODE, - package_type="Image", - ) - - mock_conn().create_function.assert_called_once_with( - FunctionName=FUNCTION_NAME, - Role=ROLE, - Code=CODE, - PackageType="Image", - ) diff --git a/tests/providers/amazon/aws/operators/test_lambda.py b/tests/providers/amazon/aws/operators/test_lambda_function.py similarity index 73% rename from tests/providers/amazon/aws/operators/test_lambda.py rename to tests/providers/amazon/aws/operators/test_lambda_function.py index c43125ee78ae7..0fef5321b2733 100644 --- a/tests/providers/amazon/aws/operators/test_lambda.py +++ b/tests/providers/amazon/aws/operators/test_lambda_function.py @@ -20,13 +20,56 @@ import io import json import zipfile +from unittest import mock import pytest from moto import mock_iam, mock_lambda, mock_sts from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook -from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator +from airflow.providers.amazon.aws.operators.lambda_function import ( + AwsLambdaInvokeFunctionOperator, + LambdaCreateFunctionOperator, +) + +FUNCTION_NAME = "function_name" +ROLE_ARN = "role_arn" +IMAGE_URI = "image_uri" + + +class TestLambdaCreateFunctionOperator: + @mock.patch.object(LambdaHook, "create_lambda") + @mock.patch.object(LambdaHook, "conn") + def test_create_lambda_without_wait_for_completion(self, mock_hook_conn, mock_hook_create_lambda): + operator = LambdaCreateFunctionOperator( + task_id="task_test", + function_name=FUNCTION_NAME, + role=ROLE_ARN, + code={ + "ImageUri": IMAGE_URI, + }, + ) + operator.execute(None) + + mock_hook_create_lambda.assert_called_once() + mock_hook_conn.get_waiter.assert_not_called() + + @mock.patch.object(LambdaHook, "create_lambda") + @mock.patch.object(LambdaHook, "conn") + def test_create_lambda_with_wait_for_completion(self, mock_hook_conn, mock_hook_create_lambda): + operator = LambdaCreateFunctionOperator( + task_id="task_test", + function_name=FUNCTION_NAME, + role=ROLE_ARN, + code={ + "ImageUri": IMAGE_URI, + }, + wait_for_completion=True, + ) + operator.execute(None) + + mock_hook_create_lambda.assert_called_once() + mock_hook_conn.get_waiter.assert_called_once_with("function_active_v2") @mock_lambda diff --git a/tests/providers/amazon/aws/sensors/test_lambda_function.py b/tests/providers/amazon/aws/sensors/test_lambda_function.py new file mode 100644 index 0000000000000..91d7e81b71642 --- /dev/null +++ b/tests/providers/amazon/aws/sensors/test_lambda_function.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow import AirflowException +from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook +from airflow.providers.amazon.aws.sensors.lambda_function import LambdaFunctionStateSensor + +FUNCTION_NAME = "function_name" + + +class TestLambdaFunctionStateSensor: + @pytest.mark.parametrize( + "get_function_output, expect_failure, expected", + [ + pytest.param( + {"Configuration": {"State": "Active"}}, + False, + True, + id="Active state", + ), + pytest.param( + {"Configuration": {"State": "Pending"}}, + False, + False, + id="Pending state", + ), + pytest.param( + {"Configuration": {"State": "Failed"}}, + True, + None, + id="Failed state", + ), + ], + ) + def test_poke(self, get_function_output, expect_failure, expected): + with mock.patch.object(LambdaHook, "conn") as mock_conn: + mock_conn.get_function.return_value = get_function_output + sensor = LambdaFunctionStateSensor( + task_id="test_sensor", + function_name=FUNCTION_NAME, + ) + + if expect_failure: + with pytest.raises(AirflowException): + sensor.poke({}) + else: + result = sensor.poke({}) + assert result == expected + + mock_conn.get_function.assert_called_once_with( + FunctionName=FUNCTION_NAME, + ) diff --git a/tests/system/providers/amazon/aws/example_lambda.py b/tests/system/providers/amazon/aws/example_lambda.py index d6d4d54c9af5b..f68b3dc43c6fd 100644 --- a/tests/system/providers/amazon/aws/example_lambda.py +++ b/tests/system/providers/amazon/aws/example_lambda.py @@ -26,7 +26,11 @@ from airflow import models from airflow.decorators import task from airflow.models.baseoperator import chain -from airflow.providers.amazon.aws.operators.lambda_function import AwsLambdaInvokeFunctionOperator +from airflow.providers.amazon.aws.operators.lambda_function import ( + AwsLambdaInvokeFunctionOperator, + LambdaCreateFunctionOperator, +) +from airflow.providers.amazon.aws.sensors.lambda_function import LambdaFunctionStateSensor from airflow.utils.trigger_rule import TriggerRule from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder, purge_logs @@ -54,28 +58,6 @@ def create_zip(content: str): return zip_output.read() -@task -def create_lambda(function_name: str, role_arn: str): - client = boto3.client("lambda") - client.create_function( - FunctionName=function_name, - Runtime="python3.9", - Role=role_arn, - Handler="lambda_function.test", - Code={ - "ZipFile": create_zip(CODE_CONTENT), - }, - Description="Function used for system tests", - ) - - -@task -def await_lambda(function_name: str): - client = boto3.client("lambda") - waiter = client.get_waiter("function_active_v2") - waiter.wait(FunctionName=function_name) - - @task(trigger_rule=TriggerRule.ALL_DONE) def delete_lambda(function_name: str): client = boto3.client("lambda") @@ -103,21 +85,42 @@ def delete_logs(function_name: str) -> None: test_context = sys_test_context_task() lambda_function_name: str = f"{test_context[ENV_ID_KEY]}-function" + role_arn = test_context[ROLE_ARN_KEY] + + # [START howto_operator_create_lambda_function] + create_lambda_function = LambdaCreateFunctionOperator( + task_id="create_lambda_function", + function_name=lambda_function_name, + runtime="python3.9", + role=role_arn, + handler="lambda_function.test", + code={ + "ZipFile": create_zip(CODE_CONTENT), + }, + ) + # [END howto_operator_create_lambda_function] + + # [START howto_sensor_lambda_function_state] + wait_lambda_function_state = LambdaFunctionStateSensor( + task_id="wait_lambda_function_state", + function_name=lambda_function_name, + ) + # [END howto_sensor_lambda_function_state] - # [START howto_operator_lambda] + # [START howto_operator_invoke_lambda_function] invoke_lambda_function = AwsLambdaInvokeFunctionOperator( task_id="invoke_lambda_function", function_name=lambda_function_name, payload=json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}), ) - # [END howto_operator_lambda] + # [END howto_operator_invoke_lambda_function] chain( # TEST SETUP test_context, - create_lambda(lambda_function_name, test_context[ROLE_ARN_KEY]), - await_lambda(lambda_function_name), # TEST BODY + create_lambda_function, + wait_lambda_function_state, invoke_lambda_function, # TEST TEARDOWN delete_lambda(lambda_function_name),