From 16449378737d6e95479933c7e751788c613efe6e Mon Sep 17 00:00:00 2001 From: syedahsn Date: Fri, 10 Feb 2023 11:21:50 -0800 Subject: [PATCH 1/5] Add EC2CreateInstanceOperator and EC2TerminateInstanceOperator Change system test to use the new operators Add unit tests for new operators --- airflow/providers/amazon/aws/operators/ec2.py | 113 ++++++++++++++++++ .../operators/ec2.rst | 25 ++++ .../amazon/aws/operators/test_ec2.py | 87 ++++++++++++-- .../providers/amazon/aws/example_ec2.py | 99 ++++++++------- 4 files changed, 271 insertions(+), 53 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py index 60cb43a32dd09..d127c523311e3 100644 --- a/airflow/providers/amazon/aws/operators/ec2.py +++ b/airflow/providers/amazon/aws/operators/ec2.py @@ -116,3 +116,116 @@ def execute(self, context: Context): target_state="stopped", check_interval=self.check_interval, ) + + +class EC2CreateInstanceOperator(BaseOperator): + """ + Create and start an EC2 Instance using boto3 + + :param image_id: ID of the AMI used to create the instance. + :param max_count: Maximum number of instances to launch. Defaults to 1. + :param min_count: Minimum number of instances to launch. Defaults to 1. + :param aws_conn_id: AWS connection to use + :param region_name: AWS region name associated with the client. + :param poll_interval: Number of seconds to wait before attempting to + check state of instance. Only used if wait_for_completion is True. Default is 20. + :param max_attempts: Maximum number of attempts when checking state of instance. + Only used if wait_for_completion is True. Default is 20. + :param config: Dictionary for arbitrary parameters to the boto3 run_instances call. + :param wait_for_completion: If True, the operator will wait for the instance to be + in the `running` state before returning. 
+ """ + + template_fields: Sequence[str] = ("image_id", "region_name", "config") + + def __init__( + self, + image_id: str, + max_count: int = 1, + min_count: int = 1, + aws_conn_id: str = "aws_default", + region_name: str | None = None, + poll_interval: int = 20, + max_attempts: int = 20, + config: dict | None = None, + wait_for_completion: bool = False, + **kwargs, + ): + super().__init__(**kwargs) + self.image_id = image_id + self.max_count = max_count + self.min_count = min_count + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.poll_interval = poll_interval + self.max_attempts = max_attempts + self.config = config or {} + self.wait_for_completion = wait_for_completion + + def execute(self, context: Context): + ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name, api_type="client_type") + instance_id = ec2_hook.conn.run_instances( + ImageId=self.image_id, + MinCount=self.min_count, + MaxCount=self.max_count, + **self.config, + )["Instances"][0]["InstanceId"] + self.log.info("Created EC2 instance %s", instance_id) + if self.wait_for_completion: + ec2_hook.get_waiter("instance_running").wait( + InstanceIds=[instance_id], + WaiterConfig={ + "Delay": self.poll_interval, + "MaxAttempts": self.max_attempts, + }, + ) + + return instance_id + + +class EC2TerminateInstanceOperator(BaseOperator): + """ + Terminate an EC2 Instance using boto3 + + :instance_id: ID of the instance to be terminated. + :param aws_conn_id: AWS connection to use + :param region_name: AWS region name associated with the client. + :param poll_interval: Number of seconds to wait before attempting to + check state of instance. Only used if wait_for_completion is True. Default is 20. + :param max_attempts: Maximum number of attempts when checking state of instance. + Only used if wait_for_completion is True. Default is 20. 
+ """ + + template_fields: Sequence[str] = ("instance_id", "region_name") + + def __init__( + self, + instance_id: str, + aws_conn_id: str = "aws_default", + region_name: str | None = None, + poll_interval: int = 20, + max_attempts: int = 20, + wait_for_completion: bool = False, + **kwargs, + ): + super().__init__(**kwargs) + self.instance_id = instance_id + self.aws_conn_id = aws_conn_id + self.region_name = region_name + self.poll_interval = poll_interval + self.max_attempts = max_attempts + self.wait_for_completion = wait_for_completion + + def execute(self, context: Context): + ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name, api_type="client_type") + ec2_hook.conn.terminate_instances(InstanceIds=[self.instance_id]) + + self.log.info("Terminating EC2 instance %s", self.instance_id) + if self.wait_for_completion: + ec2_hook.get_waiter("instance_terminated").wait( + InstanceIds=[self.instance_id], + WaiterConfig={ + "Delay": self.poll_interval, + "MaxAttempts": self.max_attempts, + }, + ) diff --git a/docs/apache-airflow-providers-amazon/operators/ec2.rst b/docs/apache-airflow-providers-amazon/operators/ec2.rst index 37e5d804ee5e1..af216e08da7e4 100644 --- a/docs/apache-airflow-providers-amazon/operators/ec2.rst +++ b/docs/apache-airflow-providers-amazon/operators/ec2.rst @@ -58,6 +58,31 @@ To stop an Amazon EC2 instance you can use :start-after: [START howto_operator_ec2_stop_instance] :end-before: [END howto_operator_ec2_stop_instance] + +Create and start an Amazon EC2 instance +======================================= + +To create and start an Amazon EC2 instance you can use +:class:`~airflow.providers.amazon.aws.operators.ec2.EC2CreateInstanceOperator`. + +.. 
exampleinclude:: /../../tests/system/providers/amazon/aws/example_ec2.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_ec2_create_instance] + :end-before: [END howto_operator_ec2_create_instance] + + +Terminate an Amazon EC2 instance +================================ + +To terminate an Amazon EC2 instance you can use +:class:`~airflow.providers.amazon.aws.operators.ec2.EC2TerminateInstanceOperator`. + +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ec2.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_ec2_terminate_instance] + :end-before: [END howto_operator_ec2_terminate_instance] Sensors ------- diff --git a/tests/providers/amazon/aws/operators/test_ec2.py b/tests/providers/amazon/aws/operators/test_ec2.py index 8fd4c535a1e12..d3e9f968b6111 100644 --- a/tests/providers/amazon/aws/operators/test_ec2.py +++ b/tests/providers/amazon/aws/operators/test_ec2.py @@ -20,23 +20,83 @@ from moto import mock_ec2 from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook -from airflow.providers.amazon.aws.operators.ec2 import EC2StartInstanceOperator, EC2StopInstanceOperator +from airflow.providers.amazon.aws.operators.ec2 import ( + EC2CreateInstanceOperator, + EC2StartInstanceOperator, + EC2StopInstanceOperator, + EC2TerminateInstanceOperator, +) class BaseEc2TestClass: @classmethod - def _create_instance(cls, hook: EC2Hook): - """Create Instance and return instance id.""" + def _get_image_id(cls, hook): + """Get a valid image id to create an instance.""" conn = hook.get_conn() try: ec2_client = conn.meta.client except AttributeError: ec2_client = conn - - # We need existed AMI Image ID otherwise `moto` will raise DeprecationWarning. + + # We need an existing AMI Image ID otherwise `moto` will raise DeprecationWarning. 
images = ec2_client.describe_images()["Images"] - response = ec2_client.run_instances(MaxCount=1, MinCount=1, ImageId=images[0]["ImageId"]) - return response["Instances"][0]["InstanceId"] + return images[0]["ImageId"] + +class TestEC2CreateInstanceOperator(BaseEc2TestClass): + def test_init(self): + ec2_operator = EC2CreateInstanceOperator( + task_id="test_create_instance", + image_id="test_image_id", + ) + + assert ec2_operator.task_id == "test_create_instance" + assert ec2_operator.image_id == "test_image_id" + assert ec2_operator.max_count == 1 + assert ec2_operator.min_count == 1 + assert ec2_operator.max_attempts == 20 + assert ec2_operator.poll_interval == 20 + + @mock_ec2 + def test_create_intance(self): + ec2_hook = EC2Hook() + create_instance = EC2CreateInstanceOperator( + image_id=self._get_image_id(ec2_hook), + task_id="test_create_instance", + ) + instance_id = create_instance.execute(None) + + assert ec2_hook.get_instance_state(instance_id=instance_id) == "running" + + +class TestEC2TerminateInstanceOperator(BaseEc2TestClass): + def test_init(self): + ec2_operator = EC2TerminateInstanceOperator( + task_id="test_terminate_instance", + instance_id="test_image_id", + ) + + assert ec2_operator.task_id == "test_terminate_instance" + assert ec2_operator.max_attempts == 20 + assert ec2_operator.poll_interval == 20 + + @mock_ec2 + def test_terminate_intance(self): + ec2_hook = EC2Hook() + + create_instance = EC2CreateInstanceOperator( + image_id=self._get_image_id(ec2_hook), + task_id="test_create_instance", + ) + instance_id = create_instance.execute(None) + + assert ec2_hook.get_instance_state(instance_id=instance_id) == "running" + + terminate_instance = EC2TerminateInstanceOperator( + task_id="test_terminate_instance", instance_id=instance_id + ) + terminate_instance.execute(None) + + assert ec2_hook.get_instance_state(instance_id=instance_id) == "terminated" class TestEC2StartInstanceOperator(BaseEc2TestClass): @@ -58,7 +118,11 @@ def test_init(self): def 
test_start_instance(self): # create instance ec2_hook = EC2Hook() - instance_id = self._create_instance(ec2_hook) + create_instance = EC2CreateInstanceOperator( + image_id=self._get_image_id(ec2_hook), + task_id="test_create_instance", + ) + instance_id = create_instance.execute(None) # start instance start_test = EC2StartInstanceOperator( @@ -89,7 +153,11 @@ def test_init(self): def test_stop_instance(self): # create instance ec2_hook = EC2Hook() - instance_id = self._create_instance(ec2_hook) + create_instance = EC2CreateInstanceOperator( + image_id=self._get_image_id(ec2_hook), + task_id="test_create_instance", + ) + instance_id = create_instance.execute(None) # stop instance stop_test = EC2StopInstanceOperator( @@ -99,3 +167,4 @@ def test_stop_instance(self): stop_test.execute(None) # assert instance state is running assert ec2_hook.get_instance_state(instance_id=instance_id) == "stopped" + diff --git a/tests/system/providers/amazon/aws/example_ec2.py b/tests/system/providers/amazon/aws/example_ec2.py index 36bddc5548ffc..78e3798070e5d 100644 --- a/tests/system/providers/amazon/aws/example_ec2.py +++ b/tests/system/providers/amazon/aws/example_ec2.py @@ -24,7 +24,12 @@ from airflow import DAG from airflow.decorators import task from airflow.models.baseoperator import chain -from airflow.providers.amazon.aws.operators.ec2 import EC2StartInstanceOperator, EC2StopInstanceOperator +from airflow.providers.amazon.aws.operators.ec2 import ( + EC2CreateInstanceOperator, + EC2StartInstanceOperator, + EC2StopInstanceOperator, + EC2TerminateInstanceOperator, +) from airflow.providers.amazon.aws.sensors.ec2 import EC2InstanceStateSensor from airflow.utils.trigger_rule import TriggerRule from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder @@ -62,35 +67,6 @@ def create_key_pair(key_name: str): return key_pair_id -@task -def create_instance(instance_name: str, key_pair_id: str): - client = boto3.client("ec2") - - # Create the instance - 
instance_id = client.run_instances( - ImageId=_get_latest_ami_id(), - MinCount=1, - MaxCount=1, - InstanceType="t2.micro", - KeyName=key_pair_id, - TagSpecifications=[{"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": instance_name}]}], - # Use IMDSv2 for greater security, see the following doc for more details: - # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html - MetadataOptions={"HttpEndpoint": "enabled", "HttpTokens": "required"}, - )["Instances"][0]["InstanceId"] - - # Wait for it to exist - waiter = client.get_waiter("instance_status_ok") - waiter.wait(InstanceIds=[instance_id]) - - return instance_id - - -@task(trigger_rule=TriggerRule.ALL_DONE) -def terminate_instance(instance: str): - boto3.client("ec2").terminate_instances(InstanceIds=[instance]) - - @task(trigger_rule=TriggerRule.ALL_DONE) def delete_key_pair(key_pair_id: str): boto3.client("ec2").delete_key_pair(KeyName=key_pair_id) @@ -105,43 +81,78 @@ def delete_key_pair(key_pair_id: str): ) as dag: test_context = sys_test_context_task() env_id = test_context[ENV_ID_KEY] - + instance_name = f"{env_id}-instance" key_name = create_key_pair(key_name=f"{env_id}_key_pair") - instance_id = create_instance(instance_name=f"{env_id}-instance", key_pair_id=key_name) + + config = { + "InstanceType": "t2.micro", + "KeyName": key_name, + "TagSpecifications": [ + {"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": instance_name}]} + ], + # Use IMDSv2 for greater security, see the following doc for more details: + # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + "MetadataOptions": {"HttpEndpoint": "enabled", "HttpTokens": "required"}, + } + # [START howto_operator_ec2_create_instance] + create_instance = EC2CreateInstanceOperator( + task_id="create_instance", + image_id=_get_latest_ami_id(), + max_count=1, + min_count=1, + config=config, + wait_for_completion=True, + ) + # [END 
howto_operator_ec2_create_instance] + + # [START howto_operator_ec2_stop_instance] + stop_instance_1 = EC2StopInstanceOperator( + task_id="stop_instance_1", + instance_id=create_instance.output, + ) + # [END howto_operator_ec2_stop_instance] + stop_instance_1.trigger_rule = TriggerRule.ALL_DONE + # [START howto_operator_ec2_start_instance] start_instance = EC2StartInstanceOperator( task_id="start_instance", - instance_id=instance_id, + instance_id=create_instance.output, ) # [END howto_operator_ec2_start_instance] # [START howto_sensor_ec2_instance_state] await_instance = EC2InstanceStateSensor( task_id="await_instance", - instance_id=instance_id, + instance_id=create_instance.output, target_state="running", ) # [END howto_sensor_ec2_instance_state] - # [START howto_operator_ec2_stop_instance] - stop_instance = EC2StopInstanceOperator( - task_id="stop_instance", - instance_id=instance_id, + stop_instance_2 = EC2StopInstanceOperator( + task_id="stop_instance_2", + instance_id=create_instance.output, + trigger_rule=TriggerRule.ALL_DONE, ) - # [END howto_operator_ec2_stop_instance] - stop_instance.trigger_rule = TriggerRule.ALL_DONE - + # [START howto_operator_ec2_terminate_instance] + terminate_instance = EC2TerminateInstanceOperator( + task_id="terminate_instance", + instance_id=create_instance.output, + wait_for_completion=True, + ) + # [END howto_operator_ec2_terminate_instance] + terminate_instance.trigger_rule = TriggerRule.ALL_DONE chain( # TEST SETUP test_context, key_name, - instance_id, # TEST BODY + create_instance, + stop_instance_1, start_instance, await_instance, - stop_instance, + stop_instance_2, + terminate_instance, # TEST TEARDOWN - terminate_instance(instance_id), delete_key_pair(key_name), ) From 3226a4f80b8f2b54218b2d9fb279611b29582391 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Mon, 13 Feb 2023 16:00:33 -0800 Subject: [PATCH 2/5] Add support for multiple ids to EC2TerminateInstanceOperator Change system test to terminate without stopping 
instances --- airflow/providers/amazon/aws/operators/ec2.py | 77 ++++++++++++------- .../operators/ec2.rst | 2 + .../providers/amazon/aws/example_ec2.py | 23 +++--- 3 files changed, 63 insertions(+), 39 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py index d127c523311e3..b5e9e3bc54873 100644 --- a/airflow/providers/amazon/aws/operators/ec2.py +++ b/airflow/providers/amazon/aws/operators/ec2.py @@ -122,6 +122,10 @@ class EC2CreateInstanceOperator(BaseOperator): """ Create and start an EC2 Instance using boto3 + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:EC2CreateInstanceOperator` + :param image_id: ID of the AMI used to create the instance. :param max_count: Maximum number of instances to launch. Defaults to 1. :param min_count: Minimum number of instances to launch. Defaults to 1. @@ -136,7 +140,15 @@ class EC2CreateInstanceOperator(BaseOperator): in the `running` state before returning. 
""" - template_fields: Sequence[str] = ("image_id", "region_name", "config") + template_fields: Sequence[str] = ( + "image_id", + "max_count", + "min_count", + "aws_conn_id", + "region_name", + "config", + "wait_for_completion", + ) def __init__( self, @@ -164,29 +176,37 @@ def __init__( def execute(self, context: Context): ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name, api_type="client_type") - instance_id = ec2_hook.conn.run_instances( + instances = ec2_hook.conn.run_instances( ImageId=self.image_id, MinCount=self.min_count, MaxCount=self.max_count, **self.config, - )["Instances"][0]["InstanceId"] - self.log.info("Created EC2 instance %s", instance_id) - if self.wait_for_completion: - ec2_hook.get_waiter("instance_running").wait( - InstanceIds=[instance_id], - WaiterConfig={ - "Delay": self.poll_interval, - "MaxAttempts": self.max_attempts, - }, - ) + )["Instances"] + instance_ids = [] + for instance in instances: + instance_ids.append(instance["InstanceId"]) + self.log.info("Created EC2 instance %s", instance["InstanceId"]) - return instance_id + if self.wait_for_completion: + ec2_hook.get_waiter("instance_running").wait( + InstanceIds=[instance["InstanceId"]], + WaiterConfig={ + "Delay": self.poll_interval, + "MaxAttempts": self.max_attempts, + }, + ) + + return instance_ids class EC2TerminateInstanceOperator(BaseOperator): """ Terminate an EC2 Instance using boto3 + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:EC2TerminateInstanceOperator` + :instance_id: ID of the instance to be terminated. :param aws_conn_id: AWS connection to use :param region_name: AWS region name associated with the client. @@ -194,13 +214,15 @@ class EC2TerminateInstanceOperator(BaseOperator): check state of instance. Only used if wait_for_completion is True. Default is 20. :param max_attempts: Maximum number of attempts when checking state of instance. 
Only used if wait_for_completion is True. Default is 20. + :param wait_for_completion: If True, the operator will wait for the instance to be + in the `terminated` state before returning. """ - template_fields: Sequence[str] = ("instance_id", "region_name") + template_fields: Sequence[str] = ("instance_id", "region_name", "aws_conn_id", "wait_for_completion") def __init__( self, - instance_id: str, + instance_id: str | list[str], aws_conn_id: str = "aws_default", region_name: str | None = None, poll_interval: int = 20, @@ -209,7 +231,7 @@ def __init__( **kwargs, ): super().__init__(**kwargs) - self.instance_id = instance_id + self.instance_ids = [*instance_id] self.aws_conn_id = aws_conn_id self.region_name = region_name self.poll_interval = poll_interval @@ -218,14 +240,15 @@ def __init__( def execute(self, context: Context): ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name, api_type="client_type") - ec2_hook.conn.terminate_instances(InstanceIds=[self.instance_id]) - - self.log.info("Terminating EC2 instance %s", self.instance_id) - if self.wait_for_completion: - ec2_hook.get_waiter("instance_terminated").wait( - InstanceIds=[self.instance_id], - WaiterConfig={ - "Delay": self.poll_interval, - "MaxAttempts": self.max_attempts, - }, - ) + ec2_hook.conn.terminate_instances(InstanceIds=self.instance_ids) + + for instance_id in self.instance_ids: + self.log.info("Terminating EC2 instance %s", instance_id) + if self.wait_for_completion: + ec2_hook.get_waiter("instance_terminated").wait( + InstanceIds=[instance_id], + WaiterConfig={ + "Delay": self.poll_interval, + "MaxAttempts": self.max_attempts, + }, + ) diff --git a/docs/apache-airflow-providers-amazon/operators/ec2.rst b/docs/apache-airflow-providers-amazon/operators/ec2.rst index af216e08da7e4..cfe125926f6ac 100644 --- a/docs/apache-airflow-providers-amazon/operators/ec2.rst +++ b/docs/apache-airflow-providers-amazon/operators/ec2.rst @@ -58,6 +58,7 @@ To stop an Amazon EC2 instance 
you can use :start-after: [START howto_operator_ec2_stop_instance] :end-before: [END howto_operator_ec2_stop_instance] +.. _howto/operator:EC2CreateInstanceOperator: Create and start an Amazon EC2 instance ======================================= @@ -71,6 +72,7 @@ To create and start an Amazon EC2 instance you can use :start-after: [START howto_operator_ec2_create_instance] :end-before: [END howto_operator_ec2_create_instance] +.. _howto/operator:EC2TerminateInstanceOperator: Terminate an Amazon EC2 instance ================================ diff --git a/tests/system/providers/amazon/aws/example_ec2.py b/tests/system/providers/amazon/aws/example_ec2.py index 78e3798070e5d..04e46b559e30e 100644 --- a/tests/system/providers/amazon/aws/example_ec2.py +++ b/tests/system/providers/amazon/aws/example_ec2.py @@ -83,6 +83,7 @@ def delete_key_pair(key_pair_id: str): env_id = test_context[ENV_ID_KEY] instance_name = f"{env_id}-instance" key_name = create_key_pair(key_name=f"{env_id}_key_pair") + image_id = _get_latest_ami_id() config = { "InstanceType": "t2.micro", @@ -94,24 +95,28 @@ def delete_key_pair(key_pair_id: str): # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html "MetadataOptions": {"HttpEndpoint": "enabled", "HttpTokens": "required"}, } + + # EC2CreateInstanceOperator creates and starts the EC2 instances. To test the EC2StartInstanceOperator, + # we will stop the instance, then start them again before terminating them. 
+ # [START howto_operator_ec2_create_instance] create_instance = EC2CreateInstanceOperator( task_id="create_instance", - image_id=_get_latest_ami_id(), + image_id=image_id, max_count=1, min_count=1, config=config, - wait_for_completion=True, ) # [END howto_operator_ec2_create_instance] + create_instance.wait_for_completion = True # [START howto_operator_ec2_stop_instance] - stop_instance_1 = EC2StopInstanceOperator( - task_id="stop_instance_1", + stop_instance = EC2StopInstanceOperator( + task_id="stop_instance", instance_id=create_instance.output, ) # [END howto_operator_ec2_stop_instance] - stop_instance_1.trigger_rule = TriggerRule.ALL_DONE + stop_instance.trigger_rule = TriggerRule.ALL_DONE # [START howto_operator_ec2_start_instance] start_instance = EC2StartInstanceOperator( @@ -128,11 +133,6 @@ def delete_key_pair(key_pair_id: str): ) # [END howto_sensor_ec2_instance_state] - stop_instance_2 = EC2StopInstanceOperator( - task_id="stop_instance_2", - instance_id=create_instance.output, - trigger_rule=TriggerRule.ALL_DONE, - ) # [START howto_operator_ec2_terminate_instance] terminate_instance = EC2TerminateInstanceOperator( task_id="terminate_instance", @@ -147,10 +147,9 @@ def delete_key_pair(key_pair_id: str): key_name, # TEST BODY create_instance, - stop_instance_1, + stop_instance, start_instance, await_instance, - stop_instance_2, terminate_instance, # TEST TEARDOWN delete_key_pair(key_name), From 16b35bc6c827cba8ed5a0507aea7f6c5185e8395 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Fri, 17 Feb 2023 15:35:01 -0800 Subject: [PATCH 3/5] Fix failing tests for terminate operator --- airflow/providers/amazon/aws/operators/ec2.py | 8 ++++--- .../amazon/aws/operators/test_ec2.py | 22 +++++++++---------- .../providers/amazon/aws/example_ec2.py | 16 +++++++++----- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py index b5e9e3bc54873..b4c12dd855e63 100644 
--- a/airflow/providers/amazon/aws/operators/ec2.py +++ b/airflow/providers/amazon/aws/operators/ec2.py @@ -218,11 +218,11 @@ class EC2TerminateInstanceOperator(BaseOperator): in the `terminated` state before returning. """ - template_fields: Sequence[str] = ("instance_id", "region_name", "aws_conn_id", "wait_for_completion") + template_fields: Sequence[str] = ("instance_ids", "region_name", "aws_conn_id", "wait_for_completion") def __init__( self, - instance_id: str | list[str], + instance_ids: str | list[str], aws_conn_id: str = "aws_default", region_name: str | None = None, poll_interval: int = 20, @@ -231,7 +231,7 @@ def __init__( **kwargs, ): super().__init__(**kwargs) - self.instance_ids = [*instance_id] + self.instance_ids = instance_ids self.aws_conn_id = aws_conn_id self.region_name = region_name self.poll_interval = poll_interval @@ -239,6 +239,8 @@ def __init__( self.wait_for_completion = wait_for_completion def execute(self, context: Context): + if isinstance(self.instance_ids, str): + self.instance_ids = [self.instance_ids] ec2_hook = EC2Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name, api_type="client_type") ec2_hook.conn.terminate_instances(InstanceIds=self.instance_ids) diff --git a/tests/providers/amazon/aws/operators/test_ec2.py b/tests/providers/amazon/aws/operators/test_ec2.py index d3e9f968b6111..4c78667c35de5 100644 --- a/tests/providers/amazon/aws/operators/test_ec2.py +++ b/tests/providers/amazon/aws/operators/test_ec2.py @@ -37,11 +37,12 @@ def _get_image_id(cls, hook): ec2_client = conn.meta.client except AttributeError: ec2_client = conn - + # We need an existing AMI Image ID otherwise `moto` will raise DeprecationWarning. 
images = ec2_client.describe_images()["Images"] return images[0]["ImageId"] + class TestEC2CreateInstanceOperator(BaseEc2TestClass): def test_init(self): ec2_operator = EC2CreateInstanceOperator( @@ -65,14 +66,14 @@ def test_create_intance(self): ) instance_id = create_instance.execute(None) - assert ec2_hook.get_instance_state(instance_id=instance_id) == "running" + assert ec2_hook.get_instance_state(instance_id=instance_id[0]) == "running" class TestEC2TerminateInstanceOperator(BaseEc2TestClass): def test_init(self): ec2_operator = EC2TerminateInstanceOperator( task_id="test_terminate_instance", - instance_id="test_image_id", + instance_ids="test_image_id", ) assert ec2_operator.task_id == "test_terminate_instance" @@ -89,14 +90,14 @@ def test_terminate_intance(self): ) instance_id = create_instance.execute(None) - assert ec2_hook.get_instance_state(instance_id=instance_id) == "running" + assert ec2_hook.get_instance_state(instance_id=instance_id[0]) == "running" terminate_instance = EC2TerminateInstanceOperator( - task_id="test_terminate_instance", instance_id=instance_id + task_id="test_terminate_instance", instance_ids=instance_id ) terminate_instance.execute(None) - assert ec2_hook.get_instance_state(instance_id=instance_id) == "terminated" + assert ec2_hook.get_instance_state(instance_id=instance_id[0]) == "terminated" class TestEC2StartInstanceOperator(BaseEc2TestClass): @@ -127,11 +128,11 @@ def test_start_instance(self): # start instance start_test = EC2StartInstanceOperator( task_id="start_test", - instance_id=instance_id, + instance_id=instance_id[0], ) start_test.execute(None) # assert instance state is running - assert ec2_hook.get_instance_state(instance_id=instance_id) == "running" + assert ec2_hook.get_instance_state(instance_id=instance_id[0]) == "running" class TestEC2StopInstanceOperator(BaseEc2TestClass): @@ -162,9 +163,8 @@ def test_stop_instance(self): # stop instance stop_test = EC2StopInstanceOperator( task_id="stop_test", - 
instance_id=instance_id, + instance_id=instance_id[0], ) stop_test.execute(None) # assert instance state is running - assert ec2_hook.get_instance_state(instance_id=instance_id) == "stopped" - + assert ec2_hook.get_instance_state(instance_id=instance_id[0]) == "stopped" diff --git a/tests/system/providers/amazon/aws/example_ec2.py b/tests/system/providers/amazon/aws/example_ec2.py index 04e46b559e30e..51aedd604899b 100644 --- a/tests/system/providers/amazon/aws/example_ec2.py +++ b/tests/system/providers/amazon/aws/example_ec2.py @@ -72,6 +72,11 @@ def delete_key_pair(key_pair_id: str): boto3.client("ec2").delete_key_pair(KeyName=key_pair_id) +@task +def parse_response(instance_ids: list): + return instance_ids[0] + + with DAG( dag_id=DAG_ID, schedule="@once", @@ -109,11 +114,11 @@ def delete_key_pair(key_pair_id: str): ) # [END howto_operator_ec2_create_instance] create_instance.wait_for_completion = True - + instance_id = parse_response(create_instance.output) # [START howto_operator_ec2_stop_instance] stop_instance = EC2StopInstanceOperator( task_id="stop_instance", - instance_id=create_instance.output, + instance_id=instance_id, ) # [END howto_operator_ec2_stop_instance] stop_instance.trigger_rule = TriggerRule.ALL_DONE @@ -121,14 +126,14 @@ def delete_key_pair(key_pair_id: str): # [START howto_operator_ec2_start_instance] start_instance = EC2StartInstanceOperator( task_id="start_instance", - instance_id=create_instance.output, + instance_id=instance_id, ) # [END howto_operator_ec2_start_instance] # [START howto_sensor_ec2_instance_state] await_instance = EC2InstanceStateSensor( task_id="await_instance", - instance_id=create_instance.output, + instance_id=instance_id, target_state="running", ) # [END howto_sensor_ec2_instance_state] @@ -136,7 +141,7 @@ def delete_key_pair(key_pair_id: str): # [START howto_operator_ec2_terminate_instance] terminate_instance = EC2TerminateInstanceOperator( task_id="terminate_instance", - instance_id=create_instance.output, + 
instance_ids=instance_id, wait_for_completion=True, ) # [END howto_operator_ec2_terminate_instance] @@ -147,6 +152,7 @@ def delete_key_pair(key_pair_id: str): key_name, # TEST BODY create_instance, + instance_id, stop_instance, start_instance, await_instance, From bb9095ed8c6d96a1dc9c5378dc700cae3aa58cd7 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Fri, 17 Feb 2023 16:09:14 -0800 Subject: [PATCH 4/5] Update doc strings to add that the operators can create/terminate multiple instances Add tests for creating/terminating multiple instances --- airflow/providers/amazon/aws/operators/ec2.py | 4 +- .../amazon/aws/operators/test_ec2.py | 41 ++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py index b4c12dd855e63..03243cc7aecbd 100644 --- a/airflow/providers/amazon/aws/operators/ec2.py +++ b/airflow/providers/amazon/aws/operators/ec2.py @@ -120,7 +120,7 @@ def execute(self, context: Context): class EC2CreateInstanceOperator(BaseOperator): """ - Create and start an EC2 Instance using boto3 + Create and start a specified number of EC2 Instances using boto3 .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -201,7 +201,7 @@ def execute(self, context: Context): class EC2TerminateInstanceOperator(BaseOperator): """ - Terminate an EC2 Instance using boto3 + Terminate EC2 Instances using boto3 .. 
seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/tests/providers/amazon/aws/operators/test_ec2.py b/tests/providers/amazon/aws/operators/test_ec2.py index 4c78667c35de5..adf3ffeb91cc0 100644 --- a/tests/providers/amazon/aws/operators/test_ec2.py +++ b/tests/providers/amazon/aws/operators/test_ec2.py @@ -58,7 +58,7 @@ def test_init(self): assert ec2_operator.poll_interval == 20 @mock_ec2 - def test_create_intance(self): + def test_create_instance(self): ec2_hook = EC2Hook() create_instance = EC2CreateInstanceOperator( image_id=self._get_image_id(ec2_hook), @@ -68,6 +68,21 @@ def test_create_intance(self): assert ec2_hook.get_instance_state(instance_id=instance_id[0]) == "running" + @mock_ec2 + def test_create_multiple_instances(self): + ec2_hook = EC2Hook() + create_instances = EC2CreateInstanceOperator( + task_id="test_create_multiple_instances", + image_id=self._get_image_id(hook=ec2_hook), + min_count=5, + max_count=5, + ) + instance_ids = create_instances.execute(None) + assert len(instance_ids) == 5 + + for id in instance_ids: + assert ec2_hook.get_instance_state(instance_id=id) == "running" + class TestEC2TerminateInstanceOperator(BaseEc2TestClass): def test_init(self): @@ -81,7 +96,7 @@ def test_init(self): assert ec2_operator.poll_interval == 20 @mock_ec2 - def test_terminate_intance(self): + def test_terminate_instance(self): ec2_hook = EC2Hook() create_instance = EC2CreateInstanceOperator( @@ -99,6 +114,28 @@ def test_terminate_intance(self): assert ec2_hook.get_instance_state(instance_id=instance_id[0]) == "terminated" + @mock_ec2 + def test_terminate_multiple_instances(self): + ec2_hook = EC2Hook() + create_instances = EC2CreateInstanceOperator( + task_id="test_create_multiple_instances", + image_id=self._get_image_id(hook=ec2_hook), + min_count=5, + max_count=5, + ) + instance_ids = create_instances.execute(None) + assert len(instance_ids) == 5 + + for id in instance_ids: + assert 
ec2_hook.get_instance_state(instance_id=id) == "running" + + terminate_instance = EC2TerminateInstanceOperator( + task_id="test_terminate_instance", instance_ids=instance_ids + ) + terminate_instance.execute(None) + for id in instance_ids: + assert ec2_hook.get_instance_state(instance_id=id) == "terminated" + class TestEC2StartInstanceOperator(BaseEc2TestClass): def test_init(self): From 1a4e537d230c7af39cc05c08594bb1ba4668eb28 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Sun, 26 Feb 2023 19:23:26 -0800 Subject: [PATCH 5/5] Fix system test so it passes Fix doc string on EC2TerminateInstanceOperator --- airflow/providers/amazon/aws/operators/ec2.py | 2 +- docs/apache-airflow-providers-amazon/operators/ec2.rst | 1 + tests/system/providers/amazon/aws/example_ec2.py | 6 ++++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py index 03243cc7aecbd..5f6b76ce154f9 100644 --- a/airflow/providers/amazon/aws/operators/ec2.py +++ b/airflow/providers/amazon/aws/operators/ec2.py @@ -207,7 +207,7 @@ class EC2TerminateInstanceOperator(BaseOperator): For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:EC2TerminateInstanceOperator` - :instance_id: ID of the instance to be terminated. + :param instance_ids: ID of the instance, or list of IDs of the instances, to be terminated. :param aws_conn_id: AWS connection to use :param region_name: AWS region name associated with the client. 
:param poll_interval: Number of seconds to wait before attempting to diff --git a/docs/apache-airflow-providers-amazon/operators/ec2.rst b/docs/apache-airflow-providers-amazon/operators/ec2.rst index cfe125926f6ac..5796c514e0238 100644 --- a/docs/apache-airflow-providers-amazon/operators/ec2.rst +++ b/docs/apache-airflow-providers-amazon/operators/ec2.rst @@ -85,6 +85,7 @@ To terminate an Amazon EC2 instance you can use :dedent: 4 :start-after: [START howto_operator_ec2_terminate_instance] :end-before: [END howto_operator_ec2_terminate_instance] + Sensors ------- diff --git a/tests/system/providers/amazon/aws/example_ec2.py b/tests/system/providers/amazon/aws/example_ec2.py index 51aedd604899b..1e58933dd007b 100644 --- a/tests/system/providers/amazon/aws/example_ec2.py +++ b/tests/system/providers/amazon/aws/example_ec2.py @@ -39,7 +39,8 @@ sys_test_context_task = SystemTestContextBuilder().build() -def _get_latest_ami_id(): +@task +def get_latest_ami_id(): """Returns the AMI ID of the most recently-created Amazon Linux image""" # Amazon is retiring AL2 in 2023 and replacing it with Amazon Linux 2022. @@ -88,7 +89,7 @@ def parse_response(instance_ids: list): env_id = test_context[ENV_ID_KEY] instance_name = f"{env_id}-instance" key_name = create_key_pair(key_name=f"{env_id}_key_pair") - image_id = _get_latest_ami_id() + image_id = get_latest_ami_id() config = { "InstanceType": "t2.micro", @@ -150,6 +151,7 @@ def parse_response(instance_ids: list): # TEST SETUP test_context, key_name, + image_id, # TEST BODY create_instance, instance_id,