Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -660,10 +660,12 @@ async def _process_terminating_job(

jrd = get_job_runtime_data(job_model)
jpd = get_job_provisioning_data(job_model)
if jpd is not None:
if jpd is not None and jpd.hostname is not None and jpd.ssh_port is not None:
logger.debug("%s: stopping container", fmt(job_model))
ssh_private_keys = get_instance_ssh_private_keys(instance_model)
if not await _stop_container(job_model, jpd, ssh_private_keys):
# Dangling containers (tasks) are cleared periodically on instance checks by
# `remove_dangling_tasks_from_instance()`
logger.warning(
(
"%s: could not stop container, possibly due to a communication error."
Expand Down
12 changes: 6 additions & 6 deletions src/dstack/_internal/server/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,13 @@ async def stop_runner(job_model: JobModel, instance_model: InstanceModel):
`instance_model.project` must be loaded because SSH key resolution uses the project keys.
"""
ssh_private_keys = get_instance_ssh_private_keys(instance_model)
try:
jpd = get_job_provisioning_data(job_model)
if jpd is not None:
jrd = get_job_runtime_data(job_model)
jpd = get_job_provisioning_data(job_model)
if jpd is not None:
jrd = get_job_runtime_data(job_model)
try:
await run_async(_stop_runner, ssh_private_keys, jpd, jrd, job_model)
except SSHError:
logger.debug("%s: failed to stop runner", fmt(job_model))
except SSHError:
logger.debug("%s: failed to stop runner", fmt(job_model))


@runner_ssh_tunnel
Expand Down
5 changes: 5 additions & 0 deletions src/dstack/_internal/server/services/runner/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def wrapper(
Returns:
is successful
"""
if job_provisioning_data.hostname is None or job_provisioning_data.ssh_port is None:
# The callers may try to establish tunnels even if hostname/ssh_port is missing
# and rely on `False` being returned in this case.
return False

if not settings.SERVER_SSH_POOL_ENABLED or not job_provisioning_data.dockerized:
# Connections from dstack-server to runner's sshd are expected to be short
# as the `inactivity_duration` feature distinguishes user and server connections based on duration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,3 +902,86 @@ async def test_keeps_related_instance_locked_on_processing_exception(
assert job.lock_owner == JobTerminatingPipeline.__name__
assert instance.lock_token == job_lock_token
assert instance.lock_owner == _get_related_instance_lock_owner(job.id)

async def test_stops_job_gracefully_without_provisioning_data_hostname(
    self, test_db, session: AsyncSession, worker: JobTerminatingWorker
):
    """Graceful stop must not raise when the job has no hostname/ssh_port yet.

    Regression test for https://github.com/dstackai/dstack/issues/3950.
    A job that is still provisioning has neither `hostname` nor `ssh_port`;
    the graceful stop tries to open an SSH tunnel to the runner and must
    cope with that instead of failing.
    """
    # Provisioning data as it looks before the instance is reachable.
    provisioning = get_job_provisioning_data(dockerized=True)
    provisioning.hostname = None
    provisioning.ssh_port = None

    owning_project = await create_project(session=session)
    owner = await create_user(session=session)
    busy_instance = await create_instance(
        session=session,
        project=owning_project,
        status=InstanceStatus.BUSY,
    )
    source_repo = await create_repo(session=session, project_id=owning_project.id)
    parent_run = await create_run(
        session=session, project=owning_project, repo=source_repo, user=owner
    )
    terminating_job = await create_job(
        session=session,
        run=parent_run,
        status=JobStatus.TERMINATING,
        termination_reason=JobTerminationReason.TERMINATED_BY_USER,
        job_provisioning_data=provisioning,
        instance=busy_instance,
    )
    # No graceful attempt has been made yet, so the worker takes the graceful path.
    terminating_job.graceful_termination_attempts = 0
    _lock_job(terminating_job)
    await session.commit()

    pipeline_item = _job_to_pipeline_item(terminating_job)
    await worker.process(pipeline_item)

    await session.refresh(terminating_job)
    # The job stays TERMINATING: the attempt is counted, removal is scheduled,
    # and the instance is still attached.
    assert terminating_job.status == JobStatus.TERMINATING
    assert terminating_job.graceful_termination_attempts == 1
    assert terminating_job.remove_at is not None
    assert terminating_job.instance_id == busy_instance.id

async def test_terminates_job_without_provisioning_data_hostname(
    self, test_db, session: AsyncSession, worker: JobTerminatingWorker
):
    """Termination skips the container stop when hostname/ssh_port are missing.

    Regression test for https://github.com/dstackai/dstack/issues/3950.
    The container stop is skipped (and must not raise) when the job has no
    hostname/ssh_port. Dangling containers are cleared later on instance
    checks by `remove_dangling_tasks_from_instance()`.
    """
    # Provisioning data as it looks before the instance is reachable.
    provisioning = get_job_provisioning_data(dockerized=True)
    provisioning.hostname = None
    provisioning.ssh_port = None

    owning_project = await create_project(session=session)
    owner = await create_user(session=session)
    busy_instance = await create_instance(
        session=session,
        project=owning_project,
        status=InstanceStatus.BUSY,
    )
    source_repo = await create_repo(session=session, project_id=owning_project.id)
    parent_run = await create_run(
        session=session, project=owning_project, repo=source_repo, user=owner
    )
    terminating_job = await create_job(
        session=session,
        run=parent_run,
        status=JobStatus.TERMINATING,
        termination_reason=JobTerminationReason.TERMINATED_BY_USER,
        job_provisioning_data=provisioning,
        instance=busy_instance,
    )
    # One graceful attempt already happened and the removal deadline is in the past.
    terminating_job.graceful_termination_attempts = 1
    terminating_job.remove_at = get_current_datetime() - timedelta(minutes=1)
    _lock_job(terminating_job)
    await session.commit()

    stop_container_target = (
        "dstack._internal.server.background.pipeline_tasks.jobs_terminating._stop_container"
    )
    stop_container_mock = AsyncMock(return_value=True)
    with patch(stop_container_target, new=stop_container_mock):
        await worker.process(_job_to_pipeline_item(terminating_job))

    # Without hostname/ssh_port the container stop must not even be attempted.
    stop_container_mock.assert_not_awaited()

    await session.refresh(terminating_job)
    await session.refresh(busy_instance)
    # The job is terminated and detached; the instance is released back to IDLE.
    assert terminating_job.status == JobStatus.TERMINATED
    assert terminating_job.instance_id is None
    assert busy_instance.status == InstanceStatus.IDLE
Loading