diff --git a/chart/templates/workers/worker-deployment.yaml b/chart/templates/workers/worker-deployment.yaml index 33fb357aae6d2..55e89edfbb04e 100644 --- a/chart/templates/workers/worker-deployment.yaml +++ b/chart/templates/workers/worker-deployment.yaml @@ -22,6 +22,7 @@ ################################# {{- $persistence := .Values.workers.persistence.enabled }} {{- $keda := .Values.workers.keda.enabled }} +{{- $hpa := and .Values.workers.hpa.enabled (not .Values.workers.keda.enabled) }} {{- if or (eq .Values.executor "CeleryExecutor") (eq .Values.executor "CeleryKubernetesExecutor") }} {{- $nodeSelector := or .Values.workers.nodeSelector .Values.nodeSelector }} {{- $affinity := or .Values.workers.affinity .Values.affinity }} @@ -59,7 +60,7 @@ spec: {{- if $persistence }} serviceName: {{ include "airflow.fullname" . }}-worker {{- end }} - {{- if not $keda }} + {{- if and (not $keda) (not $hpa) }} replicas: {{ .Values.workers.replicas }} {{- end }} {{- if $revisionHistoryLimit }} diff --git a/chart/templates/workers/worker-hpa.yaml b/chart/templates/workers/worker-hpa.yaml new file mode 100644 index 0000000000000..e6b1a9ae523ea --- /dev/null +++ b/chart/templates/workers/worker-hpa.yaml @@ -0,0 +1,49 @@ +{{/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. +*/}} + +################################ +## Airflow Worker HPA +################################# +{{- if and (and (not .Values.workers.keda.enabled) .Values.workers.hpa.enabled) (has .Values.executor (list "CeleryExecutor" "CeleryKubernetesExecutor")) }} +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: {{ include "airflow.fullname" . }}-worker + labels: + tier: airflow + component: worker-horizontalpodautoscaler + release: {{ .Release.Name }} + chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" + heritage: {{ .Release.Service }} + deploymentName: {{ .Release.Name }}-worker + {{- if or (.Values.labels) (.Values.workers.labels) }} + {{- mustMerge .Values.workers.labels .Values.labels | toYaml | nindent 4 }} + {{- end }} +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: {{ ternary "StatefulSet" "Deployment" .Values.workers.persistence.enabled }} + name: {{ include "airflow.fullname" . }}-worker + minReplicas: {{ .Values.workers.hpa.minReplicaCount }} + maxReplicas: {{ .Values.workers.hpa.maxReplicaCount }} + metrics: {{- toYaml .Values.workers.hpa.metrics | nindent 4 }} + {{- with .Values.workers.hpa.behavior }} + behavior: {{- toYaml . 
| nindent 4 }} + {{- end }} +{{- end }} diff --git a/chart/values.schema.json b/chart/values.schema.json index 0d931609c0f2a..dacb7ebfd5023 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -1531,6 +1531,53 @@ } } }, + "hpa": { + "description": "HPA configuration.", + "type": "object", + "additionalProperties": false, + "properties": { + "enabled": { + "description": "Allow HPA autoscaling (KEDA must be disabled).", + "type": "boolean", + "default": false + }, + "minReplicaCount": { + "description": "Minimum number of workers created by HPA.", + "type": "integer", + "default": 0 + }, + "maxReplicaCount": { + "description": "Maximum number of workers created by HPA.", + "type": "integer", + "default": 5 + }, + "metrics": { + "description": "Metric specifications used to calculate the desired replica count.", + "type": "array", + "default": [ + { + "type": "Resource", + "resource": { + "name": "cpu", + "target": { + "type": "Utilization", + "averageUtilization": 80 + } + } + } + ], + "items": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricSpec" + } + }, + "behavior": { + "description": "HorizontalPodAutoscalerBehavior configures the scaling behavior of the target.", + "type": "object", + "default": {}, + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.HorizontalPodAutoscalerBehavior" + } + } + }, "persistence": { "description": "Persistence configuration.", "type": "object", @@ -7422,6 +7469,72 @@ "type": "object", "additionalProperties": false }, + "io.k8s.api.autoscaling.v2beta2.ContainerResourceMetricSource": { + "description": "ContainerResourceMetricSource indicates how to scale on a resource metric known to Kubernetes, as specified in requests and limits, describing each pod in the current scale target (e.g. CPU or memory). The values will be averaged together before being compared to the target.
Such metrics are built in to Kubernetes, and have special scaling options on top of those available to normal per-pod metrics using the \"pods\" source. Only one \"target\" type should be set.", + "properties": { + "container": { + "description": "container is the name of the container in the pods of the scaling target", + "type": "string" + }, + "name": { + "description": "name is the name of the resource in question.", + "type": "string" + }, + "target": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget", + "description": "target specifies the target value for the given metric" + } + }, + "required": [ + "name", + "target", + "container" + ], + "type": "object", + "additionalProperties": false + }, + "io.k8s.api.autoscaling.v2beta2.CrossVersionObjectReference": { + "description": "CrossVersionObjectReference contains enough information to let you identify the referred resource.", + "properties": { + "apiVersion": { + "description": "API version of the referent", + "type": "string" + }, + "kind": { + "description": "Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds\"", + "type": "string" + }, + "name": { + "description": "Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names", + "type": "string" + } + }, + "required": [ + "kind", + "name" + ], + "type": "object", + "additionalProperties": false + }, + "io.k8s.api.autoscaling.v2beta2.ExternalMetricSource": { + "description": "ExternalMetricSource indicates how to scale on a metric not associated with any Kubernetes object (for example length of queue in cloud messaging service, or QPS from loadbalancer running outside of cluster).", + "properties": { + "metric": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricIdentifier", + "description": "metric identifies the target metric by name and selector" + }, + "target": { + "$ref": 
"#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget", + "description": "target specifies the target value for the given metric" + } + }, + "required": [ + "metric", + "target" + ], + "type": "object", + "additionalProperties": false + }, "io.k8s.api.autoscaling.v2beta2.HPAScalingPolicy": { "description": "HPAScalingPolicy is a single policy which must hold true for a specified past interval.", "properties": { @@ -7486,6 +7599,146 @@ "type": "object", "additionalProperties": false }, + "io.k8s.api.autoscaling.v2beta2.MetricIdentifier": { + "description": "MetricIdentifier defines the name and optionally selector for a metric", + "properties": { + "name": { + "description": "name is the name of the given metric", + "type": "string" + }, + "selector": { + "$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector", + "description": "selector is the string-encoded form of a standard kubernetes label selector for the given metric When set, it is passed as an additional parameter to the metrics server for more specific metrics scoping. When unset, just the metricName will be used to gather metrics." + } + }, + "required": [ + "name" + ], + "type": "object", + "additionalProperties": false + }, + "io.k8s.api.autoscaling.v2beta2.MetricSpec": { + "description": "MetricSpec specifies how to scale based on a single metric (only `type` and one other matching field should be set at once).", + "properties": { + "containerResource": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.ContainerResourceMetricSource", + "description": "container resource refers to a resource metric (such as those specified in requests and limits) known to Kubernetes describing a single container in each pod of the current scale target (e.g. CPU or memory). Such metrics are built in to Kubernetes, and have special scaling options on top of those available to normal per-pod metrics using the \"pods\" source. 
This is an alpha feature and can be enabled by the HPAContainerMetrics feature flag." + }, + "external": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.ExternalMetricSource", + "description": "external refers to a global metric that is not associated with any Kubernetes object. It allows autoscaling based on information coming from components running outside of cluster (for example length of queue in cloud messaging service, or QPS from loadbalancer running outside of cluster)." + }, + "object": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.ObjectMetricSource", + "description": "object refers to a metric describing a single kubernetes object (for example, hits-per-second on an Ingress object)." + }, + "pods": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.PodsMetricSource", + "description": "pods refers to a metric describing each pod in the current scale target (for example, transactions-processed-per-second). The values will be averaged together before being compared to the target value." + }, + "resource": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.ResourceMetricSource", + "description": "resource refers to a resource metric (such as those specified in requests and limits) known to Kubernetes describing each pod in the current scale target (e.g. CPU or memory). Such metrics are built in to Kubernetes, and have special scaling options on top of those available to normal per-pod metrics using the \"pods\" source." + }, + "type": { + "description": "type is the type of metric source. It should be one of \"ContainerResource\", \"External\", \"Object\", \"Pods\" or \"Resource\", each mapping to a matching field in the object. 
Note: \"ContainerResource\" type is available on when the feature-gate HPAContainerMetrics is enabled", + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object", + "additionalProperties": false + }, + "io.k8s.api.autoscaling.v2beta2.MetricTarget": { + "description": "MetricTarget defines the target value, average value, or average utilization of a specific metric", + "properties": { + "averageUtilization": { + "description": "averageUtilization is the target value of the average of the resource metric across all relevant pods, represented as a percentage of the requested value of the resource for the pods. Currently only valid for Resource metric source type", + "format": "int32", + "type": "integer" + }, + "averageValue": { + "$ref": "#/definitions/io.k8s.apimachinery.pkg.api.resource.Quantity", + "description": "averageValue is the target value of the average of the metric across all relevant pods (as a quantity)" + }, + "type": { + "description": "type represents whether the metric type is Utilization, Value, or AverageValue", + "type": "string" + }, + "value": { + "$ref": "#/definitions/io.k8s.apimachinery.pkg.api.resource.Quantity", + "description": "value is the target value of the metric (as a quantity)." 
+ } + }, + "required": [ + "type" + ], + "type": "object", + "additionalProperties": false + }, + "io.k8s.api.autoscaling.v2beta2.ObjectMetricSource": { + "description": "ObjectMetricSource indicates how to scale on a metric describing a kubernetes object (for example, hits-per-second on an Ingress object).", + "properties": { + "describedObject": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.CrossVersionObjectReference" + }, + "metric": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricIdentifier", + "description": "metric identifies the target metric by name and selector" + }, + "target": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget", + "description": "target specifies the target value for the given metric" + } + }, + "required": [ + "describedObject", + "target", + "metric" + ], + "type": "object", + "additionalProperties": false + }, + "io.k8s.api.autoscaling.v2beta2.PodsMetricSource": { + "description": "PodsMetricSource indicates how to scale on a metric describing each pod in the current scale target (for example, transactions-processed-per-second). The values will be averaged together before being compared to the target value.", + "properties": { + "metric": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricIdentifier", + "description": "metric identifies the target metric by name and selector" + }, + "target": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget", + "description": "target specifies the target value for the given metric" + } + }, + "required": [ + "metric", + "target" + ], + "type": "object", + "additionalProperties": false + }, + "io.k8s.api.autoscaling.v2beta2.ResourceMetricSource": { + "description": "ResourceMetricSource indicates how to scale on a resource metric known to Kubernetes, as specified in requests and limits, describing each pod in the current scale target (e.g. CPU or memory). 
The values will be averaged together before being compared to the target. Such metrics are built in to Kubernetes, and have special scaling options on top of those available to normal per-pod metrics using the \"pods\" source. Only one \"target\" type should be set.", + "properties": { + "name": { + "description": "name is the name of the resource in question.", + "type": "string" + }, + "target": { + "$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget", + "description": "target specifies the target value for the given metric" + } + }, + "required": [ + "name", + "target" + ], + "type": "object", + "additionalProperties": false + }, "io.k8s.api.core.v1.AWSElasticBlockStoreVolumeSource": { "description": "Represents a Persistent Disk resource in AWS.\n\nAn AWS EBS disk must exist before mounting to a container. The disk must also be in the same AWS zone as the kubelet. An AWS EBS disk can only be mounted as read/write once. AWS EBS volumes support ownership management and SELinux relabeling.", "properties": { diff --git a/chart/values.yaml b/chart/values.yaml index e67a1f6f640be..65aa036a8fe19 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -592,6 +592,28 @@ workers: # This configuration will be ignored if PGBouncer is not enabled usePgbouncer: true + # Allow HPA (KEDA must be disabled). 
+ hpa: + enabled: false + + # Minimum number of workers created by HPA + minReplicaCount: 0 + + # Maximum number of workers created by HPA + maxReplicaCount: 5 + + # Specifications for which to use to calculate the desired replica count + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 80 + + # Scaling behavior of the target in both Up and Down directions + behavior: {} + persistence: # Enable persistent volumes enabled: true diff --git a/helm_tests/airflow_core/test_worker.py b/helm_tests/airflow_core/test_worker.py index 1cd1af5f7ed9c..2cbae498c02df 100644 --- a/helm_tests/airflow_core/test_worker.py +++ b/helm_tests/airflow_core/test_worker.py @@ -984,6 +984,102 @@ def test_should_use_keda_query(self, query, executor, expected_query): assert expected_query == jmespath.search("spec.triggers[0].metadata.query", docs[0]) +class TestWorkerHPAAutoScaler: + """Tests worker HPA auto scaler.""" + + def test_should_be_disabled_on_keda_enabled(self): + docs = render_chart( + values={ + "executor": "CeleryExecutor", + "workers": { + "keda": {"enabled": True}, + "hpa": {"enabled": True}, + "labels": {"test_label": "test_label_value"}, + }, + }, + show_only=[ + "templates/workers/worker-kedaautoscaler.yaml", + "templates/workers/worker-hpa.yaml", + ], + ) + assert "test_label" in jmespath.search("metadata.labels", docs[0]) + assert jmespath.search("metadata.labels", docs[0])["test_label"] == "test_label_value" + assert len(docs) == 1 + + def test_should_add_component_specific_labels(self): + docs = render_chart( + values={ + "executor": "CeleryExecutor", + "workers": { + "hpa": {"enabled": True}, + "labels": {"test_label": "test_label_value"}, + }, + }, + show_only=["templates/workers/worker-hpa.yaml"], + ) + + assert "test_label" in jmespath.search("metadata.labels", docs[0]) + assert jmespath.search("metadata.labels", docs[0])["test_label"] == "test_label_value" + + def test_should_remove_replicas_field(self): + docs 
= render_chart( + values={ + "executor": "CeleryExecutor", + "workers": { + "hpa": {"enabled": True}, + }, + }, + show_only=["templates/workers/worker-deployment.yaml"], + ) + assert "replicas" not in jmespath.search("spec", docs[0]) + + @pytest.mark.parametrize( + "metrics, executor, expected_metrics", + [ + # default metrics + ( + None, + "CeleryExecutor", + { + "type": "Resource", + "resource": {"name": "cpu", "target": {"type": "Utilization", "averageUtilization": 80}}, + }, + ), + # custom metric + ( + [ + { + "type": "Pods", + "pods": { + "metric": {"name": "custom"}, + "target": {"type": "Utilization", "averageUtilization": 80}, + }, + } + ], + "CeleryKubernetesExecutor", + { + "type": "Pods", + "pods": { + "metric": {"name": "custom"}, + "target": {"type": "Utilization", "averageUtilization": 80}, + }, + }, + ), + ], + ) + def test_should_use_hpa_metrics(self, metrics, executor, expected_metrics): + docs = render_chart( + values={ + "executor": executor, + "workers": { + "hpa": {"enabled": True, **({"metrics": metrics} if metrics else {})}, + }, + }, + show_only=["templates/workers/worker-hpa.yaml"], + ) + assert expected_metrics == jmespath.search("spec.metrics[0]", docs[0]) + + class TestWorkerNetworkPolicy: """Tests worker network policy.""" diff --git a/helm_tests/other/test_hpa.py b/helm_tests/other/test_hpa.py new file mode 100644 index 0000000000000..dd71a970b12bd --- /dev/null +++ b/helm_tests/other/test_hpa.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import jmespath +import pytest + +from tests.charts.helm_template_generator import render_chart + + +class TestHPA: + """Tests rendering of the worker HorizontalPodAutoscaler template (worker-hpa.yaml).""" + + def test_hpa_disabled_by_default(self): + """With default values the HPA template renders no documents.""" + docs = render_chart( + values={}, + show_only=["templates/workers/worker-hpa.yaml"], + ) + assert docs == [] + + @pytest.mark.parametrize( + "executor, is_created", + [ + ("CeleryExecutor", True), + ("CeleryKubernetesExecutor", True), + ], + ) + def test_hpa_enabled(self, executor, is_created): + """HPA should only be created when enabled and executor is Celery or CeleryKubernetes.""" + docs = render_chart( + values={ + "workers": {"hpa": {"enabled": True}, "persistence": {"enabled": False}}, + "executor": executor, + }, + show_only=["templates/workers/worker-hpa.yaml"], + ) + if is_created: + assert jmespath.search("metadata.name", docs[0]) == "release-name-worker" + else: + assert docs == [] + + @pytest.mark.parametrize( + "min_replicas, max_replicas", + [ + (None, None), + (2, 8), + ], + ) + def test_min_max_replicas(self, min_replicas, max_replicas): + """Verify minReplicas/maxReplicas fall back to the chart defaults (0 and 5) unless overridden.""" + docs = render_chart( + values={ + "workers": { + "hpa": { + "enabled": True, + **({"minReplicaCount": min_replicas} if min_replicas else {}), + **({"maxReplicaCount": max_replicas} if max_replicas else {}), + } + }, + }, + show_only=["templates/workers/worker-hpa.yaml"], + ) + assert jmespath.search("spec.minReplicas", docs[0]) == (0 if min_replicas is None else min_replicas)
+ assert jmespath.search("spec.maxReplicas", docs[0]) == (5 if max_replicas is None else max_replicas) + + @pytest.mark.parametrize("executor", ["CeleryExecutor", "CeleryKubernetesExecutor"]) + def test_hpa_behavior(self, executor): + """A user-supplied workers.hpa.behavior mapping is rendered unchanged as spec.behavior.""" + expected_behavior = { + "scaleDown": { + "stabilizationWindowSeconds": 300, + "policies": [{"type": "Percent", "value": 100, "periodSeconds": 15}], + } + } + docs = render_chart( + values={ + "workers": { + "hpa": { + "enabled": True, + "behavior": expected_behavior, + }, + }, + "executor": executor, + }, + show_only=["templates/workers/worker-hpa.yaml"], + ) + assert jmespath.search("spec.behavior", docs[0]) == expected_behavior + + @pytest.mark.parametrize( + "enabled, kind", + [ + ("enabled", "StatefulSet"), + ("not_enabled", "Deployment"), + ], + ) + def test_persistence(self, enabled, kind): + """If worker persistence is enabled, scaleTargetRef should be StatefulSet else Deployment.""" + is_enabled = enabled == "enabled" + docs = render_chart( + values={ + "workers": {"hpa": {"enabled": True}, "persistence": {"enabled": is_enabled}}, + "executor": "CeleryExecutor", + }, + show_only=["templates/workers/worker-hpa.yaml"], + ) + assert jmespath.search("spec.scaleTargetRef.kind", docs[0]) == kind