Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,11 @@ def _get_source_includes(self, extra_fields: Iterable[str] = ()) -> list[str]:
["@timestamp", *TASK_LOG_FIELDS, self.host_field, self.offset_field, *extra_fields]
)

def upload(self, path: os.PathLike | str, ti: RuntimeTI):
def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None:
"""Write the log to ElasticSearch."""
if ti is None:
Comment thread
seanghaeli marked this conversation as resolved.
return

path = Path(path)

if path.is_absolute():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,9 @@ def test_read_returns_missing_log_message_when_es_read_returns_none(self, ti):
assert log_source_info == []
assert f"*** Log {log_id} not found in Elasticsearch" in log_messages[0]

def test_upload_returns_early_when_ti_is_none(self, tmp_json_file):
self.elasticsearch_io.upload(tmp_json_file, ti=None)


class TestFormatErrorDetail:
def test_returns_none_for_empty(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,8 +853,11 @@ def __attrs_post_init__(self):
self._doc_type_map: dict[Any, Any] = {}
self._doc_type: list[Any] = []

def upload(self, path: os.PathLike | str, ti: RuntimeTI):
def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None:
"""Emit structured task logs to stdout and/or write them directly to OpenSearch."""
if ti is None:
Comment thread
seanghaeli marked this conversation as resolved.
return

path = Path(path)
local_loc = path if path.is_absolute() else self.base_log_folder.joinpath(path)
if not local_loc.is_file():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,11 @@ def test_get_index_patterns_with_callable(self):
mock_callable.assert_called_once_with({})
assert result == "callable_index_pattern"

def test_upload_returns_early_when_ti_is_none(self, tmp_path):
log_file = tmp_path / "1.log"
log_file.write_text('{"message": "test"}\n')
self.opensearch_io.upload(log_file, ti=None)


class TestFormatErrorDetail:
def test_returns_none_for_empty(self):
Expand Down
2 changes: 1 addition & 1 deletion shared/logging/src/airflow_shared/logging/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]:
"""
...

def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None:
def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None:
"""Upload the given log path to the remote storage."""
...

Expand Down
4 changes: 4 additions & 0 deletions shared/logging/tests/logging/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,7 @@ def stream(self, relative_path, ti):
def test_non_stream_handler_not_stream_io(self):
handler = DummyRemoteLogIO()
assert not isinstance(handler, RemoteLogStreamIO)

def test_upload_accepts_none_ti(self):
handler = DummyRemoteLogIO()
handler.upload("/some/path", ti=None)
36 changes: 26 additions & 10 deletions task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ def wait(self) -> int:
"""
Wait for the callback subprocess to complete.

Mirrors the structure of ActivitySubprocess.wait() but without heartbeating,
task API state management, or log uploading.
Mirrors the structure of ActivitySubprocess.wait() but without heartbeating
or task API state management.
"""
if self._exit_code is not None:
return self._exit_code
Expand All @@ -276,6 +276,9 @@ def wait(self) -> int:
self.selector.close()

self._exit_code = self._exit_code if self._exit_code is not None else 1

self._upload_logs()

return self._exit_code

def _get_callback_execution_timeout(self) -> int:
Expand All @@ -284,6 +287,18 @@ def _get_callback_execution_timeout(self) -> int:

return conf.getint("callbacks", "callback_execution_timeout", fallback=0)

def _upload_logs(self):
from airflow.sdk.execution_time.supervisor import _remote_logging_conn
from airflow.sdk.log import upload_to_remote

try:
with _remote_logging_conn(self.client):
upload_to_remote(self.process_log)
except Exception:
log.exception(
"Failed to upload callback logs to remote storage", callback_id=self.id, pid=self.pid
)

def _monitor_subprocess(self):
"""
Monitor the subprocess until it exits.
Expand Down Expand Up @@ -362,14 +377,16 @@ def _handle_request(self, msg: CallbackToSupervisor, log: FilteringBoundLogger,
self.send_msg(resp, request_id=req_id, error=None, **dump_opts)


def _configure_logging(log_path: str) -> tuple[FilteringBoundLogger, BinaryIO]:
def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLogger, BinaryIO]:
"""Configure file-based logging for the callback subprocess."""
from airflow.sdk.execution_time.supervisor import _remote_logging_conn
from airflow.sdk.log import init_log_file, logging_processors

log_file = init_log_file(log_path)
log_file_descriptor: BinaryIO = log_file.open("ab")
underlying_logger = structlog.BytesLogger(log_file_descriptor)
processors = logging_processors(json_output=True)
with _remote_logging_conn(client):
processors = logging_processors(json_output=True)
logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="callback").bind()

