From fef2daf398ee7373373a35375ddb16360d78a568 Mon Sep 17 00:00:00 2001 From: phi Date: Tue, 11 Feb 2025 20:25:20 +0900 Subject: [PATCH 1/6] feat: add args labels --- .../docker/src/airflow/providers/docker/operators/docker.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/providers/docker/src/airflow/providers/docker/operators/docker.py b/providers/docker/src/airflow/providers/docker/operators/docker.py index a175b3ef4c9f8..dd9bd630bec2b 100644 --- a/providers/docker/src/airflow/providers/docker/operators/docker.py +++ b/providers/docker/src/airflow/providers/docker/operators/docker.py @@ -192,6 +192,8 @@ class DockerOperator(BaseOperator): Incompatible with ``"host"`` in ``network_mode``. :param ulimits: List of ulimit options to set for the container. Each item should be a :py:class:`docker.types.Ulimit` instance. + :param labels: A dictionary of name-value labels (e.g. ``{"label1": "value1", "label2": "value2"}``) + or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``) """ # !!! Changes in DockerOperator's arguments should be also reflected in !!! @@ -255,6 +257,7 @@ def __init__( skip_on_exit_code: int | Container[int] | None = None, port_bindings: dict | None = None, ulimits: list[Ulimit] | None = None, + labels: dict[str, str] | list[str] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -301,6 +304,7 @@ def __init__( self.cap_add = cap_add self.extra_hosts = extra_hosts self.ulimits = ulimits or [] + self.labels = labels self.container: dict = None # type: ignore[assignment] self.retrieve_output = retrieve_output @@ -407,6 +411,7 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[ working_dir=self.working_dir, tty=self.tty, hostname=self.hostname, + labels=self.labels, ) log_stream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True) try: From 685c4b76d3790f3e12a735a8d7cd5806b0b5faa1 Mon Sep 17 00:00:00 2001 From: phi Date: Tue, 11 Feb 2025 20:25:34 +0900 Subject: [PATCH 2/6] fix: add larg labels stub --- airflow/decorators/__init__.pyi | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index ab8f636cb059b..34f110983cbce 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -404,6 +404,7 @@ class TaskDecoratorCollection: skip_on_exit_code: int | Container[int] | None = None, port_bindings: dict | None = None, ulimits: list[dict] | None = None, + labels: dict[str, str] | list[str] | None = None, **kwargs, ) -> TaskDecorator: """Create a decorator to convert the decorated callable to a Docker task. @@ -508,6 +509,8 @@ class TaskDecoratorCollection: Incompatible with ``"host"`` in ``network_mode``. :param ulimits: List of ulimit options to set for the container. Each item should be a :py:class:`docker.types.Ulimit` instance. + :param labels: A dictionary of name-value labels (e.g. ``{"label1": "value1", "label2": "value2"}``) + or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``) """ # [END decorator_signature] def kubernetes( From 9c6a3569f38b5b601f28efbd79a5b29cd1c94c78 Mon Sep 17 00:00:00 2001 From: phi Date: Tue, 11 Feb 2025 20:46:33 +0900 Subject: [PATCH 3/6] test: add labels test --- .../tests/provider_tests/docker/operators/test_docker.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/providers/docker/tests/provider_tests/docker/operators/test_docker.py b/providers/docker/tests/provider_tests/docker/operators/test_docker.py index 2e9b001959279..f80b2cfe1f0ef 100644 --- a/providers/docker/tests/provider_tests/docker/operators/test_docker.py +++ b/providers/docker/tests/provider_tests/docker/operators/test_docker.py @@ -781,3 +781,11 @@ def test_docker_host_env_unset(self, monkeypatch): def test_fetch_logs(self, logger_mock, log_lines, expected_lines): fetch_logs(log_lines, logger_mock) assert logger_mock.info.call_args_list == [call("%s", line) for line in expected_lines] + + @pytest.mark.parametrize("labels", ({"key": "value"}, ["key=value"])) + def test_labels(self, labels: dict[str, str] | list[str]): + operator = DockerOperator(task_id="test", image="test", labels=labels) + operator.execute(None) + self.client_mock.create_container.assert_called_once() + assert "labels" in self.client_mock.create_container.call_args.kwargs + assert labels == self.client_mock.create_container.call_args.kwargs["labels"] From 8ff85bd18320c2526f95021be6c82c5c454d5243 Mon Sep 17 00:00:00 2001 From: phi Date: Tue, 11 Feb 2025 22:08:18 +0900 Subject: [PATCH 4/6] fix: assert_called_once_with error --- .../tests/provider_tests/docker/operators/test_docker.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/providers/docker/tests/provider_tests/docker/operators/test_docker.py b/providers/docker/tests/provider_tests/docker/operators/test_docker.py index f80b2cfe1f0ef..dcab3ee883796 100644 --- a/providers/docker/tests/provider_tests/docker/operators/test_docker.py +++ b/providers/docker/tests/provider_tests/docker/operators/test_docker.py @@ -226,6 +226,7 @@ def test_execute(self): tty=True, hostname=TEST_CONTAINER_HOSTNAME, ports=[], + labels=None, ) self.client_mock.create_host_config.assert_called_once_with( mounts=[ @@ -319,6 +320,7 @@ def test_execute_no_temp_dir(self): ipc_mode=None, port_bindings={}, ulimits=[], + labels=None, ) self.tempdir_mock.assert_not_called() self.client_mock.images.assert_called_once_with(name=TEST_IMAGE) @@ -392,6 +394,7 @@ def test_execute_fallback_temp_dir(self, caplog): tty=True, hostname=None, ports=[], + labels=None, ), call( command="env", @@ -430,6 +433,7 @@ def test_execute_fallback_temp_dir(self, caplog): ipc_mode=None, port_bindings={}, ulimits=[], + labels=None, ), call( mounts=[ @@ -450,6 +454,7 @@ def test_execute_fallback_temp_dir(self, caplog): ipc_mode=None, port_bindings={}, ulimits=[], + labels=None, ), ] ) @@ -508,6 +513,7 @@ def test_environment_overrides_env_file(self, stringio_mock): tty=True, hostname=None, ports=[], + labels=None, ) stringio_mock.assert_called_once_with("UNIT=FILE\nPRIVATE=FILE\nVAR=VALUE") self.dotenv_mock.assert_called_once_with(stream="UNIT=FILE\nPRIVATE=FILE\nVAR=VALUE") From 917df4bb86221b6dc62e521ae2c580bd130f8662 Mon Sep 17 00:00:00 2001 From: phi Date: Tue, 11 Feb 2025 22:24:39 +0900 Subject: [PATCH 5/6] fix: create_container.assert_* error --- .../docker/tests/provider_tests/docker/operators/test_docker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/providers/docker/tests/provider_tests/docker/operators/test_docker.py b/providers/docker/tests/provider_tests/docker/operators/test_docker.py index dcab3ee883796..720a06ffee0d4 100644 --- a/providers/docker/tests/provider_tests/docker/operators/test_docker.py +++ b/providers/docker/tests/provider_tests/docker/operators/test_docker.py @@ -300,6 +300,7 @@ def test_execute_no_temp_dir(self): tty=True, hostname=TEST_CONTAINER_HOSTNAME, ports=[], + labels=None, ) self.client_mock.create_host_config.assert_called_once_with( mounts=[ @@ -408,6 +409,7 @@ def test_execute_fallback_temp_dir(self, caplog): tty=True, hostname=None, ports=[], + labels=None, ), ] ) From 3724bc516a12b5a0ae8978d379d0e2f961966b02 Mon Sep 17 00:00:00 2001 From: phi Date: Tue, 11 Feb 2025 22:45:05 +0900 Subject: [PATCH 6/6] fix: rm create_host_config labels --- .../tests/provider_tests/docker/operators/test_docker.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/providers/docker/tests/provider_tests/docker/operators/test_docker.py b/providers/docker/tests/provider_tests/docker/operators/test_docker.py index 720a06ffee0d4..50ff7e4f5533b 100644 --- a/providers/docker/tests/provider_tests/docker/operators/test_docker.py +++ b/providers/docker/tests/provider_tests/docker/operators/test_docker.py @@ -321,7 +321,6 @@ def test_execute_no_temp_dir(self): ipc_mode=None, port_bindings={}, ulimits=[], - labels=None, ) self.tempdir_mock.assert_not_called() self.client_mock.images.assert_called_once_with(name=TEST_IMAGE) @@ -435,7 +434,6 @@ def test_execute_fallback_temp_dir(self, caplog): ipc_mode=None, port_bindings={}, ulimits=[], - labels=None, ), call( mounts=[ @@ -456,7 +454,6 @@ def test_execute_fallback_temp_dir(self, caplog): ipc_mode=None, port_bindings={}, ulimits=[], - labels=None, ), ] )