Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions providers/ssh/src/airflow/providers/ssh/utils/remote_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,12 @@ def build_posix_wrapper_command(
environment: dict[str, str] | None = None,
) -> str:
"""
Build a POSIX shell wrapper that runs the command detached via nohup.
Build a POSIX shell wrapper that runs the command detached.

The wrapper:
- Creates the job directory
- Starts the command in the background with nohup
- Starts the command detached in its own session via ``setsid`` (falling back to
``nohup`` when ``setsid`` is unavailable)
- Redirects stdout/stderr to the log file
- Writes the exit code atomically on completion
- Writes the PID for potential cancellation
Expand All @@ -181,6 +182,12 @@ def build_posix_wrapper_command(

escaped_command = command.replace("'", "'\"'\"'")

# Launch detached under ``setsid`` so the job is its own session/process-group
# leader. ``$!`` is then the leader PID *and* the PGID (verified: setsid does not
# fork when started as a background job), recorded synchronously just like before,
# so cancellation can signal the whole job tree instead of orphaning the user
# command. Without ``setsid`` (some macOS/BSD hosts) ``$!`` is just the wrapper PID
# and cancellation degrades to the previous single-process behaviour.
wrapper = f"""set -euo pipefail
job_dir='{paths.job_dir}'
log_file='{paths.log_file}'
Expand All @@ -192,7 +199,7 @@ def build_posix_wrapper_command(
mkdir -p "$job_dir"
: > "$log_file"

nohup bash -c '
job_script='
set +e
export LOG_FILE="'"$log_file"'"
export STATUS_FILE="'"$status_file"'"
Expand All @@ -201,8 +208,13 @@ def build_posix_wrapper_command(
echo -n "$ec" > "'"$exit_code_tmp"'"
mv "'"$exit_code_tmp"'" "'"$exit_code_file"'"
exit 0
' >/dev/null 2>&1 &
'

if command -v setsid >/dev/null 2>&1; then
setsid bash -c "$job_script" >/dev/null 2>&1 &
else
nohup bash -c "$job_script" >/dev/null 2>&1 &
fi
echo -n $! > "$pid_file"
echo "{paths.job_id}"
"""
Expand Down Expand Up @@ -379,24 +391,48 @@ def build_posix_kill_command(pid_file: str) -> str:
"""
Build a POSIX command to kill the remote process.

Signals the whole process group first (the negative-PID form ``kill -<pgid>``) so
the user command and anything it spawned are terminated together, not just the
wrapper. The recorded PID is the job's session/group leader when it was launched
under ``setsid`` (see :func:`build_posix_wrapper_command`); if the job is not a
group leader (host without ``setsid``), the group signal is a no-op and we fall
back to killing the single PID, matching the previous behaviour.

The pid value is validated as an integer ``> 1`` before being negated: a corrupt
or partial pid of ``0``/``1`` would otherwise turn ``kill -<pid>`` into a broadcast
to every process the SSH account can signal. Best-effort: the command never fails
the SSH call.

:param pid_file: Path to the PID file
:return: Shell command to kill the process
"""
return f"test -f '{pid_file}' && kill $(cat '{pid_file}') 2>/dev/null || true"
return (
f"if test -f '{pid_file}'; then "
f"p=\"$(cat '{pid_file}')\"; "
f'if [ "$p" -gt 1 ] 2>/dev/null; then '
f'kill -TERM -"$p" 2>/dev/null || kill -TERM "$p" 2>/dev/null || true; '
"fi; fi"
)


def build_windows_kill_command(pid_file: str) -> str:
"""
Build a PowerShell command to kill the remote process.

Uses ``taskkill /T`` to terminate the recorded process *and its child
processes* (the Windows equivalent of a process-group kill), so the user
command launched by the detached wrapper is not left orphaned. ``$procId`` is
used instead of ``$pid`` because ``$PID`` is a read-only automatic variable in
PowerShell.

:param pid_file: Path to the PID file
:return: PowerShell command to kill the process
"""
escaped_path = pid_file.replace("'", "''")
script = f"""$path = '{escaped_path}'
if (Test-Path $path) {{
$pid = Get-Content $path
Stop-Process -Id $pid -Force -ErrorAction SilentlyContinue
$procId = Get-Content $path
& taskkill.exe /PID $procId /T /F 2>$null
}}"""
script_bytes = script.encode("utf-16-le")
encoded_script = base64.b64encode(script_bytes).decode("ascii")
Expand Down
90 changes: 82 additions & 8 deletions providers/ssh/tests/unit/ssh/utils/test_remote_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
from __future__ import annotations

import base64
import os
import shutil
import subprocess
import time
from pathlib import Path

import pytest

Expand Down Expand Up @@ -126,6 +131,18 @@ def test_escapes_quotes(self):
wrapper = build_posix_wrapper_command("echo 'hello world'", paths)
assert wrapper is not None

def test_runs_in_own_process_group(self):
"""The job launches under setsid (when available); $! is the leader PID/PGID."""
paths = RemoteJobPaths(job_id="test_job", remote_os="posix")
wrapper = build_posix_wrapper_command("/path/to/script.sh", paths)

# New session/process group when setsid exists, plain detached run otherwise
assert "command -v setsid" in wrapper
assert "setsid bash -c" in wrapper
assert "nohup bash -c" in wrapper
# Leader PID recorded synchronously by the launcher ($! == PGID under setsid)
assert 'echo -n $! > "$pid_file"' in wrapper


class TestBuildWindowsWrapperCommand:
def test_basic_command(self):
Expand Down Expand Up @@ -210,20 +227,77 @@ def test_windows_completion_check(self):


class TestKillCommands:
def test_posix_kill(self):
"""Test POSIX kill command."""
def test_posix_kill_signals_process_group_then_falls_back(self):
"""POSIX kill targets the process group first, then a single PID as fallback."""
cmd = build_posix_kill_command("/tmp/pid")
assert "kill" in cmd
assert "cat" in cmd

def test_windows_kill(self):
"""Test Windows kill command."""
assert "cat '/tmp/pid'" in cmd
# Negative PID => signal the whole process group (kills the job's children too)
assert 'kill -TERM -"$p"' in cmd
# Fallback for jobs that are not group leaders (host without setsid)
assert 'kill -TERM "$p"' in cmd
# Guard against a corrupt/partial pid: -0/-1 would broadcast to every process
assert '[ "$p" -gt 1 ]' in cmd
assert cmd.endswith("fi")

def test_windows_kill_terminates_process_tree(self):
"""Windows kill terminates the process and its child tree via taskkill /T."""
cmd = build_windows_kill_command("C:\\temp\\pid")
assert "powershell.exe" in cmd
assert "-EncodedCommand" in cmd
encoded_script = cmd.split("-EncodedCommand ")[1]
decoded_script = base64.b64decode(encoded_script).decode("utf-16-le")
assert "Stop-Process" in decoded_script
assert "taskkill" in decoded_script
assert "/T" in decoded_script # tree kill (process + children)
# $PID is a read-only automatic variable in PowerShell; must not be assigned
assert "$procId" in decoded_script
assert "$pid =" not in decoded_script


@pytest.mark.skipif(
os.name != "posix" or shutil.which("setsid") is None or shutil.which("bash") is None,
reason="needs a POSIX host with bash and setsid to exercise process-group teardown",
)
class TestPosixKillBehaviour:
"""End-to-end check that on_kill tears down the whole job tree, not just the wrapper.

Regression test for the orphaned-process bug: killing only the recorded PID left the
user command (and its children) running, so the exit_code file was never written and
the trigger timed out. The job now runs in its own process group and the kill signals
the group.
"""

@staticmethod
def _group_alive(pgid: int) -> bool:
# pgrep -g matches by process-group id; rc 0 => at least one member alive.
return subprocess.run(["pgrep", "-g", str(pgid)], capture_output=True, check=False).returncode == 0

def test_kill_terminates_whole_job_tree(self, tmp_path):
paths = RemoteJobPaths(job_id="killtree", remote_os="posix", base_dir=str(tmp_path / "jobs"))
# `sleep 300` runs as a child of the wrapper subshell -> the tree the old kill orphaned.
# Run under bash, which is the remote login shell this operator requires (the wrapper
# uses `set -o pipefail`); the kill is run the same way below.
wrapper = build_posix_wrapper_command("sleep 300", paths)
subprocess.run(["bash", "-c", wrapper], check=True, capture_output=True, text=True)

# The launcher records $! synchronously, so the pid file is present on return.
pid_path = Path(paths.pid_file)
assert pid_path.exists(), "job never wrote its pid file"
pid_text = pid_path.read_text().strip()
assert pid_text, "pid file is empty"
pgid = int(pid_text)

try:
assert self._group_alive(pgid), "job tree should be running before kill"

subprocess.run(["bash", "-c", build_posix_kill_command(paths.pid_file)], check=True)

deadline = time.monotonic() + 5
while time.monotonic() < deadline and self._group_alive(pgid):
time.sleep(0.05)
assert not self._group_alive(pgid), "kill left part of the job tree running"
finally:
# Belt-and-suspenders: never leave a stray `sleep 300` behind if an assert fails.
subprocess.run(["bash", "-c", f"kill -9 -{pgid} 2>/dev/null || true"], check=False)


class TestCleanupCommands:
Expand Down
Loading