From 71118ac7b8f2df19572f0f39ce5ec51b1e3496f4 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 29 Jun 2025 23:02:39 +0200 Subject: [PATCH 1/2] Migrate Yandex provider to TaskSDK version compat standard --- .../src/airflow/providers/yandex/links/yq.py | 20 ++----- .../providers/yandex/operators/dataproc.py | 8 +-- .../airflow/providers/yandex/operators/yq.py | 8 +-- .../providers/yandex/version_compat.py | 54 +++++++++++++++++++ .../yandex/tests/unit/yandex/links/test_yq.py | 6 +-- 5 files changed, 63 insertions(+), 33 deletions(-) create mode 100644 providers/yandex/src/airflow/providers/yandex/version_compat.py diff --git a/providers/yandex/src/airflow/providers/yandex/links/yq.py b/providers/yandex/src/airflow/providers/yandex/links/yq.py index 49a42473ca446..2f7ed45fb366e 100644 --- a/providers/yandex/src/airflow/providers/yandex/links/yq.py +++ b/providers/yandex/src/airflow/providers/yandex/links/yq.py @@ -18,24 +18,12 @@ from typing import TYPE_CHECKING +from airflow.providers.yandex.version_compat import BaseOperatorLink, XCom + if TYPE_CHECKING: - from airflow.models import BaseOperator from airflow.models.taskinstancekey import TaskInstanceKey + from airflow.providers.yandex.version_compat import BaseOperator, Context - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context - -from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseOperatorLink - from airflow.sdk.execution_time.xcom import XCom -else: - from airflow.models import XCom # type: ignore[no-redef] - from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef] XCOM_WEBLINK_KEY = "web_link" @@ -50,4 +38,4 @@ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey): @staticmethod def persist(context: Context, task_instance: BaseOperator, web_link: str) -> None: - task_instance.xcom_push(context, key=XCOM_WEBLINK_KEY, value=web_link) + context["ti"].xcom_push(key=XCOM_WEBLINK_KEY, value=web_link) diff --git a/providers/yandex/src/airflow/providers/yandex/operators/dataproc.py b/providers/yandex/src/airflow/providers/yandex/operators/dataproc.py index e389f85abe583..a5418024541e8 100644 --- a/providers/yandex/src/airflow/providers/yandex/operators/dataproc.py +++ b/providers/yandex/src/airflow/providers/yandex/operators/dataproc.py @@ -20,15 +20,11 @@ from dataclasses import dataclass from typing import TYPE_CHECKING -from airflow.models import BaseOperator from airflow.providers.yandex.hooks.dataproc import DataprocHook +from airflow.providers.yandex.version_compat import BaseOperator if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.providers.yandex.version_compat import Context @dataclass diff --git a/providers/yandex/src/airflow/providers/yandex/operators/yq.py b/providers/yandex/src/airflow/providers/yandex/operators/yq.py index da3890f4adfec..d5f6c4901f4d9 100644 --- a/providers/yandex/src/airflow/providers/yandex/operators/yq.py +++ b/providers/yandex/src/airflow/providers/yandex/operators/yq.py @@ -20,16 +20,12 @@ from functools import cached_property from typing import TYPE_CHECKING, Any -from airflow.models import BaseOperator from airflow.providers.yandex.hooks.yq import YQHook from airflow.providers.yandex.links.yq import YQLink +from airflow.providers.yandex.version_compat import BaseOperator if TYPE_CHECKING: - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context + from airflow.providers.yandex.version_compat import Context class YQExecuteQueryOperator(BaseOperator): diff --git a/providers/yandex/src/airflow/providers/yandex/version_compat.py b/providers/yandex/src/airflow/providers/yandex/version_compat.py new file mode 100644 index 0000000000000..082e4a9959eac --- /dev/null +++ b/providers/yandex/src/airflow/providers/yandex/version_compat.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) +AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0) + +# BaseOperator: Use 3.1+ due to xcom_push method missing in SDK BaseOperator 3.0.x +if AIRFLOW_V_3_1_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator + +# Other SDK components: Available since 3.0+ +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperatorLink + from airflow.sdk.definitions.context import Context + from airflow.sdk.execution_time.xcom import XCom +else: + from airflow.models import BaseOperatorLink, XCom + from airflow.utils.context import Context + + +__all__ = ["AIRFLOW_V_3_0_PLUS", "BaseOperator", "BaseOperatorLink", "Context", "XCom"] diff --git a/providers/yandex/tests/unit/yandex/links/test_yq.py b/providers/yandex/tests/unit/yandex/links/test_yq.py index 9bfa0fbede6ff..d9c8ed8ec8c97 100644 --- a/providers/yandex/tests/unit/yandex/links/test_yq.py +++ b/providers/yandex/tests/unit/yandex/links/test_yq.py @@ -22,15 +22,11 @@ from airflow.models.taskinstance import TaskInstance from airflow.providers.yandex.links.yq import YQLink +from airflow.providers.yandex.version_compat import XCom from tests_common.test_utils.mock_operators import MockOperator from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS -if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.execution_time.xcom import XCom -else: - from airflow.models import XCom # type: ignore[no-redef] - yandexcloud = pytest.importorskip("yandexcloud") From 59c83a942ec79d4e3866c1ecccfd229e26a870cc Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 29 Jun 2025 23:46:06 +0200 Subject: [PATCH 2/2] Fix pytest --- providers/yandex/tests/unit/yandex/links/test_yq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/yandex/tests/unit/yandex/links/test_yq.py b/providers/yandex/tests/unit/yandex/links/test_yq.py index d9c8ed8ec8c97..c4d85aaf90fef 100644 --- a/providers/yandex/tests/unit/yandex/links/test_yq.py +++ b/providers/yandex/tests/unit/yandex/links/test_yq.py @@ -42,7 +42,7 @@ def test_persist(): value="g.com", ) else: - ti.xcom_push.assert_called_once_with(key="web_link", value="g.com", execution_date=None) + ti.xcom_push.assert_called_once_with(key="web_link", value="g.com") def test_default_link():