From f98a8d042b49a7e3f7179808c990f35ad27b0e7e Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 17 Jun 2026 00:15:22 +0100 Subject: [PATCH] Fix SSHRemoteJobOperator orphaning the remote job on cancellation --- .../airflow/providers/ssh/utils/remote_job.py | 50 +++++++++-- .../tests/unit/ssh/utils/test_remote_job.py | 90 +++++++++++++++++-- 2 files changed, 125 insertions(+), 15 deletions(-) diff --git a/providers/ssh/src/airflow/providers/ssh/utils/remote_job.py b/providers/ssh/src/airflow/providers/ssh/utils/remote_job.py index d761896651dc9..0481b09b57776 100644 --- a/providers/ssh/src/airflow/providers/ssh/utils/remote_job.py +++ b/providers/ssh/src/airflow/providers/ssh/utils/remote_job.py @@ -158,11 +158,12 @@ def build_posix_wrapper_command( environment: dict[str, str] | None = None, ) -> str: """ - Build a POSIX shell wrapper that runs the command detached via nohup. + Build a POSIX shell wrapper that runs the command detached. The wrapper: - Creates the job directory - - Starts the command in the background with nohup + - Starts the command detached in its own session via ``setsid`` (falling back to + ``nohup`` when ``setsid`` is unavailable) - Redirects stdout/stderr to the log file - Writes the exit code atomically on completion - Writes the PID for potential cancellation @@ -181,6 +182,12 @@ def build_posix_wrapper_command( escaped_command = command.replace("'", "'\"'\"'") + # Launch detached under ``setsid`` so the job is its own session/process-group + # leader. ``$!`` is then the leader PID *and* the PGID (verified: setsid does not + # fork when started as a background job), recorded synchronously just like before, + # so cancellation can signal the whole job tree instead of orphaning the user + # command. Without ``setsid`` (some macOS/BSD hosts) ``$!`` is just the wrapper PID + # and cancellation degrades to the previous single-process behaviour. wrapper = f"""set -euo pipefail job_dir='{paths.job_dir}' log_file='{paths.log_file}' @@ -192,7 +199,7 @@ def build_posix_wrapper_command( mkdir -p "$job_dir" : > "$log_file" -nohup bash -c ' +job_script=' set +e export LOG_FILE="'"$log_file"'" export STATUS_FILE="'"$status_file"'" @@ -201,8 +208,13 @@ def build_posix_wrapper_command( echo -n "$ec" > "'"$exit_code_tmp"'" mv "'"$exit_code_tmp"'" "'"$exit_code_file"'" exit 0 -' >/dev/null 2>&1 & +' +if command -v setsid >/dev/null 2>&1; then + setsid bash -c "$job_script" >/dev/null 2>&1 & +else + nohup bash -c "$job_script" >/dev/null 2>&1 & +fi echo -n $! > "$pid_file" echo "{paths.job_id}" """ @@ -379,24 +391,48 @@ def build_posix_kill_command(pid_file: str) -> str: """ Build a POSIX command to kill the remote process. + Signals the whole process group first (the negative-PID form ``kill -``) so + the user command and anything it spawned are terminated together, not just the + wrapper. The recorded PID is the job's session/group leader when it was launched + under ``setsid`` (see :func:`build_posix_wrapper_command`); if the job is not a + group leader (host without ``setsid``), the group signal is a no-op and we fall + back to killing the single PID, matching the previous behaviour. + + The pid value is validated as an integer ``> 1`` before being negated: a corrupt + or partial pid of ``0``/``1`` would otherwise turn ``kill -`` into a broadcast + to every process the SSH account can signal. Best-effort: the command never fails + the SSH call. + :param pid_file: Path to the PID file :return: Shell command to kill the process """ - return f"test -f '{pid_file}' && kill $(cat '{pid_file}') 2>/dev/null || true" + return ( + f"if test -f '{pid_file}'; then " + f"p=\"$(cat '{pid_file}')\"; " + f'if [ "$p" -gt 1 ] 2>/dev/null; then ' + f'kill -TERM -"$p" 2>/dev/null || kill -TERM "$p" 2>/dev/null || true; ' + "fi; fi" + ) def build_windows_kill_command(pid_file: str) -> str: """ Build a PowerShell command to kill the remote process. + Uses ``taskkill /T`` to terminate the recorded process *and its child + processes* (the Windows equivalent of a process-group kill), so the user + command launched by the detached wrapper is not left orphaned. ``$procId`` is + used instead of ``$pid`` because ``$PID`` is a read-only automatic variable in + PowerShell. + :param pid_file: Path to the PID file :return: PowerShell command to kill the process """ escaped_path = pid_file.replace("'", "''") script = f"""$path = '{escaped_path}' if (Test-Path $path) {{ - $pid = Get-Content $path - Stop-Process -Id $pid -Force -ErrorAction SilentlyContinue + $procId = Get-Content $path + & taskkill.exe /PID $procId /T /F 2>$null }}""" script_bytes = script.encode("utf-16-le") encoded_script = base64.b64encode(script_bytes).decode("ascii") diff --git a/providers/ssh/tests/unit/ssh/utils/test_remote_job.py b/providers/ssh/tests/unit/ssh/utils/test_remote_job.py index 1ababb62e08d7..4e7eca9b6973c 100644 --- a/providers/ssh/tests/unit/ssh/utils/test_remote_job.py +++ b/providers/ssh/tests/unit/ssh/utils/test_remote_job.py @@ -18,6 +18,11 @@ from __future__ import annotations import base64 +import os +import shutil +import subprocess +import time +from pathlib import Path import pytest @@ -126,6 +131,18 @@ def test_escapes_quotes(self): wrapper = build_posix_wrapper_command("echo 'hello world'", paths) assert wrapper is not None + def test_runs_in_own_process_group(self): + """The job launches under setsid (when available); $! is the leader PID/PGID.""" + paths = RemoteJobPaths(job_id="test_job", remote_os="posix") + wrapper = build_posix_wrapper_command("/path/to/script.sh", paths) + + # New session/process group when setsid exists, plain detached run otherwise + assert "command -v setsid" in wrapper + assert "setsid bash -c" in wrapper + assert "nohup bash -c" in wrapper + # Leader PID recorded synchronously by the launcher ($! == PGID under setsid) + assert 'echo -n $! > "$pid_file"' in wrapper + class TestBuildWindowsWrapperCommand: def test_basic_command(self): @@ -210,20 +227,77 @@ def test_windows_completion_check(self): class TestKillCommands: - def test_posix_kill(self): - """Test POSIX kill command.""" + def test_posix_kill_signals_process_group_then_falls_back(self): + """POSIX kill targets the process group first, then a single PID as fallback.""" cmd = build_posix_kill_command("/tmp/pid") - assert "kill" in cmd - assert "cat" in cmd - - def test_windows_kill(self): - """Test Windows kill command.""" + assert "cat '/tmp/pid'" in cmd + # Negative PID => signal the whole process group (kills the job's children too) + assert 'kill -TERM -"$p"' in cmd + # Fallback for jobs that are not group leaders (host without setsid) + assert 'kill -TERM "$p"' in cmd + # Guard against a corrupt/partial pid: -0/-1 would broadcast to every process + assert '[ "$p" -gt 1 ]' in cmd + assert cmd.endswith("fi") + + def test_windows_kill_terminates_process_tree(self): + """Windows kill terminates the process and its child tree via taskkill /T.""" cmd = build_windows_kill_command("C:\\temp\\pid") assert "powershell.exe" in cmd assert "-EncodedCommand" in cmd encoded_script = cmd.split("-EncodedCommand ")[1] decoded_script = base64.b64decode(encoded_script).decode("utf-16-le") - assert "Stop-Process" in decoded_script + assert "taskkill" in decoded_script + assert "/T" in decoded_script # tree kill (process + children) + # $PID is a read-only automatic variable in PowerShell; must not be assigned + assert "$procId" in decoded_script + assert "$pid =" not in decoded_script + + +@pytest.mark.skipif( + os.name != "posix" or shutil.which("setsid") is None or shutil.which("bash") is None, + reason="needs a POSIX host with bash and setsid to exercise process-group teardown", +) +class TestPosixKillBehaviour: + """End-to-end check that on_kill tears down the whole job tree, not just the wrapper. + + Regression test for the orphaned-process bug: killing only the recorded PID left the + user command (and its children) running, so the exit_code file was never written and + the trigger timed out. The job now runs in its own process group and the kill signals + the group. + """ + + @staticmethod + def _group_alive(pgid: int) -> bool: + # pgrep -g matches by process-group id; rc 0 => at least one member alive. + return subprocess.run(["pgrep", "-g", str(pgid)], capture_output=True, check=False).returncode == 0 + + def test_kill_terminates_whole_job_tree(self, tmp_path): + paths = RemoteJobPaths(job_id="killtree", remote_os="posix", base_dir=str(tmp_path / "jobs")) + # `sleep 300` runs as a child of the wrapper subshell -> the tree the old kill orphaned. + # Run under bash, which is the remote login shell this operator requires (the wrapper + # uses `set -o pipefail`); the kill is run the same way below. + wrapper = build_posix_wrapper_command("sleep 300", paths) + subprocess.run(["bash", "-c", wrapper], check=True, capture_output=True, text=True) + + # The launcher records $! synchronously, so the pid file is present on return. + pid_path = Path(paths.pid_file) + assert pid_path.exists(), "job never wrote its pid file" + pid_text = pid_path.read_text().strip() + assert pid_text, "pid file is empty" + pgid = int(pid_text) + + try: + assert self._group_alive(pgid), "job tree should be running before kill" + + subprocess.run(["bash", "-c", build_posix_kill_command(paths.pid_file)], check=True) + + deadline = time.monotonic() + 5 + while time.monotonic() < deadline and self._group_alive(pgid): + time.sleep(0.05) + assert not self._group_alive(pgid), "kill left part of the job tree running" + finally: + # Belt-and-suspenders: never leave a stray `sleep 300` behind if an assert fails. + subprocess.run(["bash", "-c", f"kill -9 -{pgid} 2>/dev/null || true"], check=False) class TestCleanupCommands: