Add `instances` option to target specific fleet instances #3925
fededagos wants to merge 12 commits
Introduce an `instances` run profile option that pins a run to specific existing fleet instances (nodes). Each value matches an instance by its name (e.g. `my-fleet-0`) or by its hostname/IP address. When set, `filter_instances` keeps only matching instances and the job assignment phase never provisions new capacity to satisfy a node selector, terminating with a no-capacity error instead.
Reject runs that target fewer instances than the number of nodes they require, surfaced during planning via `validate_run_spec_and_set_defaults`. Exclude new-capacity backend offers from the run plan when `instances` is set, since they are never provisioned and would otherwise mislead the `dstack apply`/`dstack offer` output.
Add a 'Targeting specific instances' section to the shared fleets snippet (dev environments, tasks, services) and a corresponding tip in the protips guide.
Handle an explicit empty `instances` list consistently across the assignment gate, plan output, and instance filtering by checking `is not None` instead of truthiness, so an empty list targets existing instances only (rather than silently allowing new-capacity provisioning and showing unusable offers). Add regression tests ensuring the instance selector is applied on the multinode and shared-instances filter paths.
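The difference between checking `is not None` and checking truthiness can be illustrated with a minimal sketch. The function names below are hypothetical and not part of dstack; they only model the gate described above, assuming `instances` is either `None` (option not set) or a list of selectors.

```python
from typing import List, Optional


def allows_new_capacity(instances: Optional[List[str]]) -> bool:
    """Hypothetical gate: new capacity may be provisioned only if no selector is set."""
    # `None` means the option was not specified at all.
    # An explicit empty list is still a selector (one that matches nothing),
    # so it must not fall through to provisioning.
    return instances is None


def allows_new_capacity_truthy(instances: Optional[List[str]]) -> bool:
    """The truthiness variant: an empty list is treated the same as None."""
    return not instances
```

With the truthiness variant, `instances: []` would behave as if the option were absent, which is the inconsistency the commit describes.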

    def _validate_fleet_instance_selector_fleet(v: str) -> str:
        EntityReference.parse(v)
        return v


    class FleetInstanceSelector(CoreModel):
        fleet: Annotated[
            str,
            Field(
                description=(
                    "The fleet name. For fleets owned by the current project, specify the fleet name."
                    " For a fleet from another project, specify `<project name>/<fleet name>`"
                ),
                min_length=1,
            ),
        ]
        instance: Annotated[int, Field(description="The fleet instance number", ge=0)]

        _validate_fleet = validator("fleet", allow_reuse=True)(_validate_fleet_instance_selector_fleet)

(nit) I would annotate `fleet` as `EntityReference` instead of `str`. That would:
- remove the need to parse it on each access;
- allow the type checker to enforce correct usage across the codebase;
- allow optionally using the verbose object notation in configurations, which would be consistent with other properties (1, 2):

    instances:
      - fleet:
          project: main
          name: my-fleet
        instance: 0

Also add `str` as an option for this field in `schema_extra`.
Annotated fleet as EntityReference, with the string shorthand parsed in a pre-validator and str kept in the schema via schema_extra, following the fleets field pattern. The object notation is now accepted in configurations.
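As a rough illustration of accepting both the string shorthand and the object notation, here is a minimal sketch using only the standard library. `EntityRef` and `coerce_fleet` are hypothetical stand-ins, not dstack's `EntityReference` or its pre-validator; the `<project name>/<fleet name>` format is taken from the field description above.

```python
from dataclasses import dataclass
from typing import Mapping, Optional, Union


@dataclass(frozen=True)
class EntityRef:
    """Hypothetical stand-in for a parsed fleet reference."""

    name: str
    project: Optional[str] = None

    @classmethod
    def parse(cls, value: str) -> "EntityRef":
        # Accept "<fleet name>" or "<project name>/<fleet name>".
        parts = value.split("/")
        if len(parts) > 2 or not all(parts):
            raise ValueError(f"invalid fleet reference: {value!r}")
        if len(parts) == 2:
            return cls(name=parts[1], project=parts[0])
        return cls(name=parts[0])


def coerce_fleet(value: Union[str, Mapping[str, str], EntityRef]) -> EntityRef:
    """Normalize the shorthand string or the verbose mapping to one type."""
    if isinstance(value, EntityRef):
        return value
    if isinstance(value, str):
        return EntityRef.parse(value)
    return EntityRef(name=value["name"], project=value.get("project"))
```

Parsing once at validation time means later code can compare `name` and `project` directly instead of re-parsing a string on each access.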

        _validate_fleet = validator("fleet", allow_reuse=True)(_validate_fleet_instance_selector_fleet)


    InstanceSelector = Union[InstanceNameSelector, InstanceHostnameSelector, FleetInstanceSelector]

(nit) Not described in the .dstack.yml references. Consider adding a section similar to volumes
Added instances sections to the dev-environment, task, and service references with tabs per selector type and a short-syntax note.

    async def _load_fleet_project_if_needed(
        session: AsyncSession,
        fleet_model: Optional[FleetModel],
    ) -> None:
        if fleet_model is None or "project" not in sa_inspect(fleet_model).unloaded:
            return
        await session.execute(
            select(FleetModel)
            .where(FleetModel.id == fleet_model.id)
            .options(joinedload(FleetModel.project))
            .execution_options(populate_existing=True)
        )

(nit) This is a rather unusual pattern for our codebase. Our typical pattern is to load all the required relationships when fetching the model from the database (in this case, in _load_submitted_job_context), which I think is preferred, as it avoids extra roundtrips to the database
Folded the fleet project load into the existing _load_submitted_job_context / _fetch_run_model_for_submitted_job queries and removed _load_fleet_project_if_needed.

    def instance_matches_hostname_selector(
        instance: InstanceModel, selector: InstanceHostnameSelector
    ) -> bool:
        candidates = set()
        jpd = get_instance_provisioning_data(instance)
        if jpd is not None and jpd.hostname is not None:
            candidates.add(jpd.hostname.lower())
        rci = get_instance_remote_connection_info(instance)
        if rci is not None:
            candidates.add(rci.host.lower())
        return selector.hostname.lower() in candidates

Match by `private_ip` too? I would expect it based on the `InstanceHostnameSelector.hostname` description: "The fleet instance hostname or IP address".
The selector now also matches JobProvisioningData.internal_ip.
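The extended matching can be sketched as follows. This is a minimal illustration, not the code from the PR: the dataclasses are hypothetical subsets of the real provisioning and connection models, and only the `hostname`, `internal_ip`, and `host` attribute names are taken from the thread above.

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass
class ProvisioningData:
    """Hypothetical subset of JobProvisioningData."""

    hostname: Optional[str] = None
    internal_ip: Optional[str] = None


@dataclass
class RemoteConnectionInfo:
    """Hypothetical subset of the remote connection info."""

    host: str


def hostname_candidates(
    jpd: Optional[ProvisioningData], rci: Optional[RemoteConnectionInfo]
) -> Set[str]:
    """Collect every address the instance is known by, lowercased."""
    candidates: Set[str] = set()
    if jpd is not None:
        for value in (jpd.hostname, jpd.internal_ip):
            if value is not None:
                candidates.add(value.lower())
    if rci is not None:
        candidates.add(rci.host.lower())
    return candidates


def matches_hostname(
    selector_hostname: str,
    jpd: Optional[ProvisioningData],
    rci: Optional[RemoteConnectionInfo],
) -> bool:
    # Case-insensitive comparison, as in the reviewed snippet.
    return selector_hostname.lower() in hostname_candidates(jpd, rci)
```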

    def instance_matches_fleet_instance_selector(
        instance: InstanceModel,
        selector: FleetInstanceSelector,
        *,
        project: Optional[ProjectModel] = None,
        fleet: Optional[FleetModel] = None,
    ) -> bool:
        fleet_ref = EntityReference.parse(selector.fleet)

        if fleet is None:
            # Avoid triggering a lazy load in async code.
            if "fleet" in sa_inspect(instance).unloaded or instance.fleet is None:
                return False
            fleet = instance.fleet

        if fleet.name.lower() != fleet_ref.name.lower():
            return False
        if instance.instance_num != selector.instance:
            return False

        if fleet_ref.project is None:
            if project is not None and fleet.project_id != project.id:
                return False
            return True

        if "project" in sa_inspect(fleet).unloaded or fleet.project is None:
            return False
        return fleet.project.name.lower() == fleet_ref.project.lower()

(nit) Looks quite error-prone to me. The function relies on several data sources at once (fleet vs instance.fleet, project vs fleet.project vs instance.fleet.project), and silently returns a potentially incorrect result if the caller doesn't provide the right arguments (e.g., if fleet is not provided and instance.fleet is not loaded).
I.e., the correctness of the function depends on the caller following some implicit contracts. To me, it's quite difficult to tell whether all code paths follow these contracts and whether they will keep doing so in the future.
If possible, I'd prefer the function (and any dependent code paths) to only accept instance and selector and require all the necessary relationships to be loaded.
Reworked the contract: the matchers now take only the instance, the selector, and the current project (required — needed to interpret unqualified fleet references). The fleet argument and the unloaded-relationship fallbacks are gone: instance.fleet is populated at load time (set_committed_value — SQLAlchemy doesn't populate the reverse many-to-one when loading through FleetModel.instances) and fleet.project is always eager-loaded, so a missing relationship fails loudly instead of silently not matching. This also removed the conditional load_fleet_project plumbing.
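The reworked contract might look roughly like the sketch below. It is an assumption-laden illustration rather than the PR's code: the dataclasses are hypothetical stand-ins for the ORM models, the selector is shown with the fleet reference already split into name and project, and "failing loudly" is modeled by plain attribute access on relationships that are assumed to be loaded.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Project:
    id: int
    name: str


@dataclass
class Fleet:
    name: str
    project: Project  # assumed to be eager-loaded


@dataclass
class Instance:
    instance_num: int
    fleet: Fleet  # assumed to be populated at load time


@dataclass
class Selector:
    """Hypothetical selector with the fleet reference already parsed."""

    fleet_name: str
    fleet_project: Optional[str]
    instance: int


def matches_fleet_instance(instance: Instance, selector: Selector, project: Project) -> bool:
    # No fallbacks: if a relationship is missing, the attribute access below
    # raises instead of silently reporting "no match".
    fleet = instance.fleet
    if fleet.name.lower() != selector.fleet_name.lower():
        return False
    if instance.instance_num != selector.instance:
        return False
    if selector.fleet_project is None:
        # An unqualified reference is interpreted relative to the current project.
        return fleet.project.id == project.id
    return fleet.project.name.lower() == selector.fleet_project.lower()
```

Because the function reads from a single data source (`instance` and its relationships), callers no longer need to follow an implicit contract about which optional arguments to pass.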

    ) -> List[InstanceModel]:
        fleet_load = joinedload(InstanceModel.fleet)
        if load_fleet_project:
            fleet_load = fleet_load.joinedload(FleetModel.project)

Added .load_only(ProjectModel.name) to the fleet project loads.

    # If `instances` is set, backend offers cannot satisfy the run. Otherwise,
    # keep the existing optimization that skips backend requests when pool
    # capacity is already enough.

(nit) The comment explains how this PR changes the code ("keep the existing optimization") instead of explaining what the code does. The reader won't understand the comment without seeing the previous version
Rewrote the comment to describe the current behavior.

    instances = run_spec.merged_profile.instances
    if instances is not None:
        nodes_required_num = get_nodes_required_num(run_spec)
        if len(instances) < nodes_required_num:
            raise ServerClientError(
                f"`instances` specifies {len(instances)} instance(s)"
                f" but the run requires {nodes_required_num} nodes."
                " Specify at least as many instances as nodes."
            )

Even if there are fewer instances than `nodes_required_num`, they may still be able to accommodate the run if they have enough blocks.
There are a few other places in the PR that appear to not take blocks into consideration (search by required_instance_offers)
Right about services: replicas can pack onto one instance with enough idle blocks, so the up-front check now applies only to multinode tasks, where each node takes a whole instance (min_blocks = total_blocks for multinode in get_shared_instances_with_offers, and multinode skips instances with any busy block). For the same reason the required_instance_offers comparisons should be blocks-safe: len(jobs_to_provision) > 1 only happens in the multinode master path, and each instance yields at most one offer (the block-size loop breaks on first match), so offer count equals distinct usable instances. Let me know if there's a path I'm missing.
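The narrowed up-front check could be sketched like this. The function name, its parameters, and the way a multinode task is signaled (`is_multinode_task`) are assumptions made for illustration; only the error wording is borrowed from the reviewed snippet.

```python
from typing import List, Optional


def instances_count_error(
    instances: Optional[List[str]],
    nodes_required_num: int,
    is_multinode_task: bool,
) -> Optional[str]:
    """Return an error message if the selector cannot cover a multinode task, else None."""
    if instances is None:
        return None  # option not set, nothing to validate
    if not is_multinode_task:
        # Service replicas may share one instance via blocks,
        # so the instance count alone proves nothing here.
        return None
    if len(instances) < nodes_required_num:
        return (
            f"`instances` specifies {len(instances)} instance(s)"
            f" but the run requires {nodes_required_num} nodes."
            " Specify at least as many instances as nodes."
        )
    return None
```

Returning a message instead of raising keeps the sketch easy to test; real code would presumably raise a client error at this point, as the reviewed snippet does.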
Adds an `instances` option to run configurations (dev environments, tasks, services) that restricts a run to specific existing fleet instances.

Syntax
Long forms:
Short form for matching by instance name:
The `fleet` form also supports `<project name>/<fleet name>` for fleets from another project.

Behavior
- `instances` has allow-list semantics: a run is placed only on a matching existing instance.
- When `instances` is set, `dstack` never provisions new instances to satisfy the run.
- `retry` can be used to wait for a selected busy instance to free up.
- New-capacity backend offers are excluded from the run plan when `instances` is set because they cannot satisfy the selector.

Implementation
- [...] `ProfileParams`: `name`, `hostname`, and `fleet` + `instance`, while preserving the string shorthand as an instance-name selector.
- [...] `instances` for older client/server compatibility paths.

Docs
Updated the shared fleet-management snippet and protips guide. The docs promote the explicit syntax first and keep the short instance-name syntax in a collapsible section.

Testing
- `uv run ruff check .`
- `uv run pyright -p .`
- `uv run pytest`: 2607 passed, 1055 skipped
- [...] `dstack server`: `fleets` plus all four `instances` syntaxes completed successfully.
- [...] `instances` syntaxes completed successfully.

AI Assistance
This PR includes AI-assisted changes. The original PR noted Claude Code assistance; follow-up schema, implementation review, tests, docs, and E2E verification were assisted by Codex.