Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions providers/src/airflow/providers/amazon/aws/hooks/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import warnings
from typing import Any

import tenacity
from botocore.exceptions import ClientError
from tenacity import retry_if_exception, stop_after_attempt, wait_fixed

from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
Expand Down Expand Up @@ -311,6 +313,15 @@ def cancel_running_jobs(
return count


def is_connection_being_updated_exception(exception: BaseException) -> bool:
return (
isinstance(exception, ClientError)
and exception.response["Error"]["Code"] == "ValidationException"
and "is not reachable as its connection is currently being updated"
in exception.response["Error"]["Message"]
)


class EmrContainerHook(AwsBaseHook):
"""
Interact with Amazon EMR Containers (Amazon EMR on EKS).
Expand Down Expand Up @@ -348,6 +359,15 @@ def __init__(self, *args: Any, virtual_cluster_id: str | None = None, **kwargs:
super().__init__(client_type="emr-containers", *args, **kwargs) # type: ignore
self.virtual_cluster_id = virtual_cluster_id

# Retry this method when the ``create_virtual_cluster`` raises
# "Cluster XXX is not reachable as its connection is currently being updated".
# Even though the EKS cluster status is ``ACTIVE``, ``create_virtual_cluster`` can raise this error.
# Retrying is the only option.
@tenacity.retry(
retry=retry_if_exception(is_connection_being_updated_exception),
stop=stop_after_attempt(5),
wait=wait_fixed(10),
)
def create_emr_on_eks_cluster(
self,
virtual_cluster_name: str,
Expand Down
1 change: 0 additions & 1 deletion providers/tests/system/amazon/aws/example_emr_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ def delete_virtual_cluster(virtual_cluster_id):
# The launch template enforces IMDSv2 and is required for internal
# compliance when running these system tests on AWS infrastructure.
create_nodegroup_kwargs={"launchTemplate": {"name": launch_template_name}},
create_cluster_kwargs={"version": "1.32"},
)

await_create_nodegroup = EksNodegroupStateSensor(
Expand Down