From 18cfac73ba1ae423b0ecd5c1134d39ba805b50e1 Mon Sep 17 00:00:00 2001 From: Jeff Stein Date: Tue, 9 Jun 2026 11:33:12 -0700 Subject: [PATCH 1/4] Compare logs instead of truncating, allow changing object write mode --- .../amazon/aws/log/s3_task_handler.py | 8 ++--- .../amazon/aws/log/test_s3_task_handler.py | 34 +++++++++++++++++-- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index e9cc98ac632f7..6edf75b65970a 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -57,13 +57,11 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI): remote_loc = os.path.join(self.remote_base, path) if local_loc.is_file(): - # read log and remove old logs to get just the latest additions log = local_loc.read_text() - has_uploaded = self.write(log, remote_loc) + append = conf.get("logging", "object_store_write_mode", fallback="append") != "replace" + has_uploaded = self.write(log, remote_loc, append=append) if has_uploaded and self.delete_local_copy: shutil.rmtree(os.path.dirname(local_loc)) - elif has_uploaded: - local_loc.write_text("") @cached_property def hook(self): @@ -121,7 +119,7 @@ def write( try: if append and self.s3_log_exists(remote_log_location): old_log = self.s3_read(remote_log_location) - if old_log: + if old_log and not log.startswith(old_log): sep = "" if old_log.endswith("\n") else "\n" log = f"{old_log}{sep}{log}" except Exception: diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 211a213b3b892..3eea73242808f 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -187,9 +187,9 @@ def test_write_existing(self): def test_upload_repeated_appends_no_duplication(self): """Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads. - Without truncation after upload, the S3 object accumulates duplicate - lines and grows O(N^2). The correct behavior is that each line appears - in S3 exactly once. + Without the startswith guard, the S3 object accumulates duplicate lines + and grows O(N^2). The correct behavior is that each line appears in S3 + exactly once. """ local_log = self.subject.base_log_folder / "1.log" local_log.parent.mkdir(parents=True, exist_ok=True) @@ -201,6 +201,34 @@ def test_upload_repeated_appends_no_duplication(self): body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() assert body == b"cycle 1\ncycle 2\ncycle 3\n" + assert local_log.read_text() == "cycle 1\ncycle 2\ncycle 3\n" + + def test_upload_append_from_different_worker(self): + """When a different worker uploads new content, it appends to the existing S3 object.""" + self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"cycle 1\ncycle 2\n") + + local_log = self.subject.base_log_folder / "1.log" + local_log.parent.mkdir(parents=True, exist_ok=True) + local_log.write_text("cycle 3\n") + + self.subject.upload(local_log, self.ti) + + body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() + assert body == b"cycle 1\ncycle 2\ncycle 3\n" + + @conf_vars({("logging", "object_store_write_mode"): "replace"}) + def test_upload_replace_mode(self): + """When object_store_write_mode is 'replace', S3 is overwritten without downloading.""" + self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"old content\n") + + local_log = self.subject.base_log_folder / "1.log" + local_log.parent.mkdir(parents=True, exist_ok=True) + local_log.write_text("new content\n") + + self.subject.upload(local_log, self.ti) + + body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() + assert body == b"new content\n" def test_write_raises(self, caplog): url = "s3://nonexistentbucket/foo" From 6c3bfde12e41afb7ed002f6ffdba8d868b55f9f4 Mon Sep 17 00:00:00 2001 From: Jeff Stein Date: Wed, 10 Jun 2026 10:30:23 -0700 Subject: [PATCH 2/4] Add configuration option to the registry --- airflow-core/src/airflow/config_templates/config.yml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 9ed0433c6c4b4..f9ad8f05a6888 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -779,6 +779,18 @@ logging: type: string example: ~ default: "False" + object_store_write_mode: + description: | + Controls how remote log handlers write to the remote location. In ``append`` mode + (the default), the handler downloads the existing remote content and concatenates the + new log content onto it. In ``replace`` mode, the handler overwrites the remote + content with the full local log file without downloading first. Use ``replace`` when + the local log file is authoritative (e.g. shared filesystem) and you want to avoid + the download-and-concatenate overhead. + version_added: 3.3.0 + type: string + example: ~ + default: "append" google_key_path: description: | Path to Google Credential JSON file. If omitted, authorization based on `the Application Default From 7c981ed86fae9561d6927c1ff9316e9e58e0e83a Mon Sep 17 00:00:00 2001 From: Jeff Stein Date: Thu, 11 Jun 2026 07:09:30 -0700 Subject: [PATCH 3/4] Move config option to amazon provider --- airflow-core/src/airflow/config_templates/config.yml | 12 ------------ providers/amazon/provider.yaml | 12 ++++++++++++ .../providers/amazon/aws/log/s3_task_handler.py | 2 +- .../unit/amazon/aws/log/test_s3_task_handler.py | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index f9ad8f05a6888..9ed0433c6c4b4 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -779,18 +779,6 @@ logging: type: string example: ~ default: "False" - object_store_write_mode: - description: | - Controls how remote log handlers write to the remote location. In ``append`` mode - (the default), the handler downloads the existing remote content and concatenates the - new log content onto it. In ``replace`` mode, the handler overwrites the remote - content with the full local log file without downloading first. Use ``replace`` when - the local log file is authoritative (e.g. shared filesystem) and you want to avoid - the download-and-concatenate overhead. - version_added: 3.3.0 - type: string - example: ~ - default: "append" google_key_path: description: | Path to Google Credential JSON file. If omitted, authorization based on `the Application Default diff --git a/providers/amazon/provider.yaml b/providers/amazon/provider.yaml index aa370469f1ad6..ca88080685afd 100644 --- a/providers/amazon/provider.yaml +++ b/providers/amazon/provider.yaml @@ -1121,6 +1121,18 @@ config: example: my_company.aws.MyCustomSessionFactory type: string version_added: 3.1.1 + s3_log_write_mode: + description: | + Controls how the S3 remote log handler writes to S3. In ``append`` mode (the + default), the handler downloads the existing S3 object and concatenates the new + log content onto it. In ``replace`` mode, the handler overwrites the S3 object + with the full local log file without downloading first. Use ``replace`` when the + local log file is authoritative (e.g. shared filesystem) and you want to avoid + the download-and-concatenate overhead. + type: string + version_added: ~ + example: ~ + default: "append" cloudwatch_task_handler_json_serializer: description: | By default, when logging non-string messages, all non-json objects are logged as `null`. diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index 6edf75b65970a..51bd79e33b6d7 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -58,7 +58,7 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI): if local_loc.is_file(): log = local_loc.read_text() - append = conf.get("logging", "object_store_write_mode", fallback="append") != "replace" + append = conf.get("aws", "s3_log_write_mode", fallback="append") != "replace" has_uploaded = self.write(log, remote_loc, append=append) if has_uploaded and self.delete_local_copy: shutil.rmtree(os.path.dirname(local_loc)) diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 3eea73242808f..0b71c65338fcf 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -216,9 +216,9 @@ def test_upload_append_from_different_worker(self): body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() assert body == b"cycle 1\ncycle 2\ncycle 3\n" - @conf_vars({("logging", "object_store_write_mode"): "replace"}) + @conf_vars({("aws", "s3_log_write_mode"): "replace"}) def test_upload_replace_mode(self): - """When object_store_write_mode is 'replace', S3 is overwritten without downloading.""" + """When s3_log_write_mode is 'replace', S3 is overwritten without downloading.""" self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"old content\n") local_log = self.subject.base_log_folder / "1.log" From d6766d8fccc570c41263278c98303864025bd911 Mon Sep 17 00:00:00 2001 From: Jeff Stein Date: Thu, 11 Jun 2026 08:47:58 -0700 Subject: [PATCH 4/4] Update provider info generated file --- .../src/airflow/providers/amazon/get_provider_info.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py index 3a407bb1b2fa6..1fd420e8ad8fd 100644 --- a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py +++ b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py @@ -1241,6 +1241,13 @@ def get_provider_info(): "type": "string", "version_added": "3.1.1", }, + "s3_log_write_mode": { + "description": "Controls how the S3 remote log handler writes to S3. In ``append`` mode (the\ndefault), the handler downloads the existing S3 object and concatenates the new\nlog content onto it. In ``replace`` mode, the handler overwrites the S3 object\nwith the full local log file without downloading first. Use ``replace`` when the\nlocal log file is authoritative (e.g. shared filesystem) and you want to avoid\nthe download-and-concatenate overhead.\n", + "type": "string", + "version_added": None, + "example": None, + "default": "append", + }, "cloudwatch_task_handler_json_serializer": { "description": "By default, when logging non-string messages, all non-json objects are logged as `null`.\nExcept `datetime` objects which are ISO formatted. Users can optionally use a `repr` serializer or\nprovide their own JSON serializer for any non-JSON-serializable objects in the logged message.\n\n* `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize` uses `repr` (be aware\n there is the potential of logging sensitive data depending on the `repr` method of logged objects)\n* `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy` uses `null`.\n\nIf a custom serializer is provided, it must adhere to `Callable[[Any], str | None]`, where `None`\nserializes to `null` (e.g. `def my_serializer(o: Any) -> str | None`). Since this is on the logging\npath and it's possible there's an exception being handled, special care should be taken to fail\ngracefully without raising a new exception inside of your serializer.\n", "type": "string",