diff --git a/providers/amazon/provider.yaml b/providers/amazon/provider.yaml index aa370469f1ad6..ca88080685afd 100644 --- a/providers/amazon/provider.yaml +++ b/providers/amazon/provider.yaml @@ -1121,6 +1121,18 @@ config: example: my_company.aws.MyCustomSessionFactory type: string version_added: 3.1.1 + s3_log_write_mode: + description: | + Controls how the S3 remote log handler writes to S3. In ``append`` mode (the + default), the handler downloads the existing S3 object and concatenates the new + log content onto it. In ``replace`` mode, the handler overwrites the S3 object + with the full local log file without downloading first. Use ``replace`` when the + local log file is authoritative (e.g. shared filesystem) and you want to avoid + the download-and-concatenate overhead. + type: string + version_added: ~ + example: ~ + default: "append" cloudwatch_task_handler_json_serializer: description: | By default, when logging non-string messages, all non-json objects are logged as `null`. diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index e9cc98ac632f7..51bd79e33b6d7 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -57,13 +57,11 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI): remote_loc = os.path.join(self.remote_base, path) if local_loc.is_file(): - # read log and remove old logs to get just the latest additions log = local_loc.read_text() - has_uploaded = self.write(log, remote_loc) + append = conf.get("aws", "s3_log_write_mode", fallback="append") != "replace" + has_uploaded = self.write(log, remote_loc, append=append) if has_uploaded and self.delete_local_copy: shutil.rmtree(os.path.dirname(local_loc)) - elif has_uploaded: - local_loc.write_text("") @cached_property def hook(self): @@ -121,7 +119,7 @@ def write( try: if append and self.s3_log_exists(remote_log_location): old_log = self.s3_read(remote_log_location) - if old_log: + if old_log and not log.startswith(old_log): sep = "" if old_log.endswith("\n") else "\n" log = f"{old_log}{sep}{log}" except Exception: diff --git a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py index 3a407bb1b2fa6..1fd420e8ad8fd 100644 --- a/providers/amazon/src/airflow/providers/amazon/get_provider_info.py +++ b/providers/amazon/src/airflow/providers/amazon/get_provider_info.py @@ -1241,6 +1241,13 @@ def get_provider_info(): "type": "string", "version_added": "3.1.1", }, + "s3_log_write_mode": { + "description": "Controls how the S3 remote log handler writes to S3. In ``append`` mode (the\ndefault), the handler downloads the existing S3 object and concatenates the new\nlog content onto it. In ``replace`` mode, the handler overwrites the S3 object\nwith the full local log file without downloading first. Use ``replace`` when the\nlocal log file is authoritative (e.g. shared filesystem) and you want to avoid\nthe download-and-concatenate overhead.\n", + "type": "string", + "version_added": None, + "example": None, + "default": "append", + }, "cloudwatch_task_handler_json_serializer": { "description": "By default, when logging non-string messages, all non-json objects are logged as `null`.\nExcept `datetime` objects which are ISO formatted. Users can optionally use a `repr` serializer or\nprovide their own JSON serializer for any non-JSON-serializable objects in the logged message.\n\n* `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize` uses `repr` (be aware\n there is the potential of logging sensitive data depending on the `repr` method of logged objects)\n* `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy` uses `null`.\n\nIf a custom serializer is provided, it must adhere to `Callable[[Any], str | None]`, where `None`\nserializes to `null` (e.g. `def my_serializer(o: Any) -> str | None`). Since this is on the logging\npath and it's possible there's an exception being handled, special care should be taken to fail\ngracefully without raising a new exception inside of your serializer.\n", "type": "string", diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 211a213b3b892..0b71c65338fcf 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -187,9 +187,9 @@ def test_write_existing(self): def test_upload_repeated_appends_no_duplication(self): """Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads. - Without truncation after upload, the S3 object accumulates duplicate - lines and grows O(N^2). The correct behavior is that each line appears - in S3 exactly once. + Without the startswith guard, the S3 object accumulates duplicate lines + and grows O(N^2). The correct behavior is that each line appears in S3 + exactly once. """ local_log = self.subject.base_log_folder / "1.log" local_log.parent.mkdir(parents=True, exist_ok=True) @@ -201,6 +201,34 @@ def test_upload_repeated_appends_no_duplication(self): body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() assert body == b"cycle 1\ncycle 2\ncycle 3\n" + assert local_log.read_text() == "cycle 1\ncycle 2\ncycle 3\n" + + def test_upload_append_from_different_worker(self): + """When a different worker uploads new content, it appends to the existing S3 object.""" + self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"cycle 1\ncycle 2\n") + + local_log = self.subject.base_log_folder / "1.log" + local_log.parent.mkdir(parents=True, exist_ok=True) + local_log.write_text("cycle 3\n") + + self.subject.upload(local_log, self.ti) + + body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() + assert body == b"cycle 1\ncycle 2\ncycle 3\n" + + @conf_vars({("aws", "s3_log_write_mode"): "replace"}) + def test_upload_replace_mode(self): + """When s3_log_write_mode is 'replace', S3 is overwritten without downloading.""" + self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"old content\n") + + local_log = self.subject.base_log_folder / "1.log" + local_log.parent.mkdir(parents=True, exist_ok=True) + local_log.write_text("new content\n") + + self.subject.upload(local_log, self.ti) + + body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() + assert body == b"new content\n" def test_write_raises(self, caplog): url = "s3://nonexistentbucket/foo"