From f7aabeddb4803f32dfcb50910d3b9a1aa305824d Mon Sep 17 00:00:00 2001 From: vincbeck Date: Wed, 5 Feb 2025 18:34:19 -0500 Subject: [PATCH] Update `create_emr_on_eks_cluster` method to retry when "cluster is not reachable as its connection is currently being updated" --- .../airflow/providers/amazon/aws/hooks/emr.py | 20 +++++++++++++++++++ .../system/amazon/aws/example_emr_eks.py | 1 - 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/providers/src/airflow/providers/amazon/aws/hooks/emr.py b/providers/src/airflow/providers/amazon/aws/hooks/emr.py index 17ecaae2fd7e1..bd0b7d2039df3 100644 --- a/providers/src/airflow/providers/amazon/aws/hooks/emr.py +++ b/providers/src/airflow/providers/amazon/aws/hooks/emr.py @@ -22,7 +22,9 @@ import warnings from typing import Any +import tenacity from botocore.exceptions import ClientError +from tenacity import retry_if_exception, stop_after_attempt, wait_fixed from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook @@ -311,6 +313,15 @@ def cancel_running_jobs( return count +def is_connection_being_updated_exception(exception: BaseException) -> bool: + return ( + isinstance(exception, ClientError) + and exception.response["Error"]["Code"] == "ValidationException" + and "is not reachable as its connection is currently being updated" + in exception.response["Error"]["Message"] + ) + + class EmrContainerHook(AwsBaseHook): """ Interact with Amazon EMR Containers (Amazon EMR on EKS). @@ -348,6 +359,15 @@ def __init__(self, *args: Any, virtual_cluster_id: str | None = None, **kwargs: super().__init__(client_type="emr-containers", *args, **kwargs) # type: ignore self.virtual_cluster_id = virtual_cluster_id + # Retry this method when the ``create_virtual_cluster`` raises + # "Cluster XXX is not reachable as its connection is currently being updated". 
+ # Even though the EKS cluster status is ``ACTIVE``, ``create_virtual_cluster`` can raise this error. + # Retrying is the only option. + @tenacity.retry( + retry=retry_if_exception(is_connection_being_updated_exception), + stop=stop_after_attempt(5), + wait=wait_fixed(10), + ) def create_emr_on_eks_cluster( self, virtual_cluster_name: str, diff --git a/providers/tests/system/amazon/aws/example_emr_eks.py b/providers/tests/system/amazon/aws/example_emr_eks.py index 2f2b59a662351..cafaf09ed7473 100644 --- a/providers/tests/system/amazon/aws/example_emr_eks.py +++ b/providers/tests/system/amazon/aws/example_emr_eks.py @@ -234,7 +234,6 @@ def delete_virtual_cluster(virtual_cluster_id): # The launch template enforces IMDSv2 and is required for internal # compliance when running these system tests on AWS infrastructure. create_nodegroup_kwargs={"launchTemplate": {"name": launch_template_name}}, - create_cluster_kwargs={"version": "1.32"}, ) await_create_nodegroup = EksNodegroupStateSensor(