Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,18 @@ config:
example: my_company.aws.MyCustomSessionFactory
type: string
version_added: 3.1.1
s3_log_write_mode:
description: |
Controls how the S3 remote log handler writes to S3. In ``append`` mode (the
default), the handler downloads the existing S3 object and concatenates the new
log content onto it. In ``replace`` mode, the handler overwrites the S3 object
with the full local log file without downloading first. Use ``replace`` when the
local log file is authoritative (e.g. shared filesystem) and you want to avoid
the download-and-concatenate overhead.
type: string
version_added: ~
example: ~
default: "append"
cloudwatch_task_handler_json_serializer:
description: |
By default, when logging non-string messages, all non-json objects are logged as `null`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
remote_loc = os.path.join(self.remote_base, path)

if local_loc.is_file():
# read log and remove old logs to get just the latest additions
log = local_loc.read_text()
has_uploaded = self.write(log, remote_loc)
append = conf.get("aws", "s3_log_write_mode", fallback="append") != "replace"
has_uploaded = self.write(log, remote_loc, append=append)
if has_uploaded and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))
elif has_uploaded:
local_loc.write_text("")

@cached_property
def hook(self):
Expand Down Expand Up @@ -121,7 +119,7 @@ def write(
try:
if append and self.s3_log_exists(remote_log_location):
old_log = self.s3_read(remote_log_location)
if old_log:
if old_log and not log.startswith(old_log):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a VERY expensive operation. Do we really need it? I'm not clear on what the use case is

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two competing use cases:

In the former use case, the startswith check gets more expensive as the logs grow if the task never lands on another worker (e.g. due to celery queue settings). The added expense prevents the duplicated logs and lowers the OOM chance.

In the latter use case, the check grows more expensive over time, but there's a way to opt out and set the config option.

I used the local log file truncation initially to avoid this comparison cost, but it caused the shared logs volume problem.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shared volume logs isn't recommended generally I thought for this and a whole host of other problems.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure of the state of that recommendation. I've used that dual logging configuration in the past, but don't currently use it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for my use case (the former), I can set delete_local_logs=True. Which is effectively the same as truncating. But that doesn't solve the case of log duplication in the latter case.

sep = "" if old_log.endswith("\n") else "\n"
log = f"{old_log}{sep}{log}"
except Exception:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,13 @@ def get_provider_info():
"type": "string",
"version_added": "3.1.1",
},
"s3_log_write_mode": {
"description": "Controls how the S3 remote log handler writes to S3. In ``append`` mode (the\ndefault), the handler downloads the existing S3 object and concatenates the new\nlog content onto it. In ``replace`` mode, the handler overwrites the S3 object\nwith the full local log file without downloading first. Use ``replace`` when the\nlocal log file is authoritative (e.g. shared filesystem) and you want to avoid\nthe download-and-concatenate overhead.\n",
"type": "string",
"version_added": None,
"example": None,
"default": "append",
},
"cloudwatch_task_handler_json_serializer": {
"description": "By default, when logging non-string messages, all non-json objects are logged as `null`.\nExcept `datetime` objects which are ISO formatted. Users can optionally use a `repr` serializer or\nprovide their own JSON serializer for any non-JSON-serializable objects in the logged message.\n\n* `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize` uses `repr` (be aware\n there is the potential of logging sensitive data depending on the `repr` method of logged objects)\n* `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy` uses `null`.\n\nIf a custom serializer is provided, it must adhere to `Callable[[Any], str | None]`, where `None`\nserializes to `null` (e.g. `def my_serializer(o: Any) -> str | None`). Since this is on the logging\npath and it's possible there's an exception being handled, special care should be taken to fail\ngracefully without raising a new exception inside of your serializer.\n",
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,9 @@ def test_write_existing(self):
def test_upload_repeated_appends_no_duplication(self):
"""Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads.

Without truncation after upload, the S3 object accumulates duplicate
lines and grows O(N^2). The correct behavior is that each line appears
in S3 exactly once.
Without the startswith guard, the S3 object accumulates duplicate lines
and grows O(N^2). The correct behavior is that each line appears in S3
exactly once.
"""
local_log = self.subject.base_log_folder / "1.log"
local_log.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -201,6 +201,34 @@ def test_upload_repeated_appends_no_duplication(self):

body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()
assert body == b"cycle 1\ncycle 2\ncycle 3\n"
assert local_log.read_text() == "cycle 1\ncycle 2\ncycle 3\n"

def test_upload_append_from_different_worker(self):
"""When a different worker uploads new content, it appends to the existing S3 object."""
self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"cycle 1\ncycle 2\n")

local_log = self.subject.base_log_folder / "1.log"
local_log.parent.mkdir(parents=True, exist_ok=True)
local_log.write_text("cycle 3\n")

self.subject.upload(local_log, self.ti)

body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()
assert body == b"cycle 1\ncycle 2\ncycle 3\n"

@conf_vars({("aws", "s3_log_write_mode"): "replace"})
def test_upload_replace_mode(self):
"""When s3_log_write_mode is 'replace', S3 is overwritten without downloading."""
self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"old content\n")

local_log = self.subject.base_log_folder / "1.log"
local_log.parent.mkdir(parents=True, exist_ok=True)
local_log.write_text("new content\n")

self.subject.upload(local_log, self.ti)

body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read()
assert body == b"new content\n"

def test_write_raises(self, caplog):
url = "s3://nonexistentbucket/foo"
Expand Down
Loading