return logger, log_file_descriptor
Expand Down Expand Up @@ -407,14 +424,13 @@ def supervise_callback(

logger: FilteringBoundLogger
log_file_descriptor: BinaryIO | None = None
if log_path:
logger, log_file_descriptor = _configure_logging(log_path)
else:
# When no log file is requested, still use a callback-specific logger
# so logs are clearly separated from task logs.
logger = structlog.get_logger(logger_name="callback").bind()

with _ensure_client(server, token, client=client) as client:
if log_path:
logger, log_file_descriptor = _configure_logging(log_path, client)
else:
logger = structlog.get_logger(logger_name="callback").bind()

try:
process = CallbackSubprocess.start(
id=id,
Expand Down
10 changes: 6 additions & 4 deletions task-sdk/src/airflow/sdk/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,18 +223,20 @@ def relative_path_from_logger(logger) -> Path | None:
return Path(fname).relative_to(base_log_folder)


def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI | None = None):
raw_logger = getattr(logger, "_logger")
# Dedicated logger for remote-upload visibility — operators relying on
# remote log handlers need a way to see when those handlers fail to load
# or fail to upload.
upload_log = structlog.get_logger("airflow.logging.remote")

ti_id = str(ti.id) if ti else None

handler = load_remote_log_handler()
if not handler:
upload_log.warning(
"remote_log_handler_unavailable",
ti_id=str(ti.id),
ti_id=ti_id,
note="Remote log handler could not be loaded; logs will be available locally only.",
)
return
Expand All @@ -244,7 +246,7 @@ def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
except Exception as exc:
upload_log.warning(
"remote_log_path_resolution_failed",
ti_id=str(ti.id),
ti_id=ti_id,
exc_info=exc,
)
return
Expand All @@ -257,7 +259,7 @@ def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
except Exception as exc:
upload_log.warning(
"remote_log_upload_failed",
ti_id=str(ti.id),
ti_id=ti_id,
log_relative_path=log_relative_path,
exc_info=exc,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,85 @@ def test_handle_requests(
mock_client_method.assert_called_once_with(*client_mock.args, **client_mock.kwargs)


class TestConfigureLogging:
"""Tests for _configure_logging remote logging connection setup."""

def test_configure_logging_uses_remote_logging_conn(self, tmp_path, mocker):
"""Verify that _remote_logging_conn is invoked with the client during logging setup."""
from airflow.sdk.execution_time.callback_supervisor import _configure_logging

mock_client = mocker.Mock()
log_path = str(tmp_path / "callback.log")

mock_remote_conn = mocker.patch(
"airflow.sdk.execution_time.supervisor._remote_logging_conn",
)

logger, fd = _configure_logging(log_path, mock_client)
fd.close()

mock_remote_conn.assert_called_once_with(mock_client)


class TestUploadLogs:
Comment thread
vincbeck marked this conversation as resolved.
"""Tests for CallbackSubprocess._upload_logs."""

@pytest.fixture
def callback_subprocess(self, mocker):
read_end, write_end = socket.socketpair()
proc = CallbackSubprocess(
process_log=mocker.MagicMock(),
id="12345678-1234-5678-1234-567812345678",
pid=12345,
stdin=write_end,
client=mocker.Mock(),
process=mocker.Mock(),
)
yield proc
read_end.close()
write_end.close()

def test_wait_calls_upload_logs_after_subprocess_completes(self, callback_subprocess, mocker):
"""wait() should call _upload_logs() after the subprocess finishes."""
mock_upload = mocker.patch(
"airflow.sdk.execution_time.callback_supervisor.CallbackSubprocess._upload_logs"
)
mocker.patch("airflow.sdk.execution_time.callback_supervisor.CallbackSubprocess._monitor_subprocess")
mocker.patch.object(callback_subprocess, "selector")

callback_subprocess.wait()

mock_upload.assert_called_once()

def test_upload_logs_delegates_to_upload_to_remote(self, callback_subprocess, mocker):
"""_upload_logs calls upload_to_remote with the process logger and no ti."""
mock_upload = mocker.patch("airflow.sdk.log.upload_to_remote")
mocker.patch("airflow.sdk.execution_time.supervisor._remote_logging_conn")

callback_subprocess._upload_logs()

mock_upload.assert_called_once_with(callback_subprocess.process_log)

def test_upload_logs_failure_is_swallowed(self, callback_subprocess, mocker):
"""Upload failures must not propagate — callback exit code should still be returned."""
mocker.patch(
"airflow.sdk.log.upload_to_remote",
side_effect=RuntimeError("S3 unreachable"),
)
mocker.patch("airflow.sdk.execution_time.supervisor._remote_logging_conn")

callback_subprocess._upload_logs()

def test_upload_logs_no_remote_logging_configured(self, callback_subprocess, mocker):
"""When remote logging is not configured, _upload_logs completes without error."""
mock_load_handler = mocker.patch("airflow.sdk.log.load_remote_log_handler", return_value=None)
mocker.patch("airflow.sdk.execution_time.supervisor._remote_logging_conn")

callback_subprocess._upload_logs()

mock_load_handler.assert_called_once()


class TestCallbackExecutionTimeout:
"""Tests for the callback_execution_timeout config enforcement."""

Expand All @@ -286,7 +365,6 @@ def test_timeout_zero_does_not_kill(self, callback_subprocess, mocker):
"""When timeout=0, no kill is issued regardless of how long the subprocess runs."""
proc = callback_subprocess

# Simulate subprocess exiting normally after some iterations
call_count = 0

def fake_service_subprocess(self_arg, **kwargs):
Expand All @@ -313,11 +391,9 @@ def test_timeout_kills_long_running_subprocess(self, callback_subprocess, mocker
"""When timeout>0 and the subprocess exceeds the timeout, it is killed."""
proc = callback_subprocess

# Simulate time progressing beyond the timeout
time_values = iter([100.0, 100.0, 106.0]) # start, start, elapsed > 5s timeout
time_values = iter([100.0, 100.0, 106.0])
mocker.patch("airflow.sdk.execution_time.callback_supervisor.time.monotonic", side_effect=time_values)

# Subprocess never exits on its own
mocker.patch.object(CallbackSubprocess, "_service_subprocess", autospec=True, return_value=None)
mocker.patch.object(
CallbackSubprocess, "_get_callback_execution_timeout", autospec=True, return_value=5
Expand Down
Loading