From fb8edfe87016541204e1f0883cb9bea6e02d6dfb Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 4 Feb 2025 15:58:25 +0100 Subject: [PATCH 01/11] Raise custom exception --- .../airflow/providers/edge/cli/api_client.py | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index 60c230dada1ca..5c5315e100efa 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -39,10 +39,9 @@ WorkerStateBody, ) from airflow.utils.state import TaskInstanceState # noqa: TC001 - if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey - from airflow.providers.edge.models.edge_worker import EdgeWorkerState + from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException logger = logging.getLogger(__name__) @@ -92,13 +91,18 @@ def worker_register( hostname: str, state: EdgeWorkerState, queues: list[str] | None, sysinfo: dict ) -> datetime: """Register worker with the Edge API.""" - result = _make_generic_request( - "POST", - f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=0, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), - ) + try: + result = _make_generic_request( + "POST", + f"worker/{quote(hostname)}", + WorkerStateBody(state=state, jobs_active=0, queues=queues, sysinfo=sysinfo).model_dump_json( + exclude_unset=True + ), + ) + except requests.HTTPError as e: + if e.response.status_code == 400: + raise EdgeWorkerVersionException + raise e return datetime.fromisoformat(result) @@ -106,13 +110,18 @@ def worker_set_state( hostname: str, state: EdgeWorkerState, jobs_active: int, queues: list[str] | None, sysinfo: dict ) -> WorkerSetStateReturn: """Update the state of the worker in the central site and thereby implicitly heartbeat.""" - result = _make_generic_request( - "PATCH", - f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), - ) + try: + result = _make_generic_request( + "PATCH", + f"worker/{quote(hostname)}", + WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( + exclude_unset=True + ), + ) + except requests.HTTPError as e: + if e.response.status_code == 400: + raise EdgeWorkerVersionException + raise e return WorkerSetStateReturn(**result) From 5fa717415fe12eba4a8423d4d25731a7ffcbaa45 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 4 Feb 2025 15:59:04 +0100 Subject: [PATCH 02/11] fix static checks --- .../edge/src/airflow/providers/edge/cli/api_client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index 5c5315e100efa..fcbac8a7564a6 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -39,6 +39,7 @@ WorkerStateBody, ) from airflow.utils.state import TaskInstanceState # noqa: TC001 + if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException @@ -114,9 +115,9 @@ def worker_set_state( result = _make_generic_request( "PATCH", f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), + WorkerStateBody( + state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo + ).model_dump_json(exclude_unset=True), ) except requests.HTTPError as e: if e.response.status_code == 400: From 467ea93fd455750624c9ded14b8f2e7ef5b9f942 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 4 Feb 2025 15:58:25 +0100 Subject: [PATCH 03/11] Raise custom exception --- .../airflow/providers/edge/cli/api_client.py | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index 60c230dada1ca..5c5315e100efa 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -39,10 +39,9 @@ WorkerStateBody, ) from airflow.utils.state import TaskInstanceState # noqa: TC001 - if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey - from airflow.providers.edge.models.edge_worker import EdgeWorkerState + from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException logger = logging.getLogger(__name__) @@ -92,13 +91,18 @@ def worker_register( hostname: str, state: EdgeWorkerState, queues: list[str] | None, sysinfo: dict ) -> datetime: """Register worker with the Edge API.""" - result = _make_generic_request( - "POST", - f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=0, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), - ) + try: + result = _make_generic_request( + "POST", + f"worker/{quote(hostname)}", + WorkerStateBody(state=state, jobs_active=0, queues=queues, sysinfo=sysinfo).model_dump_json( + exclude_unset=True + ), + ) + except requests.HTTPError as e: + if e.response.status_code == 400: + raise EdgeWorkerVersionException + raise e return datetime.fromisoformat(result) @@ -106,13 +110,18 @@ def worker_set_state( hostname: str, state: EdgeWorkerState, jobs_active: int, queues: list[str] | None, sysinfo: dict ) -> WorkerSetStateReturn: """Update the state of the worker in the central site and thereby implicitly heartbeat.""" - result = _make_generic_request( - "PATCH", - f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), - ) + try: + result = _make_generic_request( + "PATCH", + f"worker/{quote(hostname)}", + WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( + exclude_unset=True + ), + ) + except requests.HTTPError as e: + if e.response.status_code == 400: + raise EdgeWorkerVersionException + raise e return WorkerSetStateReturn(**result) From e5581f96087dfe31bbf68de91f39e109224cd0f3 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 4 Feb 2025 15:59:04 +0100 Subject: [PATCH 04/11] fix static checks --- .../edge/src/airflow/providers/edge/cli/api_client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index 5c5315e100efa..fcbac8a7564a6 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -39,6 +39,7 @@ WorkerStateBody, ) from airflow.utils.state import TaskInstanceState # noqa: TC001 + if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException @@ -114,9 +115,9 @@ def worker_set_state( result = _make_generic_request( "PATCH", f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), + WorkerStateBody( + state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo + ).model_dump_json(exclude_unset=True), ) except requests.HTTPError as e: if e.response.status_code == 400: From 4cfe99f7516f1e1f127cdeee1b91dab5cd02766b Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 4 Feb 2025 15:58:25 +0100 Subject: [PATCH 05/11] Raise custom exception --- .../airflow/providers/edge/cli/api_client.py | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index 60c230dada1ca..5c5315e100efa 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -39,10 +39,9 @@ WorkerStateBody, ) from airflow.utils.state import TaskInstanceState # noqa: TC001 - if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey - from airflow.providers.edge.models.edge_worker import EdgeWorkerState + from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException logger = logging.getLogger(__name__) @@ -92,13 +91,18 @@ def worker_register( hostname: str, state: EdgeWorkerState, queues: list[str] | None, sysinfo: dict ) -> datetime: """Register worker with the Edge API.""" - result = _make_generic_request( - "POST", - f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=0, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), - ) + try: + result = _make_generic_request( + "POST", + f"worker/{quote(hostname)}", + WorkerStateBody(state=state, jobs_active=0, queues=queues, sysinfo=sysinfo).model_dump_json( + exclude_unset=True + ), + ) + except requests.HTTPError as e: + if e.response.status_code == 400: + raise EdgeWorkerVersionException + raise e return datetime.fromisoformat(result) @@ -106,13 +110,18 @@ def worker_set_state( hostname: str, state: EdgeWorkerState, jobs_active: int, queues: list[str] | None, sysinfo: dict ) -> WorkerSetStateReturn: """Update the state of the worker in the central site and thereby implicitly heartbeat.""" - result = _make_generic_request( - "PATCH", - f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), - ) + try: + result = _make_generic_request( + "PATCH", + f"worker/{quote(hostname)}", + WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( + exclude_unset=True + ), + ) + except requests.HTTPError as e: + if e.response.status_code == 400: + raise EdgeWorkerVersionException + raise e return WorkerSetStateReturn(**result) From dfca921aef71f6766145e0b3e87b4b0f16d31321 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 4 Feb 2025 15:59:04 +0100 Subject: [PATCH 06/11] fix static checks --- .../edge/src/airflow/providers/edge/cli/api_client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index 5c5315e100efa..fcbac8a7564a6 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -39,6 +39,7 @@ WorkerStateBody, ) from airflow.utils.state import TaskInstanceState # noqa: TC001 + if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException @@ -114,9 +115,9 @@ def worker_set_state( result = _make_generic_request( "PATCH", f"worker/{quote(hostname)}", - WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json( - exclude_unset=True - ), + WorkerStateBody( + state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo + ).model_dump_json(exclude_unset=True), ) except requests.HTTPError as e: if e.response.status_code == 400: From 67ef5d5728ee8dc5ea9525dcd7705db6c02bc964 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 4 Feb 2025 15:58:25 +0100 Subject: [PATCH 07/11] Raise custom exception --- providers/edge/src/airflow/providers/edge/cli/api_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index fcbac8a7564a6..e8c3f898ac20c 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -39,7 +39,6 @@ WorkerStateBody, ) from airflow.utils.state import TaskInstanceState # noqa: TC001 - if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException From a1d6228f884f813fed0395e512c697592c06b5aa Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Tue, 4 Feb 2025 15:59:04 +0100 Subject: [PATCH 08/11] fix static checks --- providers/edge/src/airflow/providers/edge/cli/api_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index e8c3f898ac20c..fcbac8a7564a6 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -39,6 +39,7 @@ WorkerStateBody, ) from airflow.utils.state import TaskInstanceState # noqa: TC001 + if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException From dc28390fb6585460c717dc98e357d4767c1590e5 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Wed, 5 Feb 2025 09:03:57 +0100 Subject: [PATCH 09/11] add exception messgae --- .../edge/src/airflow/providers/edge/cli/api_client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/providers/edge/src/airflow/providers/edge/cli/api_client.py b/providers/edge/src/airflow/providers/edge/cli/api_client.py index fcbac8a7564a6..c19504787d75c 100644 --- a/providers/edge/src/airflow/providers/edge/cli/api_client.py +++ b/providers/edge/src/airflow/providers/edge/cli/api_client.py @@ -30,6 +30,7 @@ from tenacity import before_log, wait_random_exponential from airflow.configuration import conf +from airflow.providers.edge.models.edge_worker import EdgeWorkerVersionException from airflow.providers.edge.worker_api.auth import jwt_signer from airflow.providers.edge.worker_api.datamodels import ( EdgeJobFetched, @@ -42,7 +43,7 @@ if TYPE_CHECKING: from airflow.models.taskinstancekey import TaskInstanceKey - from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException + from airflow.providers.edge.models.edge_worker import EdgeWorkerState logger = logging.getLogger(__name__) @@ -102,7 +103,7 @@ def worker_register( ) except requests.HTTPError as e: if e.response.status_code == 400: - raise EdgeWorkerVersionException + raise EdgeWorkerVersionException(str(e)) raise e return datetime.fromisoformat(result) @@ -121,7 +122,7 @@ def worker_set_state( ) except requests.HTTPError as e: if e.response.status_code == 400: - raise EdgeWorkerVersionException + raise EdgeWorkerVersionException(str(e)) raise e return WorkerSetStateReturn(**result) From a0a979db8ea2d9a09bfcc6295fdb7297b965d97a Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Wed, 5 Feb 2025 09:10:42 +0100 Subject: [PATCH 10/11] Increment version --- providers/edge/README.rst | 6 +++--- providers/edge/docs/changelog.rst | 8 ++++++++ providers/edge/provider.yaml | 2 +- providers/edge/pyproject.toml | 6 +++--- providers/edge/src/airflow/providers/edge/__init__.py | 2 +- .../edge/src/airflow/providers/edge/get_provider_info.py | 2 +- 6 files changed, 17 insertions(+), 9 deletions(-) diff --git a/providers/edge/README.rst b/providers/edge/README.rst index 9bcb8112dc3a7..c10204b4749f0 100644 --- a/providers/edge/README.rst +++ b/providers/edge/README.rst @@ -24,7 +24,7 @@ Package ``apache-airflow-providers-edge`` -Release: ``0.13.0pre0`` +Release: ``0.13.1pre0`` Handle edge workers on remote sites via HTTP(s) connection and orchestrates work over distributed sites @@ -37,7 +37,7 @@ This is a provider package for ``edge`` provider. All classes for this provider are in ``airflow.providers.edge`` python package. You can find package information and changelog for the provider -in the `documentation `_. +in the `documentation `_. Installation ------------ @@ -60,4 +60,4 @@ PIP package Version required ================== ================== The changelog for the provider package can be found in the -`changelog `_. +`changelog `_. diff --git a/providers/edge/docs/changelog.rst b/providers/edge/docs/changelog.rst index 87759f00b20b6..83eabd4ff1829 100644 --- a/providers/edge/docs/changelog.rst +++ b/providers/edge/docs/changelog.rst @@ -27,6 +27,14 @@ Changelog --------- +0.13.1pre0 +.......... + +Misc +~~~~ + +* ``EdgeWorkerVersionException is raised if http 400 is responded on set_state.`` + 0.13.0pre0 .......... diff --git a/providers/edge/provider.yaml b/providers/edge/provider.yaml index 09ce196e17823..7bb5ce88ea967 100644 --- a/providers/edge/provider.yaml +++ b/providers/edge/provider.yaml @@ -25,7 +25,7 @@ source-date-epoch: 1737371680 # note that those versions are maintained by release manager - do not update them manually versions: - - 0.13.0pre0 + - 0.13.1pre0 plugins: - name: edge_executor diff --git a/providers/edge/pyproject.toml b/providers/edge/pyproject.toml index 0e28b816ffd37..759e8c73a9cad 100644 --- a/providers/edge/pyproject.toml +++ b/providers/edge/pyproject.toml @@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi" [project] name = "apache-airflow-providers-edge" -version = "0.13.0pre0" +version = "0.13.1pre0" description = "Provider package apache-airflow-providers-edge for Apache Airflow" readme = "README.rst" authors = [ @@ -62,8 +62,8 @@ dependencies = [ ] [project.urls] -"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.13.0pre0" -"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.13.0pre0/changelog.html" +"Documentation" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.13.1pre0" +"Changelog" = "https://airflow.apache.org/docs/apache-airflow-providers-edge/0.13.1pre0/changelog.html" "Bug Tracker" = "https://github.com/apache/airflow/issues" "Source Code" = "https://github.com/apache/airflow" "Slack Chat" = "https://s.apache.org/airflow-slack" diff --git a/providers/edge/src/airflow/providers/edge/__init__.py b/providers/edge/src/airflow/providers/edge/__init__.py index ac0be0ebf9e85..749d124f95886 100644 --- a/providers/edge/src/airflow/providers/edge/__init__.py +++ b/providers/edge/src/airflow/providers/edge/__init__.py @@ -29,7 +29,7 @@ __all__ = ["__version__"] -__version__ = "0.13.0pre0" +__version__ = "0.13.1pre0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( "2.10.0" diff --git a/providers/edge/src/airflow/providers/edge/get_provider_info.py b/providers/edge/src/airflow/providers/edge/get_provider_info.py index 66fa378ef7811..9f972d4371ed0 100644 --- a/providers/edge/src/airflow/providers/edge/get_provider_info.py +++ b/providers/edge/src/airflow/providers/edge/get_provider_info.py @@ -28,7 +28,7 @@ def get_provider_info(): "description": "Handle edge workers on remote sites via HTTP(s) connection and orchestrates work over distributed sites\n", "state": "not-ready", "source-date-epoch": 1737371680, - "versions": ["0.13.0pre0"], + "versions": ["0.13.1pre0"], "plugins": [ { "name": "edge_executor", From b1e41bd33508daad5133b18dc25010af7bcd9854 Mon Sep 17 00:00:00 2001 From: "Majoros Donat (XC-DX/EET2-Bp)" Date: Wed, 5 Feb 2025 09:48:44 +0100 Subject: [PATCH 11/11] change misc to fix --- providers/edge/docs/changelog.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/edge/docs/changelog.rst b/providers/edge/docs/changelog.rst index 83eabd4ff1829..393239fb395f8 100644 --- a/providers/edge/docs/changelog.rst +++ b/providers/edge/docs/changelog.rst @@ -30,8 +30,8 @@ Changelog 0.13.1pre0 .......... -Misc -~~~~ +Fix +~~~ * ``EdgeWorkerVersionException is raised if http 400 is responded on set_state.